7 min read
Carbon-Aware Computing: Building Climate-Conscious Applications
Carbon-aware computing takes sustainability beyond efficiency. It’s about making real-time decisions based on the carbon intensity of the electrical grid. Let’s explore how to build applications that respond to grid conditions.
Understanding Grid Carbon Intensity
Electricity grids mix various energy sources. Carbon intensity varies by:
- Time of day (solar peaks midday)
- Weather (wind varies)
- Demand (peak hours often use dirtier peaker plants)
- Season (heating/cooling loads)
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
import httpx
@dataclass
class CarbonIntensityData:
    """A single carbon-intensity reading or forecast point for a grid zone."""

    # Moment the reading (or forecast point) applies to.
    timestamp: datetime
    # Carbon intensity, in grams of CO2 per kWh (as the field name states).
    intensity_g_co2_kwh: float
    # True when the producer of this record classified the intensity as low.
    is_low_carbon: bool
    # Energy-source breakdown for the reading; may be an empty dict.
    energy_mix: dict
class CarbonIntensityProvider:
    """Client for grid carbon-intensity data providers.

    Only the ``electricitymap`` provider is implemented for HTTP lookups;
    ``find_optimal_window`` is provider-independent.
    """

    def __init__(
        self,
        provider: str = "electricitymap",
        api_key: Optional[str] = None,
        low_carbon_threshold: float = 200,
    ):
        """Configure the client.

        Args:
            provider: Key into ``base_urls`` selecting the data provider.
            api_key: Token sent as the ``auth-token`` header. When ``None``
                no auth header is sent.
            low_carbon_threshold: Readings strictly below this value
                (g CO2/kWh) are flagged ``is_low_carbon``.
        """
        self.provider = provider
        self.api_key = api_key
        self.low_carbon_threshold = low_carbon_threshold
        self.base_urls = {
            "electricitymap": "https://api.electricitymap.org/v3",
            "watttime": "https://api2.watttime.org/v2",
        }

    def _auth_headers(self) -> dict:
        """Return request headers, omitting the token when none is set."""
        return {"auth-token": self.api_key} if self.api_key else {}

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO-8601 timestamp, accepting a trailing ``Z``.

        ``datetime.fromisoformat`` rejects the ``Z`` suffix before
        Python 3.11, so it is rewritten to an explicit UTC offset first.
        """
        if value.endswith(("Z", "z")):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)

    async def get_current_intensity(self, region: str) -> "CarbonIntensityData":
        """Get the current carbon intensity for a region.

        Args:
            region: Zone identifier passed to the provider as ``zone``.

        Raises:
            NotImplementedError: If ``self.provider`` is not ``electricitymap``.
            httpx.HTTPStatusError: If the provider returns an error status.
        """
        # Fail loudly instead of silently returning None for other providers.
        if self.provider != "electricitymap":
            raise NotImplementedError(
                f"Provider {self.provider!r} is not supported for "
                "current intensity lookups"
            )

        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_urls['electricitymap']}/carbon-intensity/latest",
                params={"zone": region},
                headers=self._auth_headers(),
            )
        # Surface 4xx/5xx here rather than as a KeyError on the error payload.
        response.raise_for_status()
        data = response.json()

        intensity = data["carbonIntensity"]
        return CarbonIntensityData(
            timestamp=self._parse_timestamp(data["datetime"]),
            intensity_g_co2_kwh=intensity,
            is_low_carbon=intensity < self.low_carbon_threshold,
            energy_mix=data.get("powerBreakdown", {}),
        )

    async def get_forecast(
        self,
        region: str,
        hours: int = 24,
    ) -> List["CarbonIntensityData"]:
        """Get the carbon intensity forecast for a region.

        Args:
            region: Zone identifier passed to the provider as ``zone``.
            hours: Maximum number of forecast points to return (the first
                ``hours`` entries of the provider's ``forecast`` list).

        Raises:
            httpx.HTTPStatusError: If the provider returns an error status.
        """
        # NOTE(review): this always queries the electricitymap endpoint,
        # whatever ``self.provider`` is -- confirm that is intended.
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_urls['electricitymap']}/carbon-intensity/forecast",
                params={"zone": region},
                headers=self._auth_headers(),
            )
        response.raise_for_status()
        data = response.json()

        return [
            CarbonIntensityData(
                timestamp=self._parse_timestamp(point["datetime"]),
                intensity_g_co2_kwh=point["carbonIntensity"],
                is_low_carbon=point["carbonIntensity"] < self.low_carbon_threshold,
                energy_mix={},
            )
            for point in data["forecast"][:hours]
        ]

    def find_optimal_window(
        self,
        forecasts: List["CarbonIntensityData"],
        duration_hours: int,
        deadline: Optional[datetime] = None,
    ) -> Optional[datetime]:
        """Find the start of the window with the lowest average intensity.

        Args:
            forecasts: Forecast points; the window length is counted in
                points, so they are presumably hourly and in chronological
                order -- TODO confirm against the provider.
            duration_hours: Number of consecutive points the window spans.
            deadline: When given, only points with ``timestamp <= deadline``
                are considered.

        Returns:
            The timestamp of the first point of the best window (the
            earliest one on ties), or ``None`` when ``duration_hours`` is
            not positive or too few points are available.
        """
        # A zero-length window would divide by zero below and a negative one
        # would build nonsensical slices; neither can be scheduled.
        if duration_hours <= 0:
            return None

        if deadline is not None:
            forecasts = [f for f in forecasts if f.timestamp <= deadline]
        if len(forecasts) < duration_hours:
            return None

        best_start = None
        best_avg_intensity = float("inf")
        for start in range(len(forecasts) - duration_hours + 1):
            window = forecasts[start:start + duration_hours]
            avg_intensity = sum(f.intensity_g_co2_kwh for f in window) / len(window)
            # Strict comparison keeps the earliest window on ties.
            if avg_intensity < best_avg_intensity:
                best_avg_intensity = avg_intensity
                best_start = window[0].timestamp
        return best_start
Carbon-Aware Kubernetes Scheduler
# Carbon-aware pod scheduling
# ConfigMap holding the scheduler's configuration as an embedded YAML document.
apiVersion: v1
kind: ConfigMap
metadata:
  name: carbon-aware-config
  namespace: kube-system
data:
  config.yaml: |
    carbonIntensityProvider: electricitymap
    # Candidate regions, each mapped to the provider's grid zone.
    regions:
      - name: australiaeast
        zone: AU-NSW
        priority: 3
      - name: northeurope
        zone: IE
        priority: 1
      - name: swedencentral
        zone: SE
        priority: 1
    thresholds:
      lowCarbon: 100
      mediumCarbon: 300
      highCarbon: 500
    scheduling:
      enableDeferral: true
      maxDeferralHours: 6
      preferLowCarbonRegions: true
---
# Deployment running the carbon-aware scheduler.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: carbon-aware-scheduler
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: carbon-aware-scheduler
  template:
    # The pod template labels must match spec.selector.matchLabels,
    # otherwise the API server rejects the Deployment.
    metadata:
      labels:
        app: carbon-aware-scheduler
    spec:
      containers:
        - name: scheduler
          # NOTE(review): ":latest" is mutable; consider pinning a version tag.
          image: carbonaware/scheduler:latest
          env:
            - name: ELECTRICITY_MAP_TOKEN
              valueFrom:
                secretKeyRef:
                  name: carbon-api-secrets
                  key: electricitymap-token
// Carbon-aware scheduling logic
package scheduler
import (
	"context"
	"fmt"
	"sort"
	"time"

	v1 "k8s.io/api/core/v1"
)
// CarbonAwareScheduler makes scheduling decisions for pods using carbon
// intensity data for a set of configured regions.
type CarbonAwareScheduler struct {
// carbonProvider is queried for current intensity and forecasts per zone.
carbonProvider CarbonIntensityProvider
// regions lists the candidate regions that Schedule iterates over.
regions []RegionConfig
}
// Schedule decides where and when a carbon-aware pod should run.
//
// It returns (nil, nil) for pods that are not annotated as carbon-aware so
// the caller can fall back to the default scheduler. Otherwise it looks up
// the current carbon intensity of every configured region, and either
// defers the pod to a later, cleaner window (when the pod is deferrable and
// the window starts more than an hour from now) or schedules it in the
// lowest-intensity region that meets the pod's constraints.
//
// An error is returned when no region has usable intensity data or no
// region satisfies the pod's constraints.
func (s *CarbonAwareScheduler) Schedule(ctx context.Context, pod *v1.Pod) (*SchedulingDecision, error) {
	// Pods that have not opted in are left to the default scheduler.
	if !s.isCarbonAwarePod(pod) {
		return nil, nil
	}

	// Collect carbon intensity for every region that can be queried.
	// Lookups are best-effort: a failing region is skipped, but the last
	// error is kept so a total failure can be reported.
	intensities := make([]RegionIntensity, 0, len(s.regions))
	var lastErr error
	for _, region := range s.regions {
		intensity, err := s.carbonProvider.GetCurrentIntensity(region.Zone)
		if err != nil {
			lastErr = err
			continue
		}
		intensities = append(intensities, RegionIntensity{
			Region:    region,
			Intensity: intensity,
		})
	}

	// Without any data there is nothing to rank; bail out before the
	// deferral branch indexes intensities[0].
	if len(intensities) == 0 {
		if lastErr != nil {
			return nil, fmt.Errorf("no suitable region found: carbon intensity lookup failed: %w", lastErr)
		}
		return nil, fmt.Errorf("no suitable region found")
	}

	// Lowest carbon intensity first.
	sort.Slice(intensities, func(i, j int) bool {
		return intensities[i].Intensity < intensities[j].Intensity
	})

	// Check if we should defer. A forecast failure is not fatal: the pod
	// is simply scheduled immediately instead.
	if s.canDefer(pod) {
		forecast, err := s.carbonProvider.GetForecast(intensities[0].Region.Zone, 24)
		if err == nil {
			optimalTime := s.findOptimalWindow(forecast, pod)
			// Only defer when the better window is more than an hour away.
			if optimalTime != nil && optimalTime.After(time.Now().Add(time.Hour)) {
				return &SchedulingDecision{
					Action:        Defer,
					ScheduledTime: *optimalTime,
					Reason:        "Deferring to lower carbon period",
				}, nil
			}
		}
	}

	// Select the lowest carbon region that meets the pod's constraints.
	for _, ri := range intensities {
		if s.meetsConstraints(pod, ri.Region) {
			return &SchedulingDecision{
				Action: Schedule,
				Region: ri.Region.Name,
				Node:   s.selectNode(ri.Region, pod),
				// %v formats both integer and floating-point intensities.
				Reason: fmt.Sprintf("Lowest carbon region: %v g/kWh", ri.Intensity),
			}, nil
		}
	}
	return nil, fmt.Errorf("no suitable region found")
}
// isCarbonAwarePod reports whether the pod carries the "carbon-aware"
// annotation with the value "true".
func (s *CarbonAwareScheduler) isCarbonAwarePod(pod *v1.Pod) bool {
	value, present := pod.GetAnnotations()["carbon-aware"]
	return present && value == "true"
}
// canDefer reports whether the pod carries the "carbon-aware.deferrable"
// annotation with the value "true".
func (s *CarbonAwareScheduler) canDefer(pod *v1.Pod) bool {
	if flag, found := pod.GetAnnotations()["carbon-aware.deferrable"]; found {
		return flag == "true"
	}
	return false
}
Demand Shifting Implementation
// Shift workloads to low-carbon periods
using Azure.Messaging.ServiceBus;
using System.Text.Json;
/// <summary>
/// Decides whether a workload runs immediately or is deferred to a
/// lower-carbon period, and schedules deferred workloads via Service Bus.
/// </summary>
public class CarbonAwareWorkloadManager
{
    private readonly CarbonIntensityService _carbonService;
    private readonly ServiceBusClient _serviceBusClient;
    private readonly ILogger<CarbonAwareWorkloadManager> _logger;

    /// <summary>
    /// Creates the manager with its collaborators. Without this constructor
    /// the readonly fields would stay null and every call would fail.
    /// </summary>
    /// <exception cref="ArgumentNullException">If any argument is null.</exception>
    public CarbonAwareWorkloadManager(
        CarbonIntensityService carbonService,
        ServiceBusClient serviceBusClient,
        ILogger<CarbonAwareWorkloadManager> logger)
    {
        _carbonService = carbonService ?? throw new ArgumentNullException(nameof(carbonService));
        _serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Chooses between executing the workload now and deferring it.
    /// </summary>
    /// <param name="workload">The workload to evaluate.</param>
    /// <returns>
    /// <c>ExecuteNow</c> when the current period is low carbon, the workload
    /// is not deferrable, its deadline is within two hours, or no window
    /// saves more than 20%; otherwise <c>Deferred</c> with the chosen time.
    /// </returns>
    public async Task<WorkloadDecision> ProcessWorkload(Workload workload)
    {
        var currentIntensity = await _carbonService.GetCurrentIntensity(workload.Region);

        // Immediate execution for low carbon periods
        if (currentIntensity.IsLowCarbon)
        {
            _logger.LogInformation(
                "Executing {WorkloadId} immediately - low carbon period ({Intensity} g/kWh)",
                workload.Id, currentIntensity.Intensity);

            return new WorkloadDecision
            {
                Action = WorkloadAction.ExecuteNow,
                CarbonIntensity = currentIntensity.Intensity
            };
        }

        // Only consider deferral when there are at least two hours of slack.
        if (workload.IsDeferrable && workload.Deadline > DateTime.UtcNow.AddHours(2))
        {
            var forecast = await _carbonService.GetForecast(workload.Region, 24);

            // NOTE(review): FindOptimalWindow is not defined in the visible
            // part of this class; presumably it returns the timestamp of a
            // forecast point - confirm, since First() below throws otherwise.
            var optimalTime = FindOptimalWindow(forecast, workload.Deadline);

            if (optimalTime.HasValue)
            {
                var deferredIntensity = forecast
                    .First(f => f.Timestamp == optimalTime.Value)
                    .Intensity;

                // Multiply before dividing so the percentage is not truncated
                // to zero if Intensity is an integral type.
                var savingsPercent = (currentIntensity.Intensity - deferredIntensity) * 100 /
                    currentIntensity.Intensity;

                if (savingsPercent > 20) // Only defer if significant savings
                {
                    _logger.LogInformation(
                        "Deferring {WorkloadId} to {Time} for {Savings}% carbon reduction",
                        workload.Id, optimalTime.Value, savingsPercent);

                    // Schedule for later using Service Bus
                    await ScheduleDeferredWorkload(workload, optimalTime.Value);

                    return new WorkloadDecision
                    {
                        Action = WorkloadAction.Deferred,
                        ScheduledTime = optimalTime.Value,
                        CurrentIntensity = currentIntensity.Intensity,
                        DeferredIntensity = deferredIntensity,
                        CarbonSavingsPercent = savingsPercent
                    };
                }
            }
        }

        // Execute now if can't defer or no better window
        return new WorkloadDecision
        {
            Action = WorkloadAction.ExecuteNow,
            CarbonIntensity = currentIntensity.Intensity,
            Reason = "No better carbon window available"
        };
    }

    /// <summary>
    /// Enqueues the serialized workload on the "deferred-workloads" queue
    /// with a scheduled enqueue time.
    /// </summary>
    /// <param name="workload">The workload to serialize and send.</param>
    /// <param name="scheduledTime">When the message should become available.</param>
    private async Task ScheduleDeferredWorkload(Workload workload, DateTime scheduledTime)
    {
        // The sender is created per call, so dispose it to release its link.
        await using (var sender = _serviceBusClient.CreateSender("deferred-workloads"))
        {
            var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(workload))
            {
                ScheduledEnqueueTime = scheduledTime,
                ApplicationProperties =
                {
                    ["WorkloadId"] = workload.Id,
                    ["OriginalSubmitTime"] = DateTime.UtcNow.ToString("O"),
                    ["DeferralReason"] = "carbon-optimization"
                }
            };

            await sender.SendMessageAsync(message);
        }
    }
}
Carbon Metrics and Reporting
# Carbon metrics collection
from prometheus_client import Counter, Gauge, Histogram
import time
# Prometheus metric definitions shared by the collector below.

# Latest observed grid carbon intensity, one series per region.
carbon_intensity_gauge = Gauge(
    name='carbon_intensity_g_kwh',
    documentation='Current carbon intensity',
    labelnames=['region'],
)

# Running total of workload emissions, split by workload type and region.
workload_carbon_counter = Counter(
    name='workload_carbon_emissions_g',
    documentation='Total carbon emissions from workloads',
    labelnames=['workload_type', 'region'],
)

# Running total of emissions avoided, split by optimization type.
carbon_savings_counter = Counter(
    name='carbon_savings_g',
    documentation='Carbon saved through optimization',
    labelnames=['optimization_type'],
)

# Distribution of how long workloads were deferred, in hours.
deferral_histogram = Histogram(
    name='workload_deferral_hours',
    documentation='Hours workloads were deferred for carbon optimization',
    buckets=[0.5, 1, 2, 4, 8, 12, 24],
)
class CarbonMetricsCollector:
    """Publish carbon intensity, emissions, savings and deferral metrics."""

    def __init__(self, carbon_provider):
        """Store the provider used to look up current carbon intensity.

        Args:
            carbon_provider: Object exposing an awaitable
                ``get_current_intensity(region)``.
        """
        self.carbon_provider = carbon_provider

    async def collect_metrics(self, regions: list):
        """Collect carbon metrics for all regions.

        Regions are queried one after another; an exception from the
        provider propagates and stops the remaining lookups.

        Args:
            regions: Region identifiers passed to the provider and used as
                the ``region`` label value.
        """
        for region in regions:
            intensity = await self.carbon_provider.get_current_intensity(region)
            carbon_intensity_gauge.labels(region=region).set(
                intensity.intensity_g_co2_kwh
            )

    def record_workload_emissions(
        self,
        workload_type: str,
        region: str,
        energy_kwh: float,
        intensity_g_kwh: float,
    ):
        """Record emissions for a completed workload.

        Args:
            workload_type: Value for the ``workload_type`` label.
            region: Value for the ``region`` label.
            energy_kwh: Energy consumed by the workload, in kWh.
            intensity_g_kwh: Carbon intensity applied to it, in g/kWh.

        Raises:
            ValueError: If either numeric input is negative.
        """
        # Counters cannot decrease, and two negative inputs would silently
        # produce a positive product, so reject bad input up front.
        if energy_kwh < 0 or intensity_g_kwh < 0:
            raise ValueError("energy_kwh and intensity_g_kwh must be non-negative")

        emissions = energy_kwh * intensity_g_kwh
        workload_carbon_counter.labels(
            workload_type=workload_type,
            region=region,
        ).inc(emissions)

    def record_carbon_savings(
        self,
        optimization_type: str,
        baseline_emissions_g: float,
        actual_emissions_g: float,
    ):
        """Record carbon savings from optimization.

        Nothing is recorded unless the baseline exceeds the actual
        emissions, because the counter only accepts positive increments.

        Args:
            optimization_type: Value for the ``optimization_type`` label.
            baseline_emissions_g: Emissions without the optimization.
            actual_emissions_g: Emissions actually produced.
        """
        savings = baseline_emissions_g - actual_emissions_g
        if savings > 0:
            carbon_savings_counter.labels(
                optimization_type=optimization_type
            ).inc(savings)

    def record_deferral(self, deferral_hours: float):
        """Record workload deferral duration, in hours."""
        deferral_histogram.observe(deferral_hours)
# Dashboard query examples (PromQL)
# Note: the Python client exposes Counters with a "_total" suffix.
"""
# Total carbon emissions per day (kg CO2)
sum(increase(workload_carbon_emissions_g_total[24h])) / 1000
# Carbon savings from demand shifting per day (kg CO2)
sum(increase(carbon_savings_g_total{optimization_type="demand_shifting"}[24h])) / 1000
# Median deferral time (hours)
histogram_quantile(0.5, sum by (le) (rate(workload_deferral_hours_bucket[24h])))
# Emission rate by region (kg CO2 per second)
sum by (region) (rate(workload_carbon_emissions_g_total[1h])) / 1000
"""
Green Software Principles
# Green software development guidelines
# Each key under "guidelines" is a practice area with its recommendations.
guidelines:
  efficiency:
    - Optimize algorithms for energy efficiency
    - Reduce unnecessary computation
    - Cache aggressively to avoid recomputation
    - Use efficient data structures
  demand_shifting:
    - Identify deferrable workloads
    - Integrate carbon intensity data
    - Implement intelligent scheduling
    - Set appropriate deadlines
  hardware_efficiency:
    - Use servers with high efficiency ratings
    - Leverage specialized hardware (GPUs for AI)
    - Right-size resources continuously
    - Prefer ARM-based instances where possible
  carbon_awareness:
    - Monitor carbon intensity in real-time
    - Make carbon-aware deployment decisions
    - Implement geographic load balancing
    - Report on carbon metrics
  measurement:
    - Track energy consumption
    - Calculate carbon emissions
    - Report sustainability metrics
    - Set reduction targets
Key Carbon-Aware Patterns
- Demand Shifting: Move flexible workloads to low-carbon periods
- Geographic Shifting: Route to regions with cleaner grids
- Demand Shaping: Adjust resource consumption based on carbon intensity
- Carbon-Aware Scaling: Scale more aggressively during clean periods
In 2021, carbon-aware computing moved from a research concept to practical implementation. The Green Software Foundation and tools like the Carbon Aware SDK are making it accessible to all developers.