add example script

Signed-off-by: YoungSoo Shin <shinys000114@gmail.com>
This commit is contained in:
2025-11-19 14:18:41 +09:00
parent 535bd9388e
commit 6493cde019
2 changed files with 139 additions and 0 deletions

32
example/logger/README.md Normal file
View File

@@ -0,0 +1,32 @@
# Power Consumption Logger Example
Based on this script, you can monitor power consumption and implement graph plotting.
## How to Run the Script
### Install Python Virtual Environment
```shell
sudo apt install virtualenv
virtualenv venv
. venv/bin/activate
```
### Install require package
```shell
pip install grpcio-tools requests websockets protobuf
```
### `status_pb2.py` 빌드
```shell
python -m grpc_tools.protoc -I ../../proto --python_out=. status.proto
```
### Execute script
```shell
# python3 logger.py -u <username> -p <password> <address>
python3 logger.py -u admin -p password 192.168.30.5
```

107
example/logger/logger.py Normal file
View File

@@ -0,0 +1,107 @@
import argparse
import asyncio
import requests
# Import the status_pb2.py file generated by `protoc`.
# This file must be in the same directory as logger.py.
import status_pb2
import websockets
from datetime import datetime
class OdroidPowerLogger:
"""
A class to connect to the Odroid Smart Power monitoring server and log power data.
1. Logs into the server via an HTTP POST request to obtain an authentication token.
2. Connects to the WebSocket using the obtained token.
3. Receives and decodes binary data in Protobuf format, then prints it.
"""
def __init__(self, host, username, password):
self.host = host
self.username = username
self.password = password
self.base_url = f"http://{self.host}"
self.ws_url = f"ws://{self.host}/ws"
self.token = None
def login(self):
"""Logs into the server to retrieve an authentication token."""
login_url = f"{self.base_url}/login"
payload = {"username": self.username, "password": self.password}
try:
print(f"Attempting to log in to '{login_url}'...")
response = requests.post(login_url, json=payload, timeout=5)
response.raise_for_status()
response_json = response.json()
if "token" in response_json:
self.token = response_json["token"]
print("Login successful! Token received.")
return True
else:
print("Login failed: No token in response.")
return False
except requests.exceptions.RequestException as e:
print(f"Error during login: {e}")
return False
async def listen_power_data(self):
"""Connects to the WebSocket to receive and log power data."""
if not self.token:
print("Cannot connect to WebSocket without an authentication token.")
return
# Add the authentication token as a query parameter
uri = f"{self.ws_url}?token={self.token}"
try:
async with websockets.connect(uri) as websocket:
print(f"Connected to WebSocket: {uri}")
while True:
# Receive binary message from the server
message_bytes = await websocket.recv()
# Decode the Protobuf message
status_message = status_pb2.StatusMessage()
status_message.ParseFromString(message_bytes)
# Process only if the payload type is 'sensor_data'
if status_message.WhichOneof('payload') == 'sensor_data':
sensor_data = status_message.sensor_data
ts = datetime.fromtimestamp(sensor_data.timestamp).strftime('%Y-%m-%d %H:%M:%S')
print(f"--- {ts} (Uptime: {sensor_data.uptime_sec}s) ---")
# Print data for each channel
for name, channel in [('VIN', sensor_data.vin), ('MAIN', sensor_data.main),
('USB', sensor_data.usb)]:
print(
f" {name:<4}: {channel.voltage:5.2f} V | {channel.current:5.3f} A | {channel.power:5.2f} W")
except websockets.exceptions.ConnectionClosed as e:
print(f"WebSocket connection closed: {e}")
except Exception as e:
print(f"Error during WebSocket processing: {e}")
async def run(self):
"""Runs the logger."""
if self.login():
await self.listen_power_data()
async def main():
parser = argparse.ArgumentParser(description="Odroid Smart Power Data Logger")
parser.add_argument("host", help="Server's host address or IP (e.g., 192.168.1.10)")
parser.add_argument("-u", "--username", required=True, help="Login username")
parser.add_argument("-p", "--password", required=True, help="Login password")
args = parser.parse_args()
logger = OdroidPowerLogger(host=args.host, username=args.username, password=args.password)
await logger.run()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nExiting program.")