In this phase, the primary challenge was translating raw, high-frequency sensor streams into a discrete matrix of meaningful features that machine learning algorithms (like Random Forest or K-Means) could process.

The Sampling Rate Epiphany & Physical Motivation

Before writing the extraction code, I analyzed the data’s temporal structure. By inspecting the Milliseconds column, I determined that the sensor data was logged at 5 ms intervals (a 200 Hz sampling rate).
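
As a sanity check on that claim, here is a minimal sketch of the interval inspection. The parquet path is hypothetical; Milliseconds is the column named above:

```python
import pandas as pd

# Load the raw sensor log (illustrative path, not the project's actual file name).
df = pd.read_parquet('data/main_data.parquet')

# The modal gap between consecutive timestamps should be 5 ms, i.e. 200 Hz.
interval_ms = df['Milliseconds'].diff().mode()[0]
print(f'{interval_ms} ms between samples -> {1000 / interval_ms:.0f} Hz')
```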

Final Code Analysis

1. Window Slicing and Label Aggregation

```python
for start in range(0, n - WINDOW_SIZE + 1, STEP_SIZE):
    window = df.iloc[start : start + WINDOW_SIZE]
    label = int(window['LABEL'].mode()[0])
```

I slide the 200-row window across each partition of the dataset. Since LABEL acts as the ground-truth action identifier, I use mode()[0] (a majority vote) to assign a single class label to the entire 1-second window; this keeps windows that straddle an action boundary from receiving ambiguous labels.
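
A toy illustration of how that majority vote behaves on a boundary-straddling window (the label values here are made up):

```python
import pandas as pd

# 160 rows of action 3 followed by 40 rows of action 7 in one 200-row window.
window_labels = pd.Series([3] * 160 + [7] * 40)

# mode() returns the most frequent value(s); [0] picks the winner.
print(int(window_labels.mode()[0]))  # -> 3
```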

2. Statistical Feature Extraction

```python
# Four summary statistics per sensor column for the current window.
feats[f'{col}_mean'] = np.mean(vals)
feats[f'{col}_std'] = np.std(vals)
feats[f'{col}_min'] = np.min(vals)
feats[f'{col}_max'] = np.max(vals)
```

This is the core of the feature engineering process. Instead of feeding a model 200 separate points per second, I feed it a compact summary. A high standard deviation on R_Wrist_Rx, for example, tells the model that the operator's right wrist was twisting rapidly during that second.
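
To make that intuition concrete, here is a synthetic comparison; the signal shapes and magnitudes are invented purely for illustration:

```python
import numpy as np

t = np.linspace(0, 1, 200)                   # one 1-second window at 200 Hz
twisting = 30 * np.sin(2 * np.pi * 5 * t)    # wrist rotating back and forth
static = np.full(200, 12.0)                  # wrist held at a fixed angle

print(np.std(twisting))  # ~21.2 -> high std flags rapid movement
print(np.std(static))    # 0.0  -> flat signal, no movement
```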

3. Distributed Meta-Schema Definition

```python
meta_cols = {'LABEL': 'int8'}
for col in sample_cols:
    meta_cols[f'{col}_mean'] = 'float32'
    # ... (std, min, max entries follow the same pattern)
meta = pd.DataFrame({k: pd.Series(dtype=v) for k, v in meta_cols.items()})
```

One of the quirks of Dask's lazy evaluation is that it cannot reliably infer the output schema of a complex custom function (like the window slider above). By pre-defining the meta DataFrame structure with memory-efficient float32 types, I prevent schema-mismatch errors during the .compute() phase.
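
A self-contained sketch of how the full meta construction plays out. The sample_cols list here is an invented two-column subset; the real dataset has many more sensor channels:

```python
import pandas as pd

sample_cols = ['R_Wrist_Rx', 'R_Wrist_Ry']  # illustrative subset of sensor columns

# One float32 column per (sensor, statistic) pair, plus the int8 label.
meta_cols = {'LABEL': 'int8'}
for col in sample_cols:
    for stat in ('mean', 'std', 'min', 'max'):
        meta_cols[f'{col}_{stat}'] = 'float32'

# Empty, correctly typed frame that Dask uses as the schema template.
meta = pd.DataFrame({k: pd.Series(dtype=v) for k, v in meta_cols.items()})
print(meta.dtypes)
```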

4. Dask map_partitions Execution

```python
features_df = pq_df.map_partitions(extract_window_features, meta=meta)
features_df.to_parquet('data/main_data_features_parquet', ...)
```
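
The to_parquet call is what actually triggers the computation; everything before it is just a lazy task graph. A quick way to sanity-check the result afterwards is to read the feature directory back with plain pandas (a sketch, using the output path from the snippet above):

```python
import pandas as pd

# Read the Dask-written parquet directory back into a single frame.
features = pd.read_parquet('data/main_data_features_parquet')

print(features.shape)                    # windows x (stats * channels + label)
print(features['LABEL'].value_counts())  # class balance across windows
```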