From cc8f1b77ed8512f354d42b6f356c36b2d316ad02 Mon Sep 17 00:00:00 2001 From: YoungSoo Shin Date: Thu, 11 Dec 2025 18:04:40 +0900 Subject: [PATCH] csv_2_plot: add support for data filtering with multiple filter types and window size customization Signed-off-by: YoungSoo Shin --- example/logger/csv_2_plot.py | 70 +++++++++++++++++++++++++++++++++--- 1 file changed, 65 insertions(+), 5 deletions(-) diff --git a/example/logger/csv_2_plot.py b/example/logger/csv_2_plot.py index 16e3b78..cb22db3 100644 --- a/example/logger/csv_2_plot.py +++ b/example/logger/csv_2_plot.py @@ -7,11 +7,14 @@ import pandas as pd from dateutil.tz import gettz from matplotlib.ticker import MultipleLocator, FuncFormatter import math +from scipy.signal import savgol_filter +from scipy.ndimage import gaussian_filter1d def plot_power_data(csv_path, output_path, plot_types, sources, voltage_y_max=None, current_y_max=None, power_y_max=None, - relative_time=False, time_x_line=None, time_x_label=None): + relative_time=False, time_x_line=None, time_x_label=None, + filter_type=None, window_size=None): """ Reads power data from a CSV file and generates a plot image. @@ -26,6 +29,8 @@ def plot_power_data(csv_path, output_path, plot_types, sources, relative_time (bool): If True, the x-axis will show elapsed time from the start. time_x_line (float, optional): Interval in seconds for x-axis grid lines. time_x_label (float, optional): Interval in seconds for x-axis labels. + filter_type (str, optional): The type of filter to apply. + window_size (int, optional): The window size for the filter in seconds. """ try: # Read the CSV file into a pandas DataFrame @@ -57,10 +62,17 @@ def plot_power_data(csv_path, output_path, plot_types, sources, return # --- Calculate Average Interval --- - avg_interval_ms = 0 + avg_interval_s = 0 if len(df) > 1: avg_interval = df['timestamp'].diff().mean() - avg_interval_ms = avg_interval.total_seconds() * 1000 + avg_interval_s = avg_interval.total_seconds() + avg_interval_ms = avg_interval_s * 1000 + + # --- Auto-set window size if not provided --- + if filter_type and window_size is None and avg_interval_s > 0: + window_size = avg_interval_s * 15 # Default to 15x the average interval + print(f"Window size not specified. Automatically set to {window_size:.2f} seconds.") + # --- Calculate Average Voltages --- avg_voltages = {} @@ -105,10 +117,46 @@ def plot_power_data(csv_path, output_path, plot_types, sources, max_data_value = 0 for j, col_name in enumerate(config['cols']): if col_name in df.columns: - ax.plot(x_axis_data, df[col_name], label=channel_labels[j], color=channel_colors[j], zorder=2) + # Plot original data + ax.plot(x_axis_data, df[col_name], label=f'{channel_labels[j]} (Raw)', color=channel_colors[j], alpha=0.5, zorder=2) max_col_value = df[col_name].max() if max_col_value > max_data_value: max_data_value = max_col_value + + # --- Apply and plot filtered data --- + if filter_type and window_size: + # Calculate window size in samples + if avg_interval_s > 0: + window_samples = int(window_size / avg_interval_s) + if window_samples < 1: + window_samples = 1 + + filtered_data = None + filter_label = "" + + if filter_type == 'savgol': + # Window size for savgol must be odd + if window_samples % 2 == 0: + window_samples += 1 + if window_samples > 2: # Polyorder must be less than window_length + filtered_data = savgol_filter(df[col_name], window_samples, 2) # polynomial order 2 + filter_label = "Savitzky-Golay" + elif filter_type == 'moving_average': + filtered_data = df[col_name].rolling(window=window_samples, center=True).mean() + filter_label = "Moving Average" + elif filter_type == 'ema': + filtered_data = df[col_name].ewm(span=window_samples, adjust=False).mean() + filter_label = "Exponential Moving Average" + elif filter_type == 'gaussian': + # Sigma is a fraction of the window size + sigma = window_samples / 4.0 + filtered_data = gaussian_filter1d(df[col_name], sigma=sigma) + filter_label = "Gaussian" + + + if filtered_data is not None: + ax.plot(x_axis_data, filtered_data, label=f'{channel_labels[j]} ({filter_label})', color=channel_colors[j], linestyle='-', linewidth=1.5, zorder=3) + else: print(f"Warning: Column '{col_name}' not found in CSV. Skipping.") @@ -256,6 +304,16 @@ def main(): ) parser.add_argument("--time_x_line", type=float, help="Interval in seconds for x-axis grid lines.") parser.add_argument("--time_x_label", type=float, help="Interval in seconds for x-axis labels.") + parser.add_argument( + '--filter', + choices=['savgol', 'moving_average', 'ema', 'gaussian'], + help='The type of filter to apply to the data.' + ) + parser.add_argument( + '--window_size', + type=float, + help='The window size for the filter in seconds.' + ) args = parser.parse_args() @@ -270,7 +328,9 @@ def main(): power_y_max=args.power_y_max, relative_time=args.relative_time, time_x_line=args.time_x_line, - time_x_label=args.time_x_label + time_x_label=args.time_x_label, + filter_type=args.filter, + window_size=args.window_size )