File size: 8,525 Bytes
d4939ce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
"""
CNEC Border Extraction Utility
================================

Extracts commercial border information from CNEC EIC codes, TSO fields,
and PTDF profiles using a hierarchical approach.

Strategy:
1. Parse EIC codes (10T-XX-YY-NNNNNN format) - Primary, 33% coverage
2. Special case mapping (Alegro CNECs) - 8 CNECs
3. TSO + neighbor PTDF analysis - Fallback, ~67% coverage
4. TSO-only guess (first alphabetical neighbor of the TSO's home zone)
5. "UNKNOWN" (manual review) for remaining cases

Author: Claude + Evgueni Poloukarov
Date: 2025-11-08
"""

import math
from collections import Counter
from typing import Dict, Optional


# TSO to Country/Zone Mapping
# Maps a TSO name to the two-letter zone code used in border strings.
# Read by infer_border_from_tso_and_ptdf() and extract_cnec_border() to find a
# CNEC's home zone. Lookups are plain dict matches, so a TSO name must match a
# key exactly (including capitalisation) or the TSO is treated as unknown.
TSO_TO_ZONE: Dict[str, str] = {
    # Germany (4 TSOs, all mapped to the single 'DE' zone)
    '50Hertz': 'DE',
    'Amprion': 'DE',
    'TennetGmbh': 'DE',
    'TransnetBw': 'DE',

    # Other countries (one TSO per zone)
    'Rte': 'FR',              # France
    'Elia': 'BE',             # Belgium
    'TennetBv': 'NL',         # Netherlands
    'Apg': 'AT',              # Austria
    'Ceps': 'CZ',             # Czech Republic
    'Pse': 'PL',              # Poland
    'Mavir': 'HU',            # Hungary
    'Seps': 'SK',             # Slovakia
    'Transelectrica': 'RO',   # Romania
    'Hops': 'HR',             # Croatia
    'Eles': 'SI',             # Slovenia
}


# FBMC Border Neighbors (from ENTSO-E BORDERS list)
# Maps a zone code to the zone codes it may form a border with.
# List order is significant: infer_border_from_tso_and_ptdf() scans the
# neighbors in this order and, when two neighbors have exactly the same |PTDF|,
# keeps the one listed first.
# 'ES' and 'CH' appear only as neighbors; they have no entry of their own.
ZONE_NEIGHBORS: Dict[str, list] = {
    'DE': ['NL', 'FR', 'BE', 'AT', 'CZ', 'PL'],  # DE_LU treated as DE
    'FR': ['DE', 'BE', 'ES', 'CH'],              # ES/CH external but affect FBMC
    'AT': ['DE', 'CZ', 'HU', 'SI', 'CH'],
    'CZ': ['DE', 'AT', 'SK', 'PL'],
    'HU': ['AT', 'SK', 'RO', 'HR'],
    'SK': ['CZ', 'HU', 'PL'],
    'PL': ['DE', 'CZ', 'SK'],
    'RO': ['HU'],
    'HR': ['HU', 'SI'],
    'SI': ['AT', 'HR'],
    'BE': ['DE', 'FR', 'NL'],
    'NL': ['DE', 'BE'],
}


# Special-case CNEC identifiers whose border cannot be parsed from the code.
# Every Alegro DC cable entry (Belgium - Germany) resolves to the same border,
# whichever side or direction the identifier names.
SPECIAL_BORDER_MAPPING: Dict[str, str] = dict.fromkeys(
    (
        'ALEGRO_EXTERNAL_BE_IMPORT',
        'ALEGRO_EXTERNAL_DE_EXPORT',
        'ALEGRO_EXTERNAL_DE_IMPORT',
        'ALEGRO_EXTERNAL_BE_EXPORT',
        'ALEGRO_INTERNAL_DE_IMPORT',
        'ALEGRO_INTERNAL_BE_EXPORT',
        'ALEGRO_INTERNAL_BE_IMPORT',
        'ALEGRO_INTERNAL_DE_EXPORT',
    ),
    'BE_DE',
)


