csv_2_plot: add support for data filtering with multiple filter types and window size customization

Signed-off-by: YoungSoo Shin <shinys000114@gmail.com>
This commit is contained in:
2025-12-11 18:04:40 +09:00
parent 09b1ca3089
commit cc8f1b77ed

View File

@@ -7,11 +7,14 @@ import pandas as pd
from dateutil.tz import gettz
from matplotlib.ticker import MultipleLocator, FuncFormatter
import math
from scipy.signal import savgol_filter
from scipy.ndimage import gaussian_filter1d
def plot_power_data(csv_path, output_path, plot_types, sources,
voltage_y_max=None, current_y_max=None, power_y_max=None,
relative_time=False, time_x_line=None, time_x_label=None):
relative_time=False, time_x_line=None, time_x_label=None,
filter_type=None, window_size=None):
"""
Reads power data from a CSV file and generates a plot image.
@@ -26,6 +29,8 @@ def plot_power_data(csv_path, output_path, plot_types, sources,
relative_time (bool): If True, the x-axis will show elapsed time from the start.
time_x_line (float, optional): Interval in seconds for x-axis grid lines.
time_x_label (float, optional): Interval in seconds for x-axis labels.
filter_type (str, optional): The type of filter to apply.
window_size (int, optional): The window size for the filter in seconds.
"""
try:
# Read the CSV file into a pandas DataFrame
@@ -57,10 +62,17 @@ def plot_power_data(csv_path, output_path, plot_types, sources,
return
# --- Calculate Average Interval ---
avg_interval_ms = 0
avg_interval_s = 0
if len(df) > 1:
avg_interval = df['timestamp'].diff().mean()
avg_interval_ms = avg_interval.total_seconds() * 1000
avg_interval_s = avg_interval.total_seconds()
avg_interval_ms = avg_interval_s * 1000
# --- Auto-set window size if not provided ---
if filter_type and window_size is None and avg_interval_s > 0:
window_size = avg_interval_s * 15 # Default to 15x the average interval
print(f"Window size not specified. Automatically set to {window_size:.2f} seconds.")
# --- Calculate Average Voltages ---
avg_voltages = {}
@@ -105,10 +117,46 @@ def plot_power_data(csv_path, output_path, plot_types, sources,
max_data_value = 0
for j, col_name in enumerate(config['cols']):
if col_name in df.columns:
ax.plot(x_axis_data, df[col_name], label=channel_labels[j], color=channel_colors[j], zorder=2)
# Plot original data
ax.plot(x_axis_data, df[col_name], label=f'{channel_labels[j]} (Raw)', color=channel_colors[j], alpha=0.5, zorder=2)
max_col_value = df[col_name].max()
if max_col_value > max_data_value:
max_data_value = max_col_value
# --- Apply and plot filtered data ---
if filter_type and window_size:
# Calculate window size in samples
if avg_interval_s > 0:
window_samples = int(window_size / avg_interval_s)
if window_samples < 1:
window_samples = 1
filtered_data = None
filter_label = ""
if filter_type == 'savgol':
# Window size for savgol must be odd
if window_samples % 2 == 0:
window_samples += 1
if window_samples > 2: # Polyorder must be less than window_length
filtered_data = savgol_filter(df[col_name], window_samples, 2) # polynomial order 2
filter_label = "Savitzky-Golay"
elif filter_type == 'moving_average':
filtered_data = df[col_name].rolling(window=window_samples, center=True).mean()
filter_label = "Moving Average"
elif filter_type == 'ema':
filtered_data = df[col_name].ewm(span=window_samples, adjust=False).mean()
filter_label = "Exponential Moving Average"
elif filter_type == 'gaussian':
# Sigma is a fraction of the window size
sigma = window_samples / 4.0
filtered_data = gaussian_filter1d(df[col_name], sigma=sigma)
filter_label = "Gaussian"
if filtered_data is not None:
ax.plot(x_axis_data, filtered_data, label=f'{channel_labels[j]} ({filter_label})', color=channel_colors[j], linestyle='-', linewidth=1.5, zorder=3)
else:
print(f"Warning: Column '{col_name}' not found in CSV. Skipping.")
@@ -256,6 +304,16 @@ def main():
)
parser.add_argument("--time_x_line", type=float, help="Interval in seconds for x-axis grid lines.")
parser.add_argument("--time_x_label", type=float, help="Interval in seconds for x-axis labels.")
parser.add_argument(
'--filter',
choices=['savgol', 'moving_average', 'ema', 'gaussian'],
help='The type of filter to apply to the data.'
)
parser.add_argument(
'--window_size',
type=float,
help='The window size for the filter in seconds.'
)
args = parser.parse_args()
@@ -270,7 +328,9 @@ def main():
power_y_max=args.power_y_max,
relative_time=args.relative_time,
time_x_line=args.time_x_line,
time_x_label=args.time_x_label
time_x_label=args.time_x_label,
filter_type=args.filter,
window_size=args.window_size
)