import argparse import asyncio import csv import requests import websockets import websockets.asyncio import websockets.asyncio.client import websockets.exceptions from datetime import datetime, timezone # Import the status_pb2.py file generated by `protoc`. # This file must be in the same directory as logger.py. import status_pb2 class OdroidPowerLogger: """ A class to connect to the Odroid Smart Power monitoring server and log power data. 1. Logs into the server via an HTTP POST request to obtain an authentication token. 2. Connects to the WebSocket using the obtained token. 3. Receives and decodes binary data in Protobuf format, then prints it. """ def __init__(self, host, username, password, output_file=None): self.host = host self.username = username self.password = password self.base_url = f"http://{self.host}" self.ws_url = f"ws://{self.host}/ws" self.output_file = output_file self.token = None def login(self): """Logs into the server to retrieve an authentication token.""" login_url = f"{self.base_url}/login" payload = {"username": self.username, "password": self.password} try: print(f"Attempting to log in to '{login_url}'...") response = requests.post(login_url, json=payload, timeout=5) response.raise_for_status() response_json = response.json() if "token" in response_json: self.token = response_json["token"] print("Login successful! Token received.") return True else: print("Login failed: No token in response.") return False except requests.exceptions.RequestException as e: print(f"Error during login: {e}") return False async def listen_power_data(self): """Connects to the WebSocket to receive and log power data.""" if not self.token: print("Cannot connect to WebSocket without an authentication token.") return # Add the authentication token as a query parameter uri = f"{self.ws_url}?token={self.token}" csv_file = None csv_writer = None try: # --- CSV File Handling --- if self.output_file: try: # Open the file in write mode, with newline='' to prevent extra blank rows csv_file = open(self.output_file, 'w', newline='', encoding='utf-8') csv_writer = csv.writer(csv_file) # Write header header = [ 'timestamp', 'uptime_ms', 'vin_voltage', 'vin_current', 'vin_power', 'main_voltage', 'main_current', 'main_power', 'usb_voltage', 'usb_current', 'usb_power' ] csv_writer.writerow(header) print(f"Logging data to {self.output_file}") except IOError as e: print(f"Error opening CSV file: {e}") # If file can't be opened, disable CSV writing csv_file = None csv_writer = None # --- End CSV File Handling --- async with websockets.connect(uri) as websocket: print(f"Connected to WebSocket: {uri}") while True: # Receive binary message from the server message_bytes = await websocket.recv() # Decode the Protobuf message status_message = status_pb2.StatusMessage() status_message.ParseFromString(message_bytes) # Process only if the payload type is 'sensor_data' if status_message.WhichOneof('payload') == 'sensor_data': sensor_data = status_message.sensor_data ts_dt = datetime.fromtimestamp(sensor_data.timestamp_ms / 1000, tz=timezone.utc) ts_str_print = ts_dt.strftime('%Y-%m-%d %H:%M:%S UTC') print(f"--- {ts_str_print} (Uptime: {sensor_data.uptime_ms / 1000}s) ---") # Print data for each channel for name, channel in [('VIN', sensor_data.vin), ('MAIN', sensor_data.main), ('USB', sensor_data.usb)]: print( f" {name:<4}: {channel.voltage:5.2f} V | {channel.current:5.3f} A | {channel.power:5.2f} W") # Write to CSV if enabled if csv_writer: ts_iso_csv = ts_dt.isoformat(timespec='milliseconds').replace('+00:00', 'Z') row = [ ts_iso_csv, sensor_data.uptime_ms, f"{sensor_data.vin.voltage:.3f}", f"{sensor_data.vin.current:.3f}", f"{sensor_data.vin.power:.3f}", f"{sensor_data.main.voltage:.3f}", f"{sensor_data.main.current:.3f}", f"{sensor_data.main.power:.3f}", f"{sensor_data.usb.voltage:.3f}", f"{sensor_data.usb.current:.3f}", f"{sensor_data.usb.power:.3f}" ] csv_writer.writerow(row) except websockets.exceptions.ConnectionClosed as e: print(f"WebSocket connection closed: {e}") except Exception as e: print(f"Error during WebSocket processing: {e}") finally: if csv_file: csv_file.close() print(f"\nCSV file '{self.output_file}' saved.") async def run(self): """Runs the logger.""" if self.login(): await self.listen_power_data() async def main(): parser = argparse.ArgumentParser(description="Odroid Smart Power Data Logger") parser.add_argument("host", help="Server's host address or IP (e.g., 192.168.1.10)") parser.add_argument("-u", "--username", required=True, help="Login username") parser.add_argument("-p", "--password", required=True, help="Login password") parser.add_argument("-o", "--output", help="Path to the output CSV file.") args = parser.parse_args() logger = OdroidPowerLogger(host=args.host, username=args.username, password=args.password, output_file=args.output) await logger.run() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\nExiting program.")