def extract_border_from_eic(eic: str) -> Optional[str]:
    """
    Extract border from EIC code with 10T-XX-YY-NNNNNN format.

    This is the most reliable method as border is explicitly encoded.
    The code is only accepted when both XX and YY are two-letter ASCII
    zone codes; other "10T-" codes (numeric or empty segments) are
    rejected instead of being turned into a malformed border string.

    Args:
        eic: CNEC EIC code. Non-string values (e.g. None or NaN from a
             data frame) are treated as not parseable.

    Returns:
        Border string with the two zones in alphabetical order
        (e.g., "DE_FR", "AT_SI") or None if not parseable

    Examples:
        >>> extract_border_from_eic("10T-DE-FR-000068")
        'DE_FR'
        >>> extract_border_from_eic("10T-SI-AT-00003P")
        'AT_SI'
        >>> extract_border_from_eic("17T0000000215642") is None
        True
        >>> extract_border_from_eic("10T-1001-10010AS") is None
        True
    """
    # Missing values reach this function as None/float; they carry no border.
    if not isinstance(eic, str) or not eic.startswith('10T-'):
        return None

    parts = eic.split('-')
    if len(parts) < 3:
        return None

    zone1, zone2 = parts[1], parts[2]

    # Both segments must look like zone codes; otherwise the "10T-" prefix
    # belongs to some other kind of identifier and no border is encoded.
    for zone in (zone1, zone2):
        if len(zone) != 2 or not (zone.isascii() and zone.isalpha()):
            return None

    # Normalize to alphabetical order for consistency
    first, second = sorted((zone1, zone2))
    return f"{first}_{second}"


def get_special_border(eic: str) -> Optional[str]:
    """
    Look up the border of a special-case CNEC (Alegro cable, etc.).

    Args:
        eic: CNEC EIC code (or special identifier) to look up

    Returns:
        The border registered in SPECIAL_BORDER_MAPPING for this code,
        or None when the code is not a registered special case
    """
    if eic in SPECIAL_BORDER_MAPPING:
        return SPECIAL_BORDER_MAPPING[eic]
    return None


def infer_border_from_tso_and_ptdf(
    tso: str,
    ptdf_dict: Dict[str, float]
) -> Optional[str]:
    """
    Infer border using TSO home zone + highest PTDF in neighbor zones.

    This is a fallback method when EIC doesn't encode border explicitly.
    Uses TSO to identify home country, then finds neighbor with highest
    |PTDF| value. Neighbors whose PTDF is missing, None or NaN are
    ignored, so an undefined value can never be picked as the "largest".
    On an exact tie the neighbor listed first in ZONE_NEIGHBORS wins.

    Args:
        tso: TSO name (e.g., "Apg", "Rte", "Amprion")
        ptdf_dict: Dictionary of PTDF values
                   Format: {"ptdf_AT": -0.45, "ptdf_DE": 0.12, ...}

    Returns:
        Border string (zones in alphabetical order) or None if the TSO is
        unknown, has no neighbors, or no neighbor has a usable PTDF

    Example:
        >>> ptdfs = {"ptdf_AT": -0.45, "ptdf_SI": 0.38, "ptdf_DE": 0.12}
        >>> infer_border_from_tso_and_ptdf("Apg", ptdfs)
        'AT_SI'

        Apg is the Austrian TSO and SI has the highest |PTDF| among
        Austria's neighbors.
    """
    home_zone = TSO_TO_ZONE.get(tso)
    if not home_zone:
        return None

    # Collect |PTDF| per neighbor, keeping ZONE_NEIGHBORS order so that
    # max() below resolves ties in favor of the first-listed neighbor.
    neighbor_ptdfs = {}
    for neighbor in ZONE_NEIGHBORS.get(home_zone, []):
        value = ptdf_dict.get(f'ptdf_{neighbor}')
        # NaN compares False against everything, which would make max()
        # depend on iteration order; None would crash abs(). Skip both.
        if value is None or math.isnan(value):
            continue
        neighbor_ptdfs[neighbor] = abs(value)

    if not neighbor_ptdfs:
        return None

    # Get neighbor with maximum absolute PTDF
    max_neighbor = max(neighbor_ptdfs, key=neighbor_ptdfs.get)

    # Normalize border to alphabetical order
    first, second = sorted((home_zone, max_neighbor))
    return f"{first}_{second}"


