# Source file: odroid-power-mate/example/logger/logger.py (Python; listed as 157 lines, 6.5 KiB)

import argparse
import asyncio
import csv
from datetime import datetime, timezone
from urllib.parse import quote

import requests
import websockets
import websockets.asyncio
import websockets.asyncio.client
import websockets.exceptions

# Import the status_pb2.py file generated by `protoc`.
# This file must be in the same directory as logger.py.
import status_pb2
class OdroidPowerLogger:
    """
    A class to connect to the Odroid Smart Power monitoring server and log power data.

    1. Logs into the server via an HTTP POST request to obtain an authentication token.
    2. Connects to the WebSocket, passing the token as a query parameter.
    3. Receives and decodes binary data in Protobuf format, prints the sensor
       readings and, when an output file was given, writes them to a CSV file.
    """

    # Column names of the CSV output, in the order the row values are written.
    _CSV_HEADER = (
        'timestamp', 'uptime_ms',
        'vin_voltage', 'vin_current', 'vin_power',
        'main_voltage', 'main_current', 'main_power',
        'usb_voltage', 'usb_current', 'usb_power',
    )

    def __init__(self, host, username, password, output_file=None):
        """Store the connection settings; no network or file I/O happens here.

        Args:
            host: Host (optionally ``host:port``) used to build the HTTP and
                WebSocket URLs.
            username: Login username sent to the ``/login`` endpoint.
            password: Login password sent to the ``/login`` endpoint.
            output_file: Optional path of the CSV file to write; when falsy,
                readings are only printed.
        """
        self.host = host
        self.username = username
        self.password = password
        self.base_url = f"http://{self.host}"
        self.ws_url = f"ws://{self.host}/ws"
        self.output_file = output_file
        self.token = None

    def login(self):
        """Log into the server and store the authentication token.

        Returns:
            bool: True when a non-empty token was received (it is stored in
            ``self.token``); False on a request error, a non-JSON response,
            or a response without a token.
        """
        login_url = f"{self.base_url}/login"
        payload = {"username": self.username, "password": self.password}
        print(f"Attempting to log in to '{login_url}'...")
        try:
            response = requests.post(login_url, json=payload, timeout=5)
            response.raise_for_status()
            response_json = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers `requests` releases whose JSON decode error is
            # not a RequestException subclass.
            print(f"Error during login: {e}")
            return False

        # A valid JSON body is not necessarily an object (it may be a list,
        # a string or null), so check the type before looking up the token.
        token = response_json.get("token") if isinstance(response_json, dict) else None
        if not token:
            print("Login failed: No token in response.")
            return False

        self.token = token
        print("Login successful! Token received.")
        return True

    def _build_ws_uri(self):
        """Return the WebSocket URI carrying the token as a query parameter."""
        # Percent-encode the token so characters such as '+', '/', '=' or '&'
        # cannot be misread as query-string syntax by the server.
        return f"{self.ws_url}?token={quote(str(self.token), safe='')}"

    def _open_csv(self):
        """Open the CSV output file and write the header row.

        Returns:
            tuple: ``(file, writer)`` on success, or ``(None, None)`` when no
            output file was configured or it could not be opened/written
            (CSV logging is then disabled and only printing remains).
        """
        if not self.output_file:
            return None, None

        csv_file = None
        try:
            # newline='' prevents the csv module from emitting extra blank rows.
            csv_file = open(self.output_file, 'w', newline='', encoding='utf-8')
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(self._CSV_HEADER)
        except OSError as e:
            print(f"Error opening CSV file: {e}")
            # The failure may have happened after open() succeeded (while
            # writing the header); close the handle instead of leaking it.
            if csv_file is not None:
                try:
                    csv_file.close()
                except OSError:
                    pass  # Nothing more can be done for an unusable file.
            return None, None

        print(f"Logging data to {self.output_file}")
        return csv_file, csv_writer

    def _log_sensor_data(self, sensor_data, csv_writer):
        """Print one sensor reading and append it to the CSV file if enabled.

        Args:
            sensor_data: The ``sensor_data`` payload of a decoded status
                message; read for ``timestamp_ms``, ``uptime_ms`` and the
                ``vin``/``main``/``usb`` channels.
            csv_writer: A ``csv.writer``-like object, or None to skip CSV output.
        """
        ts_dt = datetime.fromtimestamp(sensor_data.timestamp_ms / 1000, tz=timezone.utc)
        ts_str_print = ts_dt.strftime('%Y-%m-%d %H:%M:%S UTC')
        print(f"--- {ts_str_print} (Uptime: {sensor_data.uptime_ms / 1000}s) ---")

        channels = (
            ('VIN', sensor_data.vin),
            ('MAIN', sensor_data.main),
            ('USB', sensor_data.usb),
        )
        for name, channel in channels:
            print(
                f" {name:<4}: {channel.voltage:5.2f} V | {channel.current:5.3f} A | {channel.power:5.2f} W")

        if csv_writer is None:
            return

        # ISO-8601 timestamp with millisecond precision and a 'Z' suffix for UTC.
        ts_iso_csv = ts_dt.isoformat(timespec='milliseconds').replace('+00:00', 'Z')
        row = [ts_iso_csv, sensor_data.uptime_ms]
        # Same channel order as _CSV_HEADER: vin, main, usb.
        for _, channel in channels:
            row.extend(
                f"{value:.3f}"
                for value in (channel.voltage, channel.current, channel.power)
            )
        csv_writer.writerow(row)

    async def listen_power_data(self):
        """Connect to the WebSocket and log power data until the stream ends.

        Returns immediately (after printing a message) when no token is
        available. Connection-closed and other processing errors are printed
        rather than raised; the CSV file, if one was opened, is always closed.
        """
        if not self.token:
            print("Cannot connect to WebSocket without an authentication token.")
            return

        uri = self._build_ws_uri()
        csv_file = None
        try:
            csv_file, csv_writer = self._open_csv()
            async with websockets.connect(uri) as websocket:
                # Print only the endpoint: the full URI embeds the secret token.
                print(f"Connected to WebSocket: {self.ws_url}")
                while True:
                    # Receive a binary message from the server and decode it.
                    message_bytes = await websocket.recv()
                    status_message = status_pb2.StatusMessage()
                    status_message.ParseFromString(message_bytes)
                    # Only 'sensor_data' payloads are logged; others are ignored.
                    if status_message.WhichOneof('payload') != 'sensor_data':
                        continue
                    self._log_sensor_data(status_message.sensor_data, csv_writer)
        except websockets.exceptions.ConnectionClosed as e:
            print(f"WebSocket connection closed: {e}")
        except Exception as e:
            # Top-level boundary of the logging session: report and stop.
            print(f"Error during WebSocket processing: {e}")
        finally:
            if csv_file is not None:
                csv_file.close()
                print(f"\nCSV file '{self.output_file}' saved.")

    async def run(self):
        """Run the logger: log in, then stream power data if login succeeded."""
        if self.login():
            await self.listen_power_data()
async def main():
    """Parse the command-line arguments and run the logger with them."""
    cli = argparse.ArgumentParser(description="Odroid Smart Power Data Logger")
    cli.add_argument("host", help="Server's host address or IP (e.g., 192.168.1.10)")
    # Both credentials are mandatory options; register them in one pass.
    for flags, help_text in (
        (("-u", "--username"), "Login username"),
        (("-p", "--password"), "Login password"),
    ):
        cli.add_argument(*flags, required=True, help=help_text)
    cli.add_argument("-o", "--output", help="Path to the output CSV file.")
    options = cli.parse_args()

    power_logger = OdroidPowerLogger(
        host=options.host,
        username=options.username,
        password=options.password,
        output_file=options.output,
    )
    await power_logger.run()
if __name__ == "__main__":
    # Script entry point: drive the async main() to completion. Ctrl+C is the
    # expected way to stop logging, so report it instead of showing a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nExiting program.")