From e3d98ed3dd5727c5c23efc7a9925a3ba376a4536 Mon Sep 17 00:00:00 2001 From: YoungSoo Shin Date: Wed, 19 Nov 2025 14:18:41 +0900 Subject: [PATCH] example: add example script Signed-off-by: YoungSoo Shin --- example/logger/README.md | 32 ++++++++++++ example/logger/logger.py | 107 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 example/logger/README.md create mode 100644 example/logger/logger.py diff --git a/example/logger/README.md b/example/logger/README.md new file mode 100644 index 0000000..576a9a6 --- /dev/null +++ b/example/logger/README.md @@ -0,0 +1,32 @@ +# Power Consumption Logger Example + +Based on this script, you can monitor power consumption and implement graph plotting. + +## How to Run the Script + +### Install Python Virtual Environment + +```shell +sudo apt install virtualenv +virtualenv venv +source venv/bin/activate +``` + +### Install require package + +```shell +pip install grpcio-tools requests websockets protobuf +``` + +### Build `status_pb2.py` + +```shell +python -m grpc_tools.protoc -I ../../proto --python_out=. status.proto +``` + +### Execute script + +```shell +# python3 logger.py -u -p
+python3 logger.py -u admin -p password 192.168.30.5 +``` \ No newline at end of file diff --git a/example/logger/logger.py b/example/logger/logger.py new file mode 100644 index 0000000..a2551ed --- /dev/null +++ b/example/logger/logger.py @@ -0,0 +1,107 @@ +import argparse +import asyncio +import requests +# Import the status_pb2.py file generated by `protoc`. +# This file must be in the same directory as logger.py. +import status_pb2 +import websockets +from datetime import datetime + + +class OdroidPowerLogger: + """ + A class to connect to the Odroid Smart Power monitoring server and log power data. + 1. Logs into the server via an HTTP POST request to obtain an authentication token. + 2. Connects to the WebSocket using the obtained token. + 3. Receives and decodes binary data in Protobuf format, then prints it. + """ + + def __init__(self, host, username, password): + self.host = host + self.username = username + self.password = password + self.base_url = f"http://{self.host}" + self.ws_url = f"ws://{self.host}/ws" + self.token = None + + def login(self): + """Logs into the server to retrieve an authentication token.""" + login_url = f"{self.base_url}/login" + payload = {"username": self.username, "password": self.password} + try: + print(f"Attempting to log in to '{login_url}'...") + response = requests.post(login_url, json=payload, timeout=5) + response.raise_for_status() + + response_json = response.json() + if "token" in response_json: + self.token = response_json["token"] + print("Login successful! Token received.") + return True + else: + print("Login failed: No token in response.") + return False + except requests.exceptions.RequestException as e: + print(f"Error during login: {e}") + return False + + async def listen_power_data(self): + """Connects to the WebSocket to receive and log power data.""" + if not self.token: + print("Cannot connect to WebSocket without an authentication token.") + return + + # Add the authentication token as a query parameter + uri = f"{self.ws_url}?token={self.token}" + + try: + async with websockets.connect(uri) as websocket: + print(f"Connected to WebSocket: {uri}") + while True: + # Receive binary message from the server + message_bytes = await websocket.recv() + + # Decode the Protobuf message + status_message = status_pb2.StatusMessage() + status_message.ParseFromString(message_bytes) + + # Process only if the payload type is 'sensor_data' + if status_message.WhichOneof('payload') == 'sensor_data': + sensor_data = status_message.sensor_data + ts = datetime.fromtimestamp(sensor_data.timestamp).strftime('%Y-%m-%d %H:%M:%S') + + print(f"--- {ts} (Uptime: {sensor_data.uptime_sec}s) ---") + + # Print data for each channel + for name, channel in [('VIN', sensor_data.vin), ('MAIN', sensor_data.main), + ('USB', sensor_data.usb)]: + print( + f" {name:<4}: {channel.voltage:5.2f} V | {channel.current:5.3f} A | {channel.power:5.2f} W") + + except websockets.exceptions.ConnectionClosed as e: + print(f"WebSocket connection closed: {e}") + except Exception as e: + print(f"Error during WebSocket processing: {e}") + + async def run(self): + """Runs the logger.""" + if self.login(): + await self.listen_power_data() + + +async def main(): + parser = argparse.ArgumentParser(description="Odroid Smart Power Data Logger") + parser.add_argument("host", help="Server's host address or IP (e.g., 192.168.1.10)") + parser.add_argument("-u", "--username", required=True, help="Login username") + parser.add_argument("-p", "--password", required=True, help="Login password") + args = parser.parse_args() + + logger = OdroidPowerLogger(host=args.host, username=args.username, password=args.password) + await logger.run() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nExiting program.")