example: add support for csv

Signed-off-by: YoungSoo Shin <shinys000114@gmail.com>
This commit is contained in:
2025-11-19 16:15:53 +09:00
parent e3d98ed3dd
commit 4e6db88f7e

View File

@@ -1,11 +1,13 @@
import argparse
import asyncio
import csv
import requests
import websockets
from datetime import datetime
# Import the status_pb2.py file generated by `protoc`.
# This file must be in the same directory as logger.py.
import status_pb2
import websockets
from datetime import datetime
class OdroidPowerLogger:
@@ -16,12 +18,13 @@ class OdroidPowerLogger:
3. Receives and decodes binary data in Protobuf format, then prints it.
"""
def __init__(self, host, username, password):
def __init__(self, host, username, password, output_file=None):
self.host = host
self.username = username
self.password = password
self.base_url = f"http://{self.host}"
self.ws_url = f"ws://{self.host}/ws"
self.output_file = output_file
self.token = None
def login(self):
@@ -54,7 +57,33 @@ class OdroidPowerLogger:
# Add the authentication token as a query parameter
uri = f"{self.ws_url}?token={self.token}"
csv_file = None
csv_writer = None
try:
# --- CSV File Handling ---
if self.output_file:
try:
# Open the file in write mode, with newline='' to prevent extra blank rows
csv_file = open(self.output_file, 'w', newline='', encoding='utf-8')
csv_writer = csv.writer(csv_file)
# Write header
header = [
'timestamp', 'uptime_sec',
'vin_voltage', 'vin_current', 'vin_power',
'main_voltage', 'main_current', 'main_power',
'usb_voltage', 'usb_current', 'usb_power'
]
csv_writer.writerow(header)
print(f"Logging data to {self.output_file}")
except IOError as e:
print(f"Error opening CSV file: {e}")
# If file can't be opened, disable CSV writing
csv_file = None
csv_writer = None
# --- End CSV File Handling ---
async with websockets.connect(uri) as websocket:
print(f"Connected to WebSocket: {uri}")
while True:
@@ -68,9 +97,10 @@ class OdroidPowerLogger:
# Process only if the payload type is 'sensor_data'
if status_message.WhichOneof('payload') == 'sensor_data':
sensor_data = status_message.sensor_data
ts = datetime.fromtimestamp(sensor_data.timestamp).strftime('%Y-%m-%d %H:%M:%S')
ts_dt = datetime.fromtimestamp(sensor_data.timestamp)
ts_str = ts_dt.strftime('%Y-%m-%d %H:%M:%S')
print(f"--- {ts} (Uptime: {sensor_data.uptime_sec}s) ---")
print(f"--- {ts_str} (Uptime: {sensor_data.uptime_sec}s) ---")
# Print data for each channel
for name, channel in [('VIN', sensor_data.vin), ('MAIN', sensor_data.main),
@@ -78,10 +108,24 @@ class OdroidPowerLogger:
print(
f" {name:<4}: {channel.voltage:5.2f} V | {channel.current:5.3f} A | {channel.power:5.2f} W")
# Write to CSV if enabled
if csv_writer:
row = [
ts_dt.isoformat(), sensor_data.uptime_sec,
sensor_data.vin.voltage, sensor_data.vin.current, sensor_data.vin.power,
sensor_data.main.voltage, sensor_data.main.current, sensor_data.main.power,
sensor_data.usb.voltage, sensor_data.usb.current, sensor_data.usb.power
]
csv_writer.writerow(row)
except websockets.exceptions.ConnectionClosed as e:
print(f"WebSocket connection closed: {e}")
except Exception as e:
print(f"Error during WebSocket processing: {e}")
finally:
if csv_file:
csv_file.close()
print(f"\nCSV file '{self.output_file}' saved.")
async def run(self):
"""Runs the logger."""
@@ -94,9 +138,10 @@ async def main():
parser.add_argument("host", help="Server's host address or IP (e.g., 192.168.1.10)")
parser.add_argument("-u", "--username", required=True, help="Login username")
parser.add_argument("-p", "--password", required=True, help="Login password")
parser.add_argument("-o", "--output", help="Path to the output CSV file.")
args = parser.parse_args()
logger = OdroidPowerLogger(host=args.host, username=args.username, password=args.password)
logger = OdroidPowerLogger(host=args.host, username=args.username, password=args.password, output_file=args.output)
await logger.run()