Feature Engineering Best Practices for Production ML
Feature engineering is where data science meets data engineering. In 2021, feature stores went mainstream and best practices began to crystallize. Let’s explore how to do feature engineering right.
The Feature Store Pattern
Feature stores solve the training-serving skew problem:
from datetime import timedelta

from feast import Entity, Feature, FeatureStore, FeatureView, FileSource, ValueType

# Define entities
customer = Entity(
    name="customer_id",
    value_type=ValueType.STRING,
    description="Unique customer identifier"
)

# Define feature source (parquet is FileSource's default format)
customer_features_source = FileSource(
    path="abfss://features@datalake.dfs.core.windows.net/customer_features/",
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created_timestamp"
)

# Define feature view
customer_features = FeatureView(
    name="customer_features",
    entities=["customer_id"],
    ttl=timedelta(days=90),
    features=[
        Feature(name="total_purchases_30d", dtype=ValueType.FLOAT),
        Feature(name="avg_order_value_30d", dtype=ValueType.FLOAT),
        Feature(name="purchase_frequency_30d", dtype=ValueType.FLOAT),
        Feature(name="days_since_last_purchase", dtype=ValueType.INT64),
        Feature(name="preferred_category", dtype=ValueType.STRING),
        Feature(name="lifetime_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    batch_source=customer_features_source,
    tags={"team": "customer-analytics", "version": "1.0"}
)

# Initialize store and register the definitions
store = FeatureStore(repo_path=".")
store.apply([customer, customer_features])
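Once applied (and the online store is populated via feast materialize), the same definitions serve both training and inference, which is exactly how the skew is eliminated. A minimal retrieval sketch; the entity dataframe is illustrative, and argument names can vary slightly across Feast releases:
import pandas as pd

# Which entities we need features for, and as of when (illustrative values)
entity_df = pd.DataFrame({
    "customer_id": ["C123", "C456"],
    "event_timestamp": pd.to_datetime(["2021-11-01", "2021-11-15"]),
})

# Training: point-in-time correct join against the offline store
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_features:total_purchases_30d",
        "customer_features:days_since_last_purchase",
    ],
).to_df()

# Serving: the same feature values from the online store
online_features = store.get_online_features(
    features=["customer_features:total_purchases_30d"],
    entity_rows=[{"customer_id": "C123"}],
).to_dict()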
Feature Engineering Pipeline
Building features at scale:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()

def build_customer_features(transactions_df, as_of_date):
    """Build customer features as of a specific date (point-in-time correct)"""
    # Filter to historical data only -- never look past as_of_date
    historical = transactions_df.filter(F.col("transaction_date") < as_of_date)

    # Calculate features
    features = (historical
        # Basic aggregations
        .groupBy("customer_id")
        .agg(
            # Recency
            F.datediff(F.lit(as_of_date), F.max("transaction_date")).alias("days_since_last_purchase"),
            # Frequency
            F.count("*").alias("total_transactions"),
            F.countDistinct(F.date_trunc("month", "transaction_date")).alias("active_months"),
            # Monetary
            F.sum("amount").alias("total_spend"),
            F.avg("amount").alias("avg_transaction_value"),
            F.stddev("amount").alias("transaction_value_stddev"),
            # 30-day rolling
            F.sum(F.when(
                F.col("transaction_date") >= F.date_sub(F.lit(as_of_date), 30),
                F.col("amount")
            )).alias("spend_30d"),
            F.count(F.when(
                F.col("transaction_date") >= F.date_sub(F.lit(as_of_date), 30),
                True
            )).alias("transactions_30d"),
            # Category preferences: max over (date, category) structs picks the
            # category of the latest transaction deterministically (F.first
            # without an ordering is not guaranteed to be "most recent")
            F.max(F.struct("transaction_date", "category")).getField("category")
                .alias("most_recent_category"),
        )
        # Derived features
        .withColumn("purchase_frequency",
            F.col("total_transactions") / F.greatest(F.col("active_months"), F.lit(1)))
        .withColumn("avg_spend_30d",
            F.col("spend_30d") / F.greatest(F.col("transactions_30d"), F.lit(1)))
        .withColumn("spend_trend",
            (F.col("avg_spend_30d") - F.col("avg_transaction_value")) /
            F.greatest(F.col("avg_transaction_value"), F.lit(1)))
        # Add metadata
        .withColumn("feature_timestamp", F.lit(as_of_date))
        .withColumn("feature_version", F.lit("1.0"))
    )
    return features

# Build features for training (`transactions` is the raw transactions DataFrame)
training_features = build_customer_features(transactions, "2021-12-01")
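Because the builder only reads rows strictly before as_of_date, replaying it across several historical cutoffs yields a leakage-free training table that spans time. A sketch with illustrative monthly cutoffs:
from functools import reduce

# Illustrative monthly snapshot dates
cutoff_dates = ["2021-09-01", "2021-10-01", "2021-11-01", "2021-12-01"]

# One point-in-time snapshot per cutoff, each stamped via feature_timestamp
snapshots = [build_customer_features(transactions, d) for d in cutoff_dates]

# Union the snapshots into a single training table
training_set = reduce(lambda a, b: a.unionByName(b), snapshots)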
Handling Categorical Features
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from category_encoders import TargetEncoder, WOEEncoder
import pandas as pd
import numpy as np

class CategoricalFeatureEncoder:
    """Production-ready categorical encoding"""

    def __init__(self, encoding_strategy: dict):
        """
        encoding_strategy: dict mapping column -> encoding type
        e.g., {"category": "target", "region": "onehot", "brand": "label"}
        """
        self.encoding_strategy = encoding_strategy
        self.encoders = {}

    def fit(self, df: pd.DataFrame, target: pd.Series = None):
        for col, strategy in self.encoding_strategy.items():
            if strategy == "onehot":
                encoder = OneHotEncoder(sparse=False, handle_unknown="ignore")
                encoder.fit(df[[col]])
            elif strategy == "label":
                # Note: LabelEncoder raises on categories unseen at fit time
                encoder = LabelEncoder()
                encoder.fit(df[col].fillna("__MISSING__"))
            elif strategy == "target":
                if target is None:
                    raise ValueError("Target required for target encoding")
                encoder = TargetEncoder(cols=[col], smoothing=10)
                encoder.fit(df[[col]], target)
            elif strategy == "woe":
                if target is None:
                    raise ValueError("Target required for WOE encoding")
                encoder = WOEEncoder(cols=[col])
                encoder.fit(df[[col]], target)
            else:
                raise ValueError(f"Unknown encoding strategy: {strategy}")
            self.encoders[col] = encoder
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        result = df.copy()
        for col, encoder in self.encoders.items():
            strategy = self.encoding_strategy[col]
            if strategy == "onehot":
                encoded = encoder.transform(df[[col]])
                feature_names = encoder.get_feature_names_out([col])
                for i, name in enumerate(feature_names):
                    result[name] = encoded[:, i]
                result = result.drop(columns=[col])
            elif strategy == "label":
                result[f"{col}_encoded"] = encoder.transform(
                    df[col].fillna("__MISSING__")
                )
            else:
                # category_encoders returns a DataFrame; extract the column
                result[f"{col}_encoded"] = encoder.transform(df[[col]])[col]
        return result
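A brief usage sketch with an illustrative dataframe: fit on the training split only, then reuse the fitted encoders everywhere so training and serving apply identical transformations.
# Illustrative training data
train = pd.DataFrame({
    "region": ["EU", "US", "EU", "APAC"],
    "brand": ["acme", "zeta", "acme", "acme"],
    "churned": [0, 1, 0, 1],
})

encoder = CategoricalFeatureEncoder({"region": "onehot", "brand": "target"})
encoder.fit(train, target=train["churned"])

train_encoded = encoder.transform(train)  # same call at serving time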
Time-Based Feature Engineering
import math

def create_time_features(df, timestamp_col: str):
    """Extract temporal features from a timestamp column"""
    return (df
        # Basic time components
        .withColumn("hour", F.hour(timestamp_col))
        .withColumn("day_of_week", F.dayofweek(timestamp_col))
        .withColumn("day_of_month", F.dayofmonth(timestamp_col))
        .withColumn("month", F.month(timestamp_col))
        .withColumn("quarter", F.quarter(timestamp_col))
        .withColumn("year", F.year(timestamp_col))
        # Cyclic encoding for periodic features
        .withColumn("hour_sin", F.sin(2 * math.pi * F.col("hour") / 24))
        .withColumn("hour_cos", F.cos(2 * math.pi * F.col("hour") / 24))
        .withColumn("dow_sin", F.sin(2 * math.pi * F.col("day_of_week") / 7))
        .withColumn("dow_cos", F.cos(2 * math.pi * F.col("day_of_week") / 7))
        .withColumn("month_sin", F.sin(2 * math.pi * F.col("month") / 12))
        .withColumn("month_cos", F.cos(2 * math.pi * F.col("month") / 12))
        # Business logic features (Spark's dayofweek: 1 = Sunday, 7 = Saturday)
        .withColumn("is_weekend", F.col("day_of_week").isin([1, 7]).cast("int"))
        .withColumn("is_business_hours",
            (F.col("hour").between(9, 17) & ~F.col("is_weekend").cast("boolean")).cast("int"))
    )

def create_lag_features(df, partition_cols, order_col, value_cols, lags):
    """Create lag features for time series"""
    window = Window.partitionBy(*partition_cols).orderBy(order_col)
    for col in value_cols:
        for lag in lags:
            df = df.withColumn(f"{col}_lag_{lag}", F.lag(col, lag).over(window))
    return df

def create_rolling_features(df, partition_cols, order_col, value_cols, windows):
    """Create rolling window features"""
    for col in value_cols:
        for window_size in windows:
            # Upper bound of -1 excludes the current row, avoiding leakage
            window = (Window
                .partitionBy(*partition_cols)
                .orderBy(order_col)
                .rowsBetween(-window_size, -1))
            df = (df
                .withColumn(f"{col}_rolling_mean_{window_size}", F.avg(col).over(window))
                .withColumn(f"{col}_rolling_std_{window_size}", F.stddev(col).over(window))
                .withColumn(f"{col}_rolling_min_{window_size}", F.min(col).over(window))
                .withColumn(f"{col}_rolling_max_{window_size}", F.max(col).over(window))
            )
    return df
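These helpers compose naturally: calendar features first, then lags, then rolling statistics. A usage sketch on a hypothetical daily_txns DataFrame with customer_id, event_timestamp, and amount columns:
feats = create_time_features(daily_txns, "event_timestamp")
feats = create_lag_features(feats, ["customer_id"], "event_timestamp", ["amount"], lags=[1, 7, 28])
feats = create_rolling_features(feats, ["customer_id"], "event_timestamp", ["amount"], windows=[7, 28])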
Feature Validation
from great_expectations.dataset import PandasDataset

class FeatureValidationError(Exception):
    """Raised when feature quality checks fail."""

def validate_features(features_df):
    """Validate feature quality before training"""
    ge_df = PandasDataset(features_df)

    validations = [
        # No null values in key features
        ge_df.expect_column_values_to_not_be_null("customer_id"),
        # Numeric ranges
        ge_df.expect_column_values_to_be_between(
            "purchase_frequency", min_value=0, max_value=100
        ),
        ge_df.expect_column_values_to_be_between(
            "days_since_last_purchase", min_value=0
        ),
        # Distribution checks
        ge_df.expect_column_mean_to_be_between(
            "avg_transaction_value", min_value=10, max_value=1000
        ),
        # Completeness
        ge_df.expect_column_values_to_not_be_null(
            "total_transactions", mostly=0.99
        ),
    ]

    failed = [v for v in validations if not v.success]
    if failed:
        raise FeatureValidationError(f"Feature validation failed: {failed}")
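Wired into the Spark pipeline above, the check acts as a gate between feature building and training (toPandas() assumes the feature table fits on the driver):
# Fail fast before any model trains on bad features
validate_features(training_features.toPandas())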
Key Principles for 2021
- Feature Stores are Essential: Solve training-serving skew
- Point-in-Time Correctness: Prevent data leakage
- Feature Versioning: Track feature definitions
- Monitoring: Watch for feature drift in production (see the drift sketch below)
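On the monitoring point, a lightweight drift check is the Population Stability Index (PSI) between a feature's training and serving distributions. A rough sketch; train_sample and serving_sample are hypothetical 1-D arrays, and the 0.2 threshold is a common rule of thumb rather than a universal constant:
import numpy as np

def population_stability_index(expected, actual, bins=10):
    """PSI of a serving sample (actual) against a training sample (expected)."""
    # Bin edges are fixed from the training distribution
    edges = np.histogram_bin_edges(expected, bins=bins)
    e_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    a_pct = np.histogram(actual, bins=edges)[0] / len(actual)
    # Clip empty bins to avoid log(0) and division by zero
    e_pct = np.clip(e_pct, 1e-6, None)
    a_pct = np.clip(a_pct, 1e-6, None)
    return float(np.sum((a_pct - e_pct) * np.log(a_pct / e_pct)))

# PSI above ~0.2 is commonly treated as significant drift
psi = population_stability_index(train_sample, serving_sample)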
Feature engineering in 2021 became more systematic and production-oriented, with the ad-hoc notebook approach giving way to proper engineering practices.