7 min read
Synthetic Data Generation: When Real Data Isn't Available
Synthetic data - artificially generated data that mimics the statistical properties of real data - emerged in 2021 as a practical answer to privacy, testing, and data scarcity challenges. Let’s explore how to generate synthetic data that is actually useful.
Why Synthetic Data?
- Privacy: No real personal information
- Testing: Generate edge cases and scenarios
- Data Augmentation: Expand limited datasets
- Cost: Cheaper than collecting real data
- Compliance: Avoid regulatory concerns
Statistical Synthetic Data Generation
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class ColumnProfile:
name: str
dtype: str
distribution: str
parameters: dict
null_rate: float = 0.0
constraints: Optional[dict] = None
class StatisticalSynthesizer:
"""Generate synthetic data based on statistical properties"""
def __init__(self):
self.profiles: Dict[str, ColumnProfile] = {}
self.correlations: Optional[np.ndarray] = None
self.column_order: List[str] = []
def fit(self, df: pd.DataFrame):
"""Learn statistical properties from real data"""
self.column_order = list(df.columns)
# Profile each column
for col in df.columns:
self.profiles[col] = self._profile_column(df[col])
        # Learn correlations for numeric columns (stored for reference; this simple synthesizer samples each column independently)
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 1:
self.correlations = df[numeric_cols].corr().values
return self
def _profile_column(self, series: pd.Series) -> ColumnProfile:
"""Profile a single column"""
null_rate = series.isnull().mean()
series_clean = series.dropna()
if series.dtype in ['int64', 'float64']:
# Fit best distribution
distribution, params = self._fit_numeric_distribution(series_clean)
return ColumnProfile(
name=series.name,
dtype=str(series.dtype),
distribution=distribution,
parameters=params,
null_rate=null_rate,
constraints={
'min': series_clean.min(),
'max': series_clean.max()
}
)
elif series.dtype == 'object' or series.dtype.name == 'category':
# Categorical: use value frequencies
value_counts = series_clean.value_counts(normalize=True)
return ColumnProfile(
name=series.name,
dtype='category',
distribution='categorical',
parameters={
'categories': value_counts.index.tolist(),
'probabilities': value_counts.values.tolist()
},
null_rate=null_rate
)
elif np.issubdtype(series.dtype, np.datetime64):
return ColumnProfile(
name=series.name,
dtype='datetime',
distribution='uniform',
parameters={
'start': series_clean.min(),
'end': series_clean.max()
},
null_rate=null_rate
)
else:
raise ValueError(f"Unsupported dtype: {series.dtype}")
def _fit_numeric_distribution(self, series: pd.Series) -> tuple:
"""Fit the best distribution for numeric data"""
distributions = ['norm', 'lognorm', 'expon', 'uniform']
        best_dist = None
        best_ks = np.inf
        for dist_name in distributions:
            try:
                dist = getattr(stats, dist_name)
                params = dist.fit(series)
                # Goodness of fit: a lower KS statistic means a better fit
                ks_stat, _ = stats.kstest(series, dist_name, args=params)
                if ks_stat < best_ks:
                    best_ks = ks_stat
                    best_dist = (dist_name, params)
            except Exception:
                continue
        # Fall back to a normal distribution if nothing could be fitted
        return best_dist or ('norm', (series.mean(), series.std()))
def generate(self, n_rows: int) -> pd.DataFrame:
"""Generate synthetic data"""
data = {}
for col_name in self.column_order:
profile = self.profiles[col_name]
values = self._generate_column(profile, n_rows)
data[col_name] = values
return pd.DataFrame(data)
def _generate_column(self, profile: ColumnProfile, n_rows: int) -> np.ndarray:
"""Generate values for a single column"""
if profile.distribution == 'categorical':
values = np.random.choice(
profile.parameters['categories'],
size=n_rows,
p=profile.parameters['probabilities']
)
elif profile.distribution == 'norm':
loc, scale = profile.parameters
values = np.random.normal(loc, scale, n_rows)
        elif profile.distribution == 'lognorm':
            s, loc, scale = profile.parameters
            values = stats.lognorm.rvs(s, loc, scale, size=n_rows)
        elif profile.distribution == 'expon':
            loc, scale = profile.parameters
            values = stats.expon.rvs(loc, scale, size=n_rows)
        elif profile.distribution == 'uniform':
            if profile.dtype == 'datetime':
                start = profile.parameters['start'].value
                end = profile.parameters['end'].value
                values = pd.to_datetime(np.random.randint(start, end, n_rows))
            else:
                # scipy's uniform.fit returns (loc, scale); samples lie in [loc, loc + scale]
                loc, scale = profile.parameters
                values = np.random.uniform(loc, loc + scale, n_rows)
        else:
            raise ValueError(f"Unknown distribution: {profile.distribution}")
# Apply constraints
if profile.constraints and profile.dtype in ['int64', 'float64']:
values = np.clip(values, profile.constraints['min'], profile.constraints['max'])
            if 'int' in profile.dtype:
                # Round before casting so values aren't biased toward zero
                values = np.round(values).astype(int)
# Add nulls
if profile.null_rate > 0:
null_mask = np.random.random(n_rows) < profile.null_rate
values = np.where(null_mask, None, values)
return values
# Usage
real_data = pd.read_csv('customers.csv')
synthesizer = StatisticalSynthesizer()
synthesizer.fit(real_data)
synthetic_data = synthesizer.generate(10000)
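Before trusting the output, compare basic summary statistics between the real and synthetic frames. A minimal sanity check, reusing the real_data and synthetic_data frames from the usage snippet above (the column selection is just illustrative):
# Quick sanity check: compare per-column means and standard deviations
numeric_cols = real_data.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    print(
        f"{col}: real mean={real_data[col].mean():.2f}, "
        f"synthetic mean={synthetic_data[col].mean():.2f}, "
        f"real std={real_data[col].std():.2f}, "
        f"synthetic std={synthetic_data[col].std():.2f}"
    )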
GAN-Based Synthetic Data
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
class TabularGAN:
"""GAN for generating tabular synthetic data"""
def __init__(self, input_dim: int, latent_dim: int = 128):
self.input_dim = input_dim
self.latent_dim = latent_dim
self.generator = self._build_generator()
self.discriminator = self._build_discriminator()
def _build_generator(self) -> nn.Module:
return nn.Sequential(
nn.Linear(self.latent_dim, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Linear(256, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Linear(512, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Linear(256, self.input_dim),
nn.Tanh()
)
def _build_discriminator(self) -> nn.Module:
return nn.Sequential(
nn.Linear(self.input_dim, 256),
nn.LeakyReLU(0.2),
nn.Dropout(0.3),
nn.Linear(256, 512),
nn.LeakyReLU(0.2),
nn.Dropout(0.3),
nn.Linear(512, 256),
nn.LeakyReLU(0.2),
nn.Dropout(0.3),
nn.Linear(256, 1),
nn.Sigmoid()
)
def train(
self,
real_data: np.ndarray,
epochs: int = 1000,
batch_size: int = 64,
lr: float = 0.0002
):
"""Train the GAN"""
dataset = TensorDataset(torch.FloatTensor(real_data))
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
criterion = nn.BCELoss()
g_optimizer = torch.optim.Adam(self.generator.parameters(), lr=lr, betas=(0.5, 0.999))
d_optimizer = torch.optim.Adam(self.discriminator.parameters(), lr=lr, betas=(0.5, 0.999))
for epoch in range(epochs):
for real_batch, in dataloader:
batch_size = real_batch.size(0)
# Train discriminator
d_optimizer.zero_grad()
real_labels = torch.ones(batch_size, 1)
fake_labels = torch.zeros(batch_size, 1)
# Real data
d_real = self.discriminator(real_batch)
d_loss_real = criterion(d_real, real_labels)
# Fake data
noise = torch.randn(batch_size, self.latent_dim)
fake_data = self.generator(noise)
d_fake = self.discriminator(fake_data.detach())
d_loss_fake = criterion(d_fake, fake_labels)
d_loss = d_loss_real + d_loss_fake
d_loss.backward()
d_optimizer.step()
# Train generator
g_optimizer.zero_grad()
d_fake = self.discriminator(fake_data)
g_loss = criterion(d_fake, real_labels)
g_loss.backward()
g_optimizer.step()
if epoch % 100 == 0:
print(f"Epoch {epoch}: D_loss={d_loss.item():.4f}, G_loss={g_loss.item():.4f}")
def generate(self, n_samples: int) -> np.ndarray:
"""Generate synthetic samples"""
self.generator.eval()
with torch.no_grad():
noise = torch.randn(n_samples, self.latent_dim)
synthetic = self.generator(noise)
return synthetic.numpy()
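Because the generator ends in a Tanh layer, the training data should be scaled into [-1, 1] and scaled back after sampling. A usage sketch, assuming the numeric columns of the real_data frame from earlier; the MinMaxScaler and the epoch count are illustrative choices, not part of the class above:
# Usage sketch: scale numeric features to [-1, 1], train, sample, and invert the scaling
from sklearn.preprocessing import MinMaxScaler

numeric_df = real_data.select_dtypes(include=[np.number]).dropna()
scaler = MinMaxScaler(feature_range=(-1, 1))
scaled = scaler.fit_transform(numeric_df.values)

gan = TabularGAN(input_dim=scaled.shape[1])
gan.train(scaled, epochs=500)

samples = gan.generate(5000)
synthetic_numeric = pd.DataFrame(
    scaler.inverse_transform(samples),
    columns=numeric_df.columns
)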
Synthetic Data with SDV
from sdv.tabular import GaussianCopula, CTGAN
from sdv.evaluation import evaluate
class SDVSynthesizer:
"""Synthetic Data Vault wrapper for high-quality synthetic data"""
def __init__(self, method: str = "gaussian_copula"):
if method == "gaussian_copula":
self.model = GaussianCopula()
elif method == "ctgan":
self.model = CTGAN()
else:
raise ValueError(f"Unknown method: {method}")
    def fit(self, df: pd.DataFrame):
        """Fit the model to real data"""
        self.model.fit(df)
        return self
def generate(self, n_rows: int) -> pd.DataFrame:
"""Generate synthetic data"""
return self.model.sample(n_rows)
    def evaluate_quality(self, real_df: pd.DataFrame, synthetic_df: pd.DataFrame) -> dict:
        """Evaluate synthetic data quality"""
        # sdv.evaluation.evaluate returns a single aggregate score by default;
        # aggregate=False returns a per-metric breakdown instead
        overall_score = evaluate(synthetic_df, real_df)
        per_metric = evaluate(synthetic_df, real_df, aggregate=False)
        return {
            'overall_score': overall_score,
            'metric_breakdown': per_metric
        }
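Using the wrapper mirrors the statistical synthesizer, just with a heavier model underneath. A short usage sketch, reusing the real_data frame from earlier:
# Usage sketch
sdv_synth = SDVSynthesizer(method="gaussian_copula")
sdv_synth.fit(real_data)
sdv_synthetic = sdv_synth.generate(10000)
print(sdv_synth.evaluate_quality(real_data, sdv_synthetic))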
# Relational synthetic data
from sdv.relational import HMA1
class RelationalSynthesizer:
    """Generate synthetic data for relational databases"""
    def __init__(self):
        self.model = None
    def fit(self, tables: Dict[str, pd.DataFrame], metadata: dict):
        """Fit to a relational schema (HMA1 takes the metadata at construction time)"""
        self.model = HMA1(metadata)
        self.model.fit(tables)
        return self
    def generate(self) -> Dict[str, pd.DataFrame]:
        """Generate synthetic tables that maintain parent-child relationships"""
        return self.model.sample()
# Example metadata for relational data (SDV metadata dict format;
# the foreign key is declared with a "ref" on the child field)
relational_metadata = {
    "tables": {
        "customers": {
            "primary_key": "customer_id",
            "fields": {
                "customer_id": {"type": "id", "subtype": "integer"},
                "name": {"type": "categorical"},
                "email": {"type": "categorical"},
                "signup_date": {"type": "datetime"}
            }
        },
        "orders": {
            "primary_key": "order_id",
            "fields": {
                "order_id": {"type": "id", "subtype": "integer"},
                "customer_id": {
                    "type": "id",
                    "subtype": "integer",
                    "ref": {"table": "customers", "field": "customer_id"}
                },
                "order_date": {"type": "datetime"},
                "total": {"type": "numerical", "subtype": "float"}
            }
        }
    }
}
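Fitting the relational synthesizer then only needs the tables and the metadata above. A usage sketch; the CSV file names are placeholders:
# Usage sketch (customers.csv and orders.csv are placeholder file names)
tables = {
    "customers": pd.read_csv("customers.csv"),
    "orders": pd.read_csv("orders.csv")
}
rel_synth = RelationalSynthesizer()
rel_synth.fit(tables, relational_metadata)
synthetic_tables = rel_synth.generate()
print(synthetic_tables["orders"].head())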
Synthetic Data Quality Assessment
from scipy.stats import ks_2samp
from scipy.spatial.distance import jensenshannon
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
class SyntheticDataEvaluator:
"""Evaluate quality of synthetic data"""
def statistical_similarity(
self,
real_df: pd.DataFrame,
synthetic_df: pd.DataFrame
) -> dict:
"""Compare statistical properties"""
results = {}
for col in real_df.columns:
if real_df[col].dtype in ['int64', 'float64']:
# KS test for numeric columns
stat, p_value = ks_2samp(
real_df[col].dropna(),
synthetic_df[col].dropna()
)
results[col] = {
'ks_statistic': stat,
'p_value': p_value,
'similar': p_value > 0.05
}
            else:
                # Jensen-Shannon divergence for categorical columns
                real_dist = real_df[col].value_counts(normalize=True)
                synth_dist = synthetic_df[col].value_counts(normalize=True)
                all_categories = set(real_dist.index) | set(synth_dist.index)
                real_probs = [real_dist.get(c, 0) for c in all_categories]
                synth_probs = [synth_dist.get(c, 0) for c in all_categories]
                js_div = jensenshannon(real_probs, synth_probs)
results[col] = {
'js_divergence': js_div,
'similar': js_div < 0.1
}
return results
def discriminator_test(
self,
real_df: pd.DataFrame,
synthetic_df: pd.DataFrame
) -> float:
"""Test if a classifier can distinguish real from synthetic"""
# Prepare data
real_df = real_df.copy()
synthetic_df = synthetic_df.copy()
real_df['is_real'] = 1
synthetic_df['is_real'] = 0
combined = pd.concat([real_df, synthetic_df])
# Encode categorical columns
for col in combined.select_dtypes(include=['object', 'category']).columns:
combined[col] = combined[col].astype('category').cat.codes
combined = combined.fillna(-999)
X = combined.drop('is_real', axis=1)
y = combined['is_real']
# Train classifier
clf = RandomForestClassifier(n_estimators=100, random_state=42)
scores = cross_val_score(clf, X, y, cv=5, scoring='roc_auc')
# Score close to 0.5 means synthetic is indistinguishable
return scores.mean()
def privacy_check(
self,
real_df: pd.DataFrame,
synthetic_df: pd.DataFrame,
key_columns: List[str]
) -> dict:
"""Check for privacy leakage"""
# Check for exact matches
merged = pd.merge(
synthetic_df,
real_df,
on=key_columns,
how='inner'
)
exact_matches = len(merged)
match_rate = exact_matches / len(synthetic_df)
return {
'exact_matches': exact_matches,
'match_rate': match_rate,
'privacy_safe': match_rate < 0.01
}
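Tying the three checks together might look like the sketch below; it reuses the real_data and synthetic_data frames from earlier, and the key columns passed to the privacy check are illustrative:
# Usage sketch: run all three quality checks
evaluator = SyntheticDataEvaluator()
similarity = evaluator.statistical_similarity(real_data, synthetic_data)
auc = evaluator.discriminator_test(real_data, synthetic_data)
privacy = evaluator.privacy_check(
    real_data, synthetic_data,
    key_columns=["name", "email"]  # hypothetical quasi-identifier columns
)
print(f"{sum(r['similar'] for r in similarity.values())}/{len(similarity)} columns statistically similar")
print(f"Discriminator AUC: {auc:.3f} (closer to 0.5 is better)")
print(f"Exact-match rate: {privacy['match_rate']:.2%}")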
Synthetic Data Use Cases
- ML Training: Augment limited datasets
- Testing: Generate test data for applications
- Privacy: Share data without exposing PII
- Demo: Create realistic demo environments
- Research: Enable research on sensitive data
In 2021, synthetic data became practical for production use. The key is validating that the synthetic data preserves the statistical properties your use case actually depends on.