Spaces:
Sleeping
Sleeping
| """Data loading utilities for FBMC forecasting project. | |
| Provides convenient functions to load and filter FBMC data files. | |
| """ | |
| import polars as pl | |
| from pathlib import Path | |
| from typing import Optional, List | |
| from datetime import datetime, timedelta | |
class FBMCDataLoader:
    """Load and filter FBMC data with convenient methods.

    Each dataset lives in one Parquet file inside ``data_dir`` and is expected
    to expose a ``timestamp`` column, plus one categorical column
    (``border``, ``grid_point`` or ``zone``) used for optional filtering.
    """

    # File names of the Parquet datasets looked up inside ``data_dir``.
    _CNECS_FILE = "cnecs_2024_2025.parquet"
    _WEATHER_FILE = "weather_2024_2025.parquet"
    _ENTSOE_FILE = "entsoe_2024_2025.parquet"

    def __init__(self, data_dir: Path = Path("data/raw")):
        """Initialize data loader.

        Args:
            data_dir: Directory containing Parquet files (default: data/raw)

        Raises:
            FileNotFoundError: If ``data_dir`` does not exist.
        """
        self.data_dir = Path(data_dir)
        if not self.data_dir.exists():
            raise FileNotFoundError(f"Data directory not found: {data_dir}")

    def _load_filtered(
        self,
        file_name: str,
        label: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        column: Optional[str] = None,
        values: Optional[List[str]] = None
    ) -> pl.DataFrame:
        """Read one Parquet dataset and apply the optional filters.

        Shared implementation behind ``load_cnecs``, ``load_weather`` and
        ``load_entsoe``.

        Args:
            file_name: Parquet file name relative to ``data_dir``
            label: Dataset name used in the "file not found" message
            start_date: Inclusive lower bound on ``timestamp``
            end_date: Inclusive upper bound on ``timestamp``
            column: Categorical column that ``values`` is matched against
            values: Accepted values for ``column``; empty/None disables it

        Returns:
            Polars DataFrame with the rows that pass every active filter

        Raises:
            FileNotFoundError: If the Parquet file does not exist.
        """
        file_path = self.data_dir / file_name
        if not file_path.exists():
            raise FileNotFoundError(f"{label} file not found: {file_path}")

        frame = pl.read_parquet(file_path)

        # NOTE(review): the bounds are compared as raw strings. That is fine
        # if ``timestamp`` is stored as text, but may fail if it is a temporal
        # column - confirm the stored dtype before relying on date filters.
        if start_date:
            frame = frame.filter(pl.col("timestamp") >= start_date)
        if end_date:
            frame = frame.filter(pl.col("timestamp") <= end_date)

        if values:
            frame = frame.filter(pl.col(column).is_in(values))
        return frame

    def load_cnecs(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        borders: Optional[List[str]] = None
    ) -> pl.DataFrame:
        """Load CNEC data with optional filtering.

        Args:
            start_date: Start date (ISO format: 'YYYY-MM-DD')
            end_date: End date (ISO format: 'YYYY-MM-DD')
            borders: List of border codes to filter (e.g., ['DE_NL', 'DE_FR'])

        Returns:
            Polars DataFrame with CNEC data

        Raises:
            FileNotFoundError: If the CNECs Parquet file does not exist.
        """
        return self._load_filtered(
            self._CNECS_FILE, "CNECs", start_date, end_date, "border", borders
        )

    def load_weather(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        grid_points: Optional[List[str]] = None
    ) -> pl.DataFrame:
        """Load weather data with optional filtering.

        Args:
            start_date: Start date (ISO format: 'YYYY-MM-DD')
            end_date: End date (ISO format: 'YYYY-MM-DD')
            grid_points: List of grid point IDs to filter

        Returns:
            Polars DataFrame with weather data

        Raises:
            FileNotFoundError: If the weather Parquet file does not exist.
        """
        return self._load_filtered(
            self._WEATHER_FILE, "Weather", start_date, end_date,
            "grid_point", grid_points
        )

    def load_entsoe(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        zones: Optional[List[str]] = None
    ) -> pl.DataFrame:
        """Load ENTSO-E data with optional filtering.

        Args:
            start_date: Start date (ISO format: 'YYYY-MM-DD')
            end_date: End date (ISO format: 'YYYY-MM-DD')
            zones: List of bidding zone codes (e.g., ['DE_LU', 'FR', 'NL'])

        Returns:
            Polars DataFrame with ENTSO-E data

        Raises:
            FileNotFoundError: If the ENTSO-E Parquet file does not exist.
        """
        return self._load_filtered(
            self._ENTSOE_FILE, "ENTSO-E", start_date, end_date, "zone", zones
        )

    def _timestamp_range(self, file_name: str) -> Optional[dict]:
        """Return ``{'min': ..., 'max': ...}`` of ``timestamp`` for one file.

        Only the ``timestamp`` column is read, so the rest of the dataset is
        never materialized. Returns None when the range cannot be determined.
        """
        try:
            stamps = pl.read_parquet(
                self.data_dir / file_name, columns=["timestamp"]
            )["timestamp"]
            return {'min': stamps.min(), 'max': stamps.max()}
        except Exception:
            # Deliberate best-effort: a missing or unreadable dataset is
            # reported as "not available" instead of aborting the overview.
            return None

    def get_date_range(self) -> dict:
        """Get available date range from all datasets.

        Returns:
            Dictionary keyed by 'cnecs', 'weather' and 'entsoe'. Each value is
            a dict with 'min'/'max' timestamps, or None if that dataset could
            not be read.
        """
        return {
            'cnecs': self._timestamp_range(self._CNECS_FILE),
            'weather': self._timestamp_range(self._WEATHER_FILE),
            'entsoe': self._timestamp_range(self._ENTSOE_FILE),
        }

    def validate_data_completeness(
        self,
        start_date: str,
        end_date: str,
        max_missing_pct: float = 5.0
    ) -> dict:
        """Validate data completeness for a given date range.

        Completeness is measured as the number of distinct ``timestamp``
        values found in the range versus the number of whole hours between
        ``start_date`` and ``end_date``.

        Args:
            start_date: Start date (ISO format)
            end_date: End date (ISO format)
            max_missing_pct: Maximum acceptable missing data percentage

        Returns:
            Dictionary with validation results for each dataset. A successful
            entry holds 'expected_hours', 'actual_hours', 'missing_pct' and
            'valid'; a failed entry holds 'error' and 'valid' (False).

        Raises:
            ValueError: If either date is not a valid ISO-format string.
        """
        # Calculate expected number of hours
        start_dt = datetime.fromisoformat(start_date)
        end_dt = datetime.fromisoformat(end_date)
        expected_hours = int((end_dt - start_dt).total_seconds() / 3600)

        loaders = {
            'cnecs': self.load_cnecs,
            'weather': self.load_weather,
            'entsoe': self.load_entsoe,
        }

        # An empty or reversed range has no meaningful completeness ratio and
        # would otherwise divide by zero (or yield a nonsensical percentage).
        if expected_hours <= 0:
            message = (
                f"Date range must span at least one hour: "
                f"{start_date} to {end_date}"
            )
            return {
                name: {'error': message, 'valid': False} for name in loaders
            }

        results = {}
        for name, load in loaders.items():
            try:
                frame = load(start_date, end_date)
                actual_hours = frame.select(
                    pl.col("timestamp").n_unique()
                ).item()
                missing_pct = (1 - actual_hours / expected_hours) * 100
                results[name] = {
                    'expected_hours': expected_hours,
                    'actual_hours': actual_hours,
                    'missing_pct': missing_pct,
                    'valid': missing_pct <= max_missing_pct
                }
            except Exception as e:
                # One broken dataset must not prevent validating the others.
                results[name] = {'error': str(e), 'valid': False}
        return results
# Example usage
if __name__ == "__main__":
    # Build a loader over the default raw-data directory.
    loader = FBMCDataLoader(data_dir=Path("data/raw"))

    # Report the timestamp span of every dataset, or flag it as unavailable.
    print("Available date ranges:")
    available = loader.get_date_range()
    for dataset in available:
        ranges = available[dataset]
        if not ranges:
            print(f" {dataset}: Not available")
            continue
        print(f" {dataset}: {ranges['min']} to {ranges['max']}")

    # Load specific data
    # cnecs = loader.load_cnecs(start_date="2024-10-01", end_date="2024-10-31")
    # weather = loader.load_weather(start_date="2024-10-01", end_date="2024-10-31")