Bulk Data Extracts
This tutorial demonstrates how to combine bulk forecast data extracts with live API data for backtesting and production optimization workflows. This approach is essential when you need forecasts from the same model across both historical analysis and live trading applications.
Many power market participants use price forecasts as inputs for downstream activities like in-house optimizations, asset management policies, and trading strategies. For backtesting purposes, it's crucial to use forecasts from a specific machine learning model to ensure consistency between historical analysis and production deployment.
Overview
Enertel provides bulk data extracts as part of paid subscriptions, allowing users to:
- Train decision layers using our price forecasts as inputs
- Maintain model consistency between backtesting and production
- Access historical data without API rate limits
- Combine historical and live data seamlessly
We're developing a streamlined bulk download feature that will eliminate the need for data stitching. Stay tuned for updates!
Prerequisites
- Bulk data extract: Request from the Enertel team (typically takes 2-3 business days)
- API token: For accessing live forecast data
- Python environment: with the `pandas`, `requests`, and `arrow` libraries installed
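If you are unsure whether your environment already has these dependencies (they can be installed with `pip install pandas requests arrow`), a quick import check like the following confirms they are available:

```python
# Quick sanity check that the required libraries are importable.
import pandas as pd
import requests
import arrow

print("pandas", pd.__version__)
print("requests", requests.__version__)
print("arrow", arrow.__version__)
```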
Requesting Bulk Extracts
Contact the Enertel team to request a bulk data extract. Provide:
- Date range: Historical period you need
- Price nodes: Specific locations of interest
- Data series: DALMP, RTLMP, or other series
- Model ID: Specific model for consistency (optional)
- Format preference: CSV or Parquet
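As a sketch, you might collect these details in a simple structure before sending them to the team. The node names, dates, and model ID below are hypothetical placeholders:

```python
# Hypothetical example of the details to include in a bulk extract request.
bulk_extract_request = {
    "date_range": {"start": "2023-01-01", "end": "2024-06-30"},  # placeholder dates
    "price_nodes": ["HB_NORTH", "HB_WEST"],                      # placeholder node names
    "data_series": ["DALMP", "RTLMP"],
    "model_id": 123,                                             # optional, but recommended for consistency
    "format": "parquet",
}
```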
Processing typically takes 2-3 business days, after which you'll receive a secure download link.
Data Validation
Before combining bulk and live data, validate your extract to ensure data quality:
```python
import os

import pandas as pd
import requests
import arrow
from datetime import datetime, timedelta

# Load your bulk data extract
df = pd.read_csv('<path_to_your_extract_file>')
print(f"Loaded {len(df)} rows from bulk extract")
print(f"Date range: {df['scheduled_at'].min()} to {df['scheduled_at'].max()}")

# Validate model consistency
model_ids = df['model_id'].unique().tolist()
print(f"Model IDs in extract: {model_ids}")

# Ensure a single model (recommended for consistency)
if len(model_ids) == 1:
    print(f"✓ Single model ID confirmed: {model_ids[0]}")
    model_id = model_ids[0]
else:
    print("⚠ Multiple model IDs found. Consider filtering to one model for consistency.")
    model_id = model_ids[0]  # Use the first model

# Check for missing data
missing_data = df.isnull().sum().sum()
if missing_data == 0:
    print("✓ No missing values found")
else:
    print(f"⚠ Found {missing_data} missing values")

# Check for duplicates
duplicate_cols = ['timestamp', 'feature_id', 'scheduled_at', 'model_id']
duplicates = df.duplicated(subset=duplicate_cols).sum()
if duplicates == 0:
    print("✓ No duplicate forecasts found")
else:
    print(f"⚠ Found {duplicates} duplicate forecasts")

# Validate data completeness
print("\nData summary:")
print(f"Price nodes: {df['object_name'].nunique()}")
print(f"Data series: {df['series_name'].unique()}")
print(f"Date range: {df['scheduled_at'].min()} to {df['scheduled_at'].max()}")
```
Accessing Live Data
Now we'll query for live forecast data to extend our historical dataset:
```python
def get_live_forecasts(token, model_id, start_date, end_date):
    """
    Fetch live forecasts from the dashboard API for a specific model.
    """
    base_url = "https://app.enertel.ai/api"
    headers = {"Authorization": f"Bearer {token}"}

    # Calculate date ranges (the API is limited to ~2-week increments)
    current_date = arrow.get(start_date)
    end_date = arrow.get(end_date)

    ranges = []
    while current_date < end_date:
        range_end = min(current_date.shift(days=14), end_date)
        ranges.append((
            current_date.format('YYYY-MM-DDTHH:mm:ss'),
            range_end.format('YYYY-MM-DDTHH:mm:ss')
        ))
        current_date = range_end

    print(f"Will make {len(ranges)} API calls to cover date range")

    all_data = []
    for i, (start, end) in enumerate(ranges):
        print(f"Fetching range {i+1}/{len(ranges)}: {start[:10]} to {end[:10]}")

        params = {
            "start": start,
            "end": end,
            "models": str(model_id)  # Specify the same model as the bulk data
        }

        try:
            response = requests.get(
                f"{base_url}/dashboard/forecasts",
                headers=headers,
                params=params,
                timeout=30
            )

            if response.status_code == 200:
                batch_data = response.json()
                parsed_data = parse_dashboard_forecasts(batch_data)
                all_data.append(parsed_data)
                print(f"  Retrieved {len(parsed_data)} forecast points")
            else:
                print(f"  Error: {response.status_code} - {response.text}")

        except requests.exceptions.RequestException as e:
            print(f"  Request failed: {e}")

    if all_data:
        return pd.concat(all_data, ignore_index=True)
    else:
        return pd.DataFrame()


def parse_dashboard_forecasts(response_data):
    """
    Parse the nested dashboard API response into a flat DataFrame.
    """
    forecasts = []
    for obj in response_data:
        for target in obj["targets"]:
            for vintage in target["vintages"]:
                for batch in vintage["batches"]:
                    for forecast in batch["forecasts"]:
                        forecasts.append({
                            "object_name": obj["object_name"],
                            "series_name": target.get("series_name", ""),
                            "target_id": target["target_id"],
                            "target_description": target["description"],
                            "scheduled_at": vintage["scheduled_at"],
                            "batch_id": batch["batch_id"],
                            "model_id": batch["model_id"],
                            "feature_id": batch["feature_id"],
                            **forecast
                        })
    return pd.DataFrame(forecasts)


# Example usage
token = '<your-api-token>'

# Start from the end of the bulk data and go up to the current date
bulk_end_date = df['scheduled_at'].max()
current_date = arrow.now().floor('day')

print(f"Fetching live data from {bulk_end_date} to {current_date.format('YYYY-MM-DD')}")

live_df = get_live_forecasts(
    token=token,
    model_id=model_id,
    start_date=bulk_end_date,
    end_date=current_date.format('YYYY-MM-DD')
)

print(f"Retrieved {len(live_df)} live forecast points")
```
Combining Historical and Live Data
Now combine the bulk extract with live API data:
```python
def combine_bulk_and_live_data(bulk_df, live_df, model_id):
    """
    Combine the bulk extract with live API data, ensuring consistency.
    """
    print("Combining bulk and live data...")

    # Filter live data to the same model (if multiple models were returned)
    if not live_df.empty:
        live_df = live_df[live_df['model_id'] == model_id]
        print(f"Filtered live data to model {model_id}: {len(live_df)} points")

    # Ensure a consistent column structure
    bulk_columns = set(bulk_df.columns)
    live_columns = set(live_df.columns) if not live_df.empty else set()

    # Find common columns
    common_columns = bulk_columns.intersection(live_columns)
    if live_df.empty:
        common_columns = bulk_columns
    print(f"Using {len(common_columns)} common columns")

    # Select common columns and combine
    if not live_df.empty:
        combined_df = pd.concat([
            bulk_df[list(common_columns)],
            live_df[list(common_columns)]
        ], ignore_index=True)
    else:
        combined_df = bulk_df.copy()

    # Validate the combined dataset
    print("\nCombined dataset validation:")
    print(f"Total rows: {len(combined_df)}")

    # Check for duplicates
    duplicate_cols = ['timestamp', 'feature_id', 'scheduled_at', 'model_id']
    duplicates = combined_df.duplicated(subset=duplicate_cols).sum()
    print(f"Duplicate rows: {duplicates}")

    # Remove duplicates if found
    if duplicates > 0:
        combined_df = combined_df.drop_duplicates(subset=duplicate_cols)
        print(f"Removed duplicates, final rows: {len(combined_df)}")

    # Verify model consistency
    unique_models = combined_df['model_id'].nunique()
    print(f"Unique model IDs: {unique_models}")

    # Date range
    print(f"Date range: {combined_df['scheduled_at'].min()} to {combined_df['scheduled_at'].max()}")

    return combined_df


# Combine the datasets
combined_df = combine_bulk_and_live_data(df, live_df, model_id)
```
Data Quality Checks
Perform final validation on your combined dataset:
```python
def validate_combined_dataset(df):
    """
    Comprehensive validation of the combined forecast dataset.
    """
    print("=== Dataset Validation Report ===")

    # Basic statistics
    print(f"Total forecast points: {len(df):,}")
    print(f"Date range: {df['scheduled_at'].min()} to {df['scheduled_at'].max()}")
    print(f"Price nodes: {df['object_name'].nunique()}")
    print(f"Data series: {', '.join(df['series_name'].unique())}")

    # Model consistency
    model_ids = df['model_id'].unique()
    print(f"Model IDs: {model_ids}")
    if len(model_ids) == 1:
        print("✓ Single model ID - consistent for backtesting")
    else:
        print("⚠ Multiple model IDs - may affect backtesting consistency")

    # Data completeness
    missing_by_column = df.isnull().sum()
    if missing_by_column.sum() == 0:
        print("✓ No missing values")
    else:
        print("⚠ Missing values found:")
        print(missing_by_column[missing_by_column > 0])

    # Duplicate check
    duplicate_cols = ['timestamp', 'feature_id', 'scheduled_at', 'model_id']
    duplicates = df.duplicated(subset=duplicate_cols).sum()
    if duplicates == 0:
        print("✓ No duplicate forecasts")
    else:
        print(f"⚠ {duplicates} duplicate forecasts found")

    # Forecast distribution check
    print("\nForecast statistics (p50):")
    print(df['p50'].describe())

    # Check for unrealistic values
    if (df['p50'] < 0).any():
        print("⚠ Negative prices found")
    if (df['p50'] > 1000).any():
        print("⚠ Very high prices found (>$1000/MWh)")

    # Temporal coverage
    df['scheduled_at'] = pd.to_datetime(df['scheduled_at'])
    date_gaps = df.groupby('object_name')['scheduled_at'].apply(
        lambda x: (x.max() - x.min()).days
    )
    print("\nTemporal coverage by price node (days):")
    print(date_gaps.describe())

    return df


# Validate the final dataset
final_df = validate_combined_dataset(combined_df)
```
Advanced Usage Examples
Backtesting Framework
```python
def prepare_for_backtesting(df, lookback_hours=24):
    """
    Prepare the dataset for backtesting with proper time alignment.
    """
    # Ensure datetime types
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['scheduled_at'] = pd.to_datetime(df['scheduled_at'])

    # Calculate the operating-hour offset (hours between forecast issue time and delivery)
    df['operating_hour_offset'] = (
        df['timestamp'] - df['scheduled_at']
    ).dt.total_seconds() / 3600

    # Filter for specific forecast horizons (e.g., day-ahead)
    day_ahead_df = df[
        (df['operating_hour_offset'] >= 12) &
        (df['operating_hour_offset'] <= 36)
    ].copy()

    # Sort for time series analysis
    day_ahead_df = day_ahead_df.sort_values(['object_name', 'timestamp', 'scheduled_at'])

    print(f"Prepared {len(day_ahead_df)} day-ahead forecasts for backtesting")
    return day_ahead_df


# Prepare the backtesting dataset
backtest_df = prepare_for_backtesting(final_df)
```
Export for External Systems
```python
def export_combined_dataset(df, format='parquet', filename_prefix='enertel_combined'):
    """
    Export the combined dataset in various formats.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M')

    if format == 'parquet':
        filename = f"{filename_prefix}_{timestamp}.parquet"
        df.to_parquet(filename, index=False)
    elif format == 'csv':
        filename = f"{filename_prefix}_{timestamp}.csv"
        df.to_csv(filename, index=False)
    elif format == 'hdf':
        filename = f"{filename_prefix}_{timestamp}.h5"
        df.to_hdf(filename, key='forecasts', mode='w', index=False)

    print(f"Exported {len(df)} rows to {filename}")
    print(f"File size: {os.path.getsize(filename) / (1024**2):.1f} MB")

    return filename


# Export the final dataset
# export_file = export_combined_dataset(final_df, format='parquet')
```
Best Practices
Performance Optimization
```python
# For large datasets, consider using categorical data types
categorical_columns = ['object_name', 'series_name', 'iso']
for col in categorical_columns:
    if col in final_df.columns:
        final_df[col] = final_df[col].astype('category')

# Use efficient data types for numeric columns
numeric_optimizations = {
    'model_id': 'int32',
    'target_id': 'int32',
    'feature_id': 'int32',
    'batch_id': 'int64'
}

for col, dtype in numeric_optimizations.items():
    if col in final_df.columns:
        final_df[col] = final_df[col].astype(dtype)
```
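To see the effect of these conversions, you can inspect the DataFrame's memory footprint with pandas' built-in `memory_usage` (an optional check, not required for the workflow):

```python
# Optional: report the in-memory size of the optimized DataFrame.
size_mb = final_df.memory_usage(deep=True).sum() / (1024 ** 2)
print(f"Combined dataset memory usage: {size_mb:.1f} MB")
```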
Data Lineage Tracking
```python
# Add metadata to track data sources
final_df['data_source'] = 'bulk'

if not live_df.empty:
    # Mark live data points
    live_start_date = pd.to_datetime(bulk_end_date)
    final_df.loc[
        pd.to_datetime(final_df['scheduled_at']) > live_start_date,
        'data_source'
    ] = 'live_api'

print("Data source distribution:")
print(final_df['data_source'].value_counts())
```
Conclusion
This tutorial showed how to combine Enertel bulk data extracts with live API data into a single, seamless dataset for backtesting and production use. Key benefits include:
- Model consistency: Use the same model across historical and live data
- Seamless integration: No gaps between bulk and live data
- Quality assurance: Comprehensive validation ensures data integrity
- Scalability: Efficient data structures for large-scale analysis
When requesting bulk extracts, specify the same model IDs you plan to use in production. This ensures consistency between backtesting results and live trading performance.
Always specify model IDs in your API calls to ensure you receive forecasts from the same model used in your bulk extract. Without this specification, you'll receive forecasts from our current "best" model, which may change over time.
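For example, echoing the request pattern used earlier in this tutorial, passing the `models` parameter pins the dashboard query to the model recovered from your bulk extract (the dates below are placeholders):

```python
# Pin the dashboard query to the same model as the bulk extract.
params = {
    "start": "2024-07-01T00:00:00",  # placeholder dates
    "end": "2024-07-14T00:00:00",
    "models": str(model_id),         # model_id recovered from the bulk extract
}
response = requests.get(
    "https://app.enertel.ai/api/dashboard/forecasts",
    headers={"Authorization": f"Bearer {token}"},
    params=params,
    timeout=30,
)
```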
More information: Dashboard Forecasts API Documentation