def extract_cnec_border(
    cnec_eic: str,
    tso: str,
    ptdf_dict: Optional[Dict[str, float]] = None
) -> str:
    """
    Resolve the border of a CNEC by trying progressively weaker methods.

    The first method that yields a border wins:
    1. Parse the EIC (10T-XX-YY format) - most reliable
    2. Special case mapping (Alegro, etc.)
    3. TSO + neighbor PTDF analysis - only when ptdf_dict is non-empty
    4. TSO-only guess: home zone paired with its alphabetically first
       neighbor (very approximate)
    5. "UNKNOWN" when the TSO is not recognised or has no neighbors

    Args:
        cnec_eic: CNEC EIC code
        tso: TSO name
        ptdf_dict: Optional dictionary of PTDF values
                   Format: {"ptdf_AT": -0.45, "ptdf_BE": 0.12, ...}

    Returns:
        Border string (e.g., "DE_FR", "AT_SI") or "UNKNOWN"

    Examples:
        >>> extract_cnec_border("10T-DE-FR-000068", "Amprion")
        'DE_FR'

        >>> extract_cnec_border("ALEGRO_EXTERNAL_BE_IMPORT", "Elia")
        'BE_DE'

        >>> ptdfs = {"ptdf_AT": -0.45, "ptdf_SI": 0.38}
        >>> extract_cnec_border("17T0000000215642", "Apg", ptdfs)
        'AT_SI'
    """
    # Steps 1 and 2: the identifier itself tells us the border.
    resolved = extract_border_from_eic(cnec_eic) or get_special_border(cnec_eic)
    if resolved:
        return resolved

    # Step 3: let the PTDF profile pick the neighbor, when one was supplied.
    if ptdf_dict:
        inferred = infer_border_from_tso_and_ptdf(tso, ptdf_dict)
        if inferred:
            return inferred

    # Step 4: nothing but the TSO is known, so guess the alphabetically first
    # neighbor. Very approximate, but better than UNKNOWN.
    zone = TSO_TO_ZONE.get(tso)
    candidates = ZONE_NEIGHBORS.get(zone, []) if zone else []
    if not candidates:
        return "UNKNOWN"

    guess = min(candidates)
    low, high = sorted((zone, guess))
    return f"{low}_{high}"


def validate_border_assignment(
    border: str,
    ptdf_dict: Dict[str, float],
    threshold: float = 0.05
) -> bool:
    """
    Sanity-check a border assignment against the CNEC's PTDF values.

    A border XX_YY passes when ptdf_XX or ptdf_YY (or both) is strictly
    larger than the threshold in absolute value. A zone with no entry in
    ptdf_dict counts as 0.0. "UNKNOWN" and any string that does not split
    into exactly two parts on "_" always fail.

    Args:
        border: Assigned border (e.g., "DE_FR")
        ptdf_dict: Dictionary of PTDF values
        threshold: Minimum |PTDF| to consider significant (default 0.05)

    Returns:
        True if validation passes, False otherwise

    Example:
        >>> validate_border_assignment("DE_FR", {"ptdf_DE": -0.42, "ptdf_FR": 0.38})
        True

        >>> validate_border_assignment("DE_FR", {"ptdf_DE": 0.01, "ptdf_FR": 0.02})
        False
    """
    zone_codes = [] if border == "UNKNOWN" else border.split('_')
    if len(zone_codes) != 2:
        return False

    # Both magnitudes are computed before either is compared.
    first, second = (
        abs(ptdf_dict.get(f'ptdf_{code}', 0.0)) for code in zone_codes
    )
    return first > threshold or second > threshold


def get_border_statistics(borders: list) -> Dict[str, int]:
    """
    Get frequency statistics for border assignments.

    Useful for validating that major FBMC borders are well-represented.

    Args:
        borders: List of border assignments

    Returns:
        Dictionary mapping border → count, ordered by count (descending).
        Borders with equal counts keep the order in which they first
        appear in the input. An empty input gives an empty dictionary.

    Example:
        >>> get_border_statistics(["DE_FR", "AT_SI", "DE_FR", "UNKNOWN"])
        {'DE_FR': 2, 'AT_SI': 1, 'UNKNOWN': 1}
    """
    # most_common() already sorts by count, descending, and keeps ties in
    # first-seen order; dict() turns the pairs back into a plain dict.
    return dict(Counter(borders).most_common())