File size: 16,332 Bytes
0646b18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
from dynaconf import Dynaconf
from pathlib import Path
from loguru import logger
from cuga.backend.activity_tracker.tracker import ActivityTracker
from cuga.backend.cuga_graph.utils.nodes_names import NodeNames
from cuga.configurations.set_from_one_file import parse_markdown_sections

root_dir = Path(__file__).parent.parent.absolute()

tracker = ActivityTracker()


class InstructionsManager:
    """Singleton class for managing instructions configuration"""

    _instance = None
    _in_memory_cache = {}

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(InstructionsManager, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        # Only initialize once
        if not hasattr(self, '_initialized'):
            self._load_configuration()
            self._setup_key_mappings()
            self._log_initialization_summary()
            self._initialized = True

    def _setup_key_mappings(self):
        """Setup hard-coded key mappings for alternative access"""
        # Hard-coded mapping from alternative names to actual keys
        self._key_mappings = {
            # Example mappings - replace with your actual mappings
            NodeNames.API_CODE_PLANNER_AGENT: "api_code_planner",
            NodeNames.PLAN_CONTROLLER_AGENT: "plan_controller",
            NodeNames.DECOMPOSITION_AGENT: "task_decomposition",
            NodeNames.API_PLANNER_AGENT: "api_planner",
            NodeNames.FINAL_ANSWER_AGENT: "answer",
            NodeNames.SHORTLISTER_AGENT: "shortlister",
            NodeNames.CODE_AGENT: "code_agent",
        }

        # You can also create reverse mappings if needed
        self._reverse_mappings = {v: k for k, v in self._key_mappings.items()}

    def _resolve_key(self, key):
        """Resolve a key through the mapping system"""
        # First check if it's a direct key
        if key in self._instructions:
            return key

        # Then check if it's an alternative name
        if key in self._key_mappings:
            mapped_key = self._key_mappings[key]
            if mapped_key in self._instructions:
                return mapped_key
            else:
                logger.warning(f"Mapped key '{mapped_key}' for alias '{key}' not found in instructions")
                return None

        # Key not found in either direct keys or mappings
        return None

    def _load_configuration(self):
        """Load configuration once during initialization"""
        # Initialize dynaconf with TOML support (dotenv disabled)
        self._instructions_settings = Dynaconf(
            envvar_prefix="TOML_TEST",
            settings_files=["./configurations/instructions/instructions.toml"],
            environments=True,
            root_path=root_dir,
            load_dotenv=False,  # Disable dotenv loading
            core_loaders=["TOML"],
        )

        # Load TOML data directly
        self._instructions = self._load_toml_config()

    def _load_toml_config(self):
        """Load TOML configuration using Dynaconf"""
        try:
            config_path = root_dir / Path("./configurations/instructions")

            settings = Dynaconf(
                settings_files=[str(config_path / "instructions.toml")],
                load_dotenv=False,  # Optional: also load .env files
                envvar_prefix="INSTRUCTIONS",  # Optional: prefix for env vars
            )

            return settings

        except Exception as e:
            logger.error(f"Error loading TOML configuration: {e}")
            return Dynaconf()  # Return empty Dynaconf instance

    def _log_initialization_summary(self):
        """Log a nice summary of what configuration was loaded"""
        try:
            config_path = root_dir / Path("./configurations/instructions/instructions.toml")

            # Basic file info
            if config_path.exists():
                file_size = config_path.stat().st_size
                logger.success("πŸ“‹ Instructions configuration loaded successfully")
                logger.info(f"   πŸ“ Config file: {config_path.relative_to(root_dir)}")
                logger.info(f"   πŸ“ File size: {file_size:,} bytes")
            else:
                logger.warning(f"   ⚠️  Config file not found: {config_path}")
                return

            # Count instruction keys
            instruction_keys = self.get_all_instruction_keys()
            logger.info(f"   πŸ”‘ Instruction sections: {len(instruction_keys)}")

            # Log key mappings info
            if self._key_mappings:
                logger.info(f"   πŸ”— Key mappings available: {len(self._key_mappings)}")
                if len(self._key_mappings) <= 5:
                    mapping_str = ", ".join([f"{k}β†’{v}" for k, v in self._key_mappings.items()])
                    logger.info(f"   🏷️  Mappings: {mapping_str}")
                else:
                    sample_mappings = dict(list(self._key_mappings.items())[:3])
                    mapping_str = ", ".join([f"{k}β†’{v}" for k, v in sample_mappings.items()])
                    logger.info(
                        f"   🏷️  Sample mappings: {mapping_str}... (+{len(self._key_mappings) - 3} more)"
                    )

            if instruction_keys:
                # Count file-based vs inline instructions
                file_based_count = 0
                inline_count = 0
                total_chars = 0

                for key in instruction_keys:
                    raw_value = self._instructions.get(key, {}).get('instructions', "")
                    if raw_value.startswith("./"):
                        file_based_count += 1
                        # Try to get actual content length
                        content = self._load_file_content(raw_value)
                        total_chars += len(content)
                    else:
                        inline_count += 1
                        total_chars += len(raw_value)

                logger.info(f"   πŸ“ Inline instructions: {inline_count}")
                logger.info(f"   πŸ“„ File-based instructions: {file_based_count}")
                logger.info(f"   πŸ“Š Total content length: {total_chars:,} characters")

                # Show available instruction keys (limited to avoid spam)
                if len(instruction_keys) <= 10:
                    keys_str = ", ".join(instruction_keys)
                    logger.info(f"   🏷️  Available keys: {keys_str}")
                else:
                    sample_keys = instruction_keys[:8]
                    keys_str = ", ".join(sample_keys)
                    logger.info(f"   🏷️  Available keys: {keys_str}... (+{len(instruction_keys) - 8} more)")
            else:
                logger.warning("   ⚠️  No instruction sections found in configuration")

        except Exception as e:
            logger.error(f"Error generating initialization summary: {e}")

    def _load_file_content(self, file_path):
        """Load file content if it exists, return empty string otherwise"""
        # Handle relative paths starting with ./
        if file_path.startswith("./"):
            file_path = file_path[2:]

        # Get the absolute path of the current config.py file
        config_dir = Path(__file__).parent.parent.absolute()
        full_path = config_dir / file_path

        # Validate that the path is within the config directory for security
        try:
            full_path = full_path.resolve()
            if not str(full_path).startswith(str(config_dir)):
                logger.warning(f"Security warning: Path {file_path} is outside config directory")
                return ""
        except Exception as e:
            logger.error(f"Error resolving path {file_path}: {e}")
            return ""

        if full_path.exists():
            try:
                with open(full_path, 'r', encoding='utf-8') as f:
                    return f.read().strip()
            except Exception as e:
                logger.error(f"Error reading file {full_path}: {e}")
                return ""
        else:
            logger.warning(f"File not found: {full_path}")
            return ""

    def get_instructions(self, key):
        """
        Generic function to get instructions for any key.
        Checks in-memory cache first, then configuration.
        """
        resolved_key = self._resolve_key(key)

        if resolved_key is None:
            logger.warning(f"Key '{key}' not found in instructions or key mappings")
            return ""

        # Check cache with both original and uppercase key
        cache_key = resolved_key.upper() if resolved_key else None
        if cache_key and cache_key in self._in_memory_cache:
            logger.info(f"Loaded '{cache_key}' from in-memory cache.")
            return self._in_memory_cache[cache_key]
        elif resolved_key in self._in_memory_cache:
            logger.info(f"Loaded '{resolved_key}' from in-memory cache.")
            return self._in_memory_cache[resolved_key]

        try:
            # Log if we used a mapping
            if resolved_key != key:
                logger.debug(f"Using key mapping: '{key}' β†’ '{resolved_key}'")

            value = self._instructions.get(resolved_key, {}).get('instructions', "")
            if value.startswith("./"):
                content = self._load_file_content(value)
            else:
                content = value

            # Store in cache with uppercase key for consistency
            if cache_key:
                self._in_memory_cache[cache_key] = content
            return content
        except Exception as e:
            logger.error(f"Error getting instructions for key '{key}': {e}")
            return ""

    def set_instructions_from_one_file(self, instructions: str | None = None):
        if not instructions:
            self._in_memory_cache.clear()
            return

        res = parse_markdown_sections(instructions)
        if res.personal_information:
            tracker.pi = res.personal_information
        if res.answer:
            resolved_key = self._resolve_key('answer')
            # Normalize to uppercase to match get_all_instruction_keys() output
            if resolved_key:
                self._in_memory_cache[resolved_key.upper()] = res.answer
        if res.plan:
            resolved_key = self._resolve_key('api_planner')
            # Normalize to uppercase to match get_all_instruction_keys() output
            if resolved_key:
                self._in_memory_cache[resolved_key.upper()] = res.plan

    def set_instruction(self, key_name: str, value: str):
        """
        Sets or updates an instruction in the in-memory cache.
        This will override any instruction loaded from configuration.
        """
        resolved_key = self._resolve_key(key_name)
        if resolved_key is None:
            # If key doesn't exist, we can't set it unless we add it to instructions.
            # For this implementation, we will add it to the cache directly.
            resolved_key = key_name
            logger.warning(f"Key '{key_name}' not found in configuration. Adding to in-memory cache only.")

        # Use uppercase key for consistency
        cache_key = resolved_key.upper() if resolved_key else resolved_key
        self._in_memory_cache[cache_key] = value
        logger.info(f"Set instruction for key '{cache_key}' in memory.")

    def get_all_instruction_keys(self):
        """Get all keys that have 'instructions' as a child"""
        instruction_keys = []
        for key, value in self._instructions.items():
            if isinstance(value, dict) and 'instructions' in value:
                instruction_keys.append(key)
        return instruction_keys

    def get_all_available_keys(self):
        """Get all available keys including both direct keys and mapped aliases"""
        direct_keys = self.get_all_instruction_keys()
        mapped_keys = list(self._key_mappings.keys())
        return {'direct_keys': direct_keys, 'mapped_keys': mapped_keys, 'all_keys': direct_keys + mapped_keys}

    def get_all_instructions_formatted(self):
        """Get all instructions formatted as markdown with key-value pairs"""
        instruction_keys = self.get_all_instruction_keys()
        if not instruction_keys:
            logger.warning("No instruction keys found")
            return None

        markdown_sections = []
        for key in sorted(instruction_keys):
            instructions = self.get_instructions(key)
            if instructions.strip():
                # Format as nested bullet points under the key
                formatted_key = key.replace('_', ' ').title()
                # Format instructions content as nested bullets if multi-line
                instruction_lines = instructions.strip().split('\n')
                if len(instruction_lines) > 1:
                    # Multi-line: format each line as nested bullet
                    nested_content = '\n'.join(
                        f"  - {line.strip()}" for line in instruction_lines if line.strip()
                    )
                    section = f"- **{formatted_key}**\n{nested_content}"
                else:
                    # Single line: simple format
                    section = f"- **{formatted_key}**\n  - {instructions.strip()}"
                markdown_sections.append(section)

        # Return None if no sections were added (all values were empty)
        if not markdown_sections:
            logger.warning("No markdown sections found")
            return None
        logger.info(f"All instructions formatted: {markdown_sections}")
        return "\n\n".join(markdown_sections)

    def get_key_mappings(self):
        """Get the current key mappings dictionary"""
        return self._key_mappings.copy()

    def add_key_mapping(self, alias, actual_key):
        """Add a new key mapping at runtime"""
        if actual_key not in self._instructions:
            logger.warning(f"Target key '{actual_key}' does not exist in instructions")
            return False

        self._key_mappings[alias] = actual_key
        logger.info(f"Added key mapping: '{alias}' β†’ '{actual_key}'")
        return True

    def remove_key_mapping(self, alias):
        """Remove a key mapping"""
        if alias in self._key_mappings:
            removed_key = self._key_mappings.pop(alias)
            logger.info(f"Removed key mapping: '{alias}' β†’ '{removed_key}'")
            return True
        else:
            logger.warning(f"Key mapping '{alias}' not found")
            return False

    def reload_configuration(self):
        """Manually reload configuration if needed"""
        logger.info("πŸ”„ Reloading instructions configuration...")
        self._load_configuration()
        self._setup_key_mappings()  # Reload mappings as well
        self._log_initialization_summary()
        logger.success("βœ… Configuration reloaded successfully")

    @property
    def instructions_settings(self):
        """Access to the dynaconf settings object"""
        return self._instructions_settings

    @property
    def raw_instructions(self):
        """Access to the raw instructions dictionary"""
        return self._instructions

    @property
    def key_mappings(self):
        """Access to the key mappings dictionary"""
        return self._key_mappings.copy()


# Convenience functions for backward compatibility
def get_instructions_manager():
    """Get the singleton instance"""
    return InstructionsManager()


def get_instructions(key):
    """Get instructions for a key using the singleton"""
    return get_instructions_manager().get_instructions(key)


def get_all_instruction_keys():
    """Get all instruction keys using the singleton"""
    return get_instructions_manager().get_all_instruction_keys()


def get_all_available_keys():
    """Get all available keys including mapped aliases"""
    return get_instructions_manager().get_all_available_keys()


def add_key_mapping(alias, actual_key):
    """Add a new key mapping"""
    return get_instructions_manager().add_key_mapping(alias, actual_key)


def get_all_instructions_formatted():
    """Get all instructions formatted as markdown"""
    return get_instructions_manager().get_all_instructions_formatted()