The Sampling Rate Epiphany & Physical Motivation
Before writing the extraction code, I analyzed the data’s temporal structure. By inspecting the Milliseconds column, I determined that the sensor data was logged at 5ms intervals (a 200Hz sampling rate).
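That check can be sketched in a few lines (a toy frame stands in for the real sensor log; only the Milliseconds column name comes from the text):

```python
import pandas as pd

# Toy frame standing in for the sensor log: a timestamp every 5 ms.
df = pd.DataFrame({'Milliseconds': range(0, 5000, 5)})

# Median gap between consecutive timestamps, in milliseconds.
interval_ms = df['Milliseconds'].diff().median()
sampling_rate_hz = 1000 / interval_ms

print(interval_ms, sampling_rate_hz)  # 5.0 200.0
```

Using the median rather than the mean makes the estimate robust to an occasional dropped sample.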
I therefore set WINDOW_SIZE to 200 rows (200 × 5 ms = 1 s).

Final Code Analysis
1. Window Slicing and Label Aggregation
Python
for start in range(0, n - WINDOW_SIZE + 1, STEP_SIZE):
    window = df.iloc[start : start + WINDOW_SIZE]
    label = int(window['LABEL'].mode()[0])
I slide the 200-row window across each partition of the dataset. Since LABEL acts as the ground-truth action identifier, I use mode()[0] (a majority vote) to assign a single class label to the entire 1-second window.
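The loop behaves as a sketch like this shows (the STEP_SIZE value of 100, i.e. 50% overlap, is an assumption; the actual stride is not stated above):

```python
import pandas as pd

WINDOW_SIZE = 200  # 200 rows x 5 ms = 1 second
STEP_SIZE = 100    # assumed stride: 50% window overlap

# Toy partition: 400 rows, first half labelled 0, second half labelled 1.
df = pd.DataFrame({'LABEL': [0] * 200 + [1] * 200})
n = len(df)

labels = []
for start in range(0, n - WINDOW_SIZE + 1, STEP_SIZE):
    window = df.iloc[start : start + WINDOW_SIZE]
    # Majority vote: the most frequent LABEL in the window wins.
    labels.append(int(window['LABEL'].mode()[0]))

print(labels)  # [0, 0, 1]
```

The middle window straddles the action boundary (100 zeros, 100 ones); with a tie, mode() returns both values sorted, so the window is assigned label 0.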
2. Statistical Feature Extraction
Python
feats[f'{col}_mean'] = np.mean(vals)
feats[f'{col}_std'] = np.std(vals)
This is the core of the feature engineering process. Instead of feeding a model 200 separate points per second, I feed it a compact summary. A high standard deviation on R_Wrist_Rx, for example, tells the model that the operator's right wrist was twisting rapidly during that second.
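A minimal sketch of this summarization over a single window (the synthetic sine signal is illustrative; only the R_Wrist_Rx column name comes from the text):

```python
import numpy as np
import pandas as pd

# Toy 1-second window: one sensor channel oscillating over 200 samples.
window = pd.DataFrame({'R_Wrist_Rx': np.sin(np.linspace(0, 4 * np.pi, 200))})

feats = {}
for col in window.columns:
    vals = window[col].to_numpy()
    feats[f'{col}_mean'] = np.mean(vals)  # central tendency of the channel
    feats[f'{col}_std'] = np.std(vals)    # how much it varied (e.g. wrist twisting)

print(feats)
```

The oscillating channel averages out to roughly zero mean but keeps a large standard deviation, which is exactly the signature of rapid movement described above.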
3. Distributed Meta-Schema Definition
Python
meta_cols = {'LABEL': 'int8'}
for col in sample_cols:
    meta_cols[f'{col}_mean'] = 'float32'
    # ...
meta = pd.DataFrame({k: pd.Series(dtype=v) for k, v in meta_cols.items()})
One of the quirks of Dask's lazy evaluation is that it cannot reliably infer the output schema of a complex custom function (such as the window slider above). By pre-defining the meta DataFrame structure with memory-efficient float32 types, I prevent schema-mismatch errors during the .compute() phase.
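Built out in full, the schema declaration looks like this (the column names in sample_cols are illustrative stand-ins for the real sensor channels):

```python
import pandas as pd

# Illustrative sensor channels; the real pipeline derives these from the data.
sample_cols = ['R_Wrist_Rx', 'R_Wrist_Ry']

# Pre-declare the output schema so Dask never has to guess it.
meta_cols = {'LABEL': 'int8'}
for col in sample_cols:
    meta_cols[f'{col}_mean'] = 'float32'
    meta_cols[f'{col}_std'] = 'float32'

# An empty DataFrame whose columns and dtypes mirror the real output.
meta = pd.DataFrame({k: pd.Series(dtype=v) for k, v in meta_cols.items()})
print(meta.dtypes)
```

The resulting frame has zero rows; only its column names and dtypes matter to Dask.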
4. Dask map_partitions Execution
Python
features_df = pq_df.map_partitions(extract_window_features, meta=meta)
features_df.to_parquet('data/main_data_features_parquet', ...)