import argparse
import asyncio
from datetime import datetime
from urllib.parse import urlencode

import requests
import websockets

# Import the status_pb2.py file generated by `protoc`.
# This file must be in the same directory as logger.py.
import status_pb2


class OdroidPowerLogger:
    """Connect to the Odroid Smart Power monitoring server and log power data.

    Workflow:
      1. Log into the server via an HTTP POST request to obtain an
         authentication token.
      2. Connect to the WebSocket endpoint using the obtained token.
      3. Receive binary Protobuf messages, decode them, and print the
         sensor readings.
    """

    def __init__(self, host, username, password):
        """Store the connection settings and derive the HTTP/WebSocket URLs.

        Args:
            host: Server host name or IP address (optionally with a port).
            username: Login user name sent to the ``/login`` endpoint.
            password: Login password sent to the ``/login`` endpoint.
        """
        self.host = host
        self.username = username
        self.password = password
        self.base_url = f"http://{self.host}"
        self.ws_url = f"ws://{self.host}/ws"
        # Populated by login(); stays None until a login succeeds.
        self.token = None

    def login(self):
        """Log into the server and store the returned authentication token.

        Returns:
            True if a non-empty token was received and stored in
            ``self.token``; False on any HTTP/network error, an undecodable
            response body, or a response without a usable token.
        """
        login_url = f"{self.base_url}/login"
        payload = {"username": self.username, "password": self.password}

        try:
            print(f"Attempting to log in to '{login_url}'...")
            response = requests.post(login_url, json=payload, timeout=5)
            response.raise_for_status()
            response_json = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on `requests` releases whose
            # JSON decode error does not derive from RequestException.
            print(f"Error during login: {e}")
            return False

        # The body may legally be any JSON value (e.g. null or a list);
        # only an object can carry the token.
        token = response_json.get("token") if isinstance(response_json, dict) else None
        if not token:
            # A missing or empty token is unusable: listen_power_data()
            # refuses to connect without one.
            print("Login failed: No token in response.")
            return False

        self.token = token
        print("Login successful! Token received.")
        return True

    @staticmethod
    def _format_sensor_data(sensor_data):
        """Return the printable lines for one decoded ``sensor_data`` payload."""
        # fromtimestamp() renders in the local timezone.
        # NOTE(review): assumes `timestamp` is Unix epoch seconds -- confirm
        # against the .proto definition.
        ts = datetime.fromtimestamp(sensor_data.timestamp).strftime('%Y-%m-%d %H:%M:%S')
        lines = [f"--- {ts} (Uptime: {sensor_data.uptime_sec}s) ---"]
        channels = (
            ('VIN', sensor_data.vin),
            ('MAIN', sensor_data.main),
            ('USB', sensor_data.usb),
        )
        for name, channel in channels:
            lines.append(
                f" {name:<4}: {channel.voltage:5.2f} V | {channel.current:5.3f} A | {channel.power:5.2f} W"
            )
        return lines

    async def listen_power_data(self):
        """Connect to the WebSocket and print sensor data until it closes.

        Requires a prior successful login(). Messages whose ``payload`` oneof
        is not ``sensor_data`` are ignored. Connection closure and any other
        error are reported on stdout and end the loop; nothing is raised to
        the caller.
        """
        if not self.token:
            print("Cannot connect to WebSocket without an authentication token.")
            return

        # Add the authentication token as a query parameter, percent-encoded
        # so characters such as '+', '&' or '=' cannot corrupt the query.
        uri = f"{self.ws_url}?{urlencode({'token': self.token})}"

        try:
            async with websockets.connect(uri) as websocket:
                # Log the endpoint only: the full URI embeds the secret token.
                print(f"Connected to WebSocket: {self.ws_url}")
                while True:
                    # Receive binary message from the server
                    message_bytes = await websocket.recv()

                    # Decode the Protobuf message
                    status_message = status_pb2.StatusMessage()
                    status_message.ParseFromString(message_bytes)

                    # Process only if the payload type is 'sensor_data'
                    if status_message.WhichOneof('payload') != 'sensor_data':
                        continue

                    for line in self._format_sensor_data(status_message.sensor_data):
                        print(line)
        except websockets.exceptions.ConnectionClosed as e:
            print(f"WebSocket connection closed: {e}")
        except Exception as e:
            # Top-level boundary for the listener: report and stop rather
            # than crash the program with a traceback.
            print(f"Error during WebSocket processing: {e}")

    async def run(self):
        """Run the logger: log in, then stream power data if login succeeded."""
        # login() is a blocking HTTP call; it runs before any other
        # coroutine is started here.
        if self.login():
            await self.listen_power_data()


async def main():
    """Parse command-line arguments and run the power logger."""
    parser = argparse.ArgumentParser(description="Odroid Smart Power Data Logger")
    parser.add_argument("host", help="Server's host address or IP (e.g., 192.168.1.10)")
    parser.add_argument("-u", "--username", required=True, help="Login username")
    parser.add_argument("-p", "--password", required=True, help="Login password")
    args = parser.parse_args()

    logger = OdroidPowerLogger(host=args.host, username=args.username, password=args.password)
    await logger.run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nExiting program.")