Multi-Threaded Telco Agent - Implementation Summary
Overview
Implemented Option 1 (Tool-Level Designation) for the telco agent: every tool is marked as long-running or quick, and requests are routed on that flag so that a long operation does not block the conversation while it runs.
Files Modified/Created
1. Created: helper_functions.py
- Location: agents/helper_functions.py
- Purpose: Shared utilities for progress tracking and coordination
- Functions:
  - write_status(): Write tool execution progress to the store
  - reset_status(): Clear tool execution status
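As an illustration of what these two helpers might look like, here is a minimal sketch. It is not the contents of helper_functions.py: the function signatures, the payload fields (`tool`, `progress`, `message`), and the in-memory `DictStore` stand-in are all assumptions made for the example. Only the function names and the `working-tool-status-update` key come from this summary.

```python
from typing import Any, Dict, Optional, Tuple

Namespace = Tuple[str, ...]
STATUS_KEY = "working-tool-status-update"  # key name as listed under "Store Keys Used"


class DictStore:
    """Hypothetical in-memory stand-in for the real store (assumption)."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[Namespace, str], Dict[str, Any]] = {}

    def put(self, namespace: Namespace, key: str, value: Dict[str, Any]) -> None:
        self._data[(namespace, key)] = value

    def get(self, namespace: Namespace, key: str) -> Optional[Dict[str, Any]]:
        return self._data.get((namespace, key))

    def delete(self, namespace: Namespace, key: str) -> None:
        self._data.pop((namespace, key), None)


def write_status(store: DictStore, namespace: Namespace, tool_name: str,
                 progress: int, message: str = "") -> None:
    """Record tool progress, clamped to the 0-100 range."""
    clamped = max(0, min(100, int(progress)))
    store.put(namespace, STATUS_KEY,
              {"tool": tool_name, "progress": clamped, "message": message})


def reset_status(store: DictStore, namespace: Namespace) -> None:
    """Clear the status entry once the long-running tool has finished."""
    store.delete(namespace, STATUS_KEY)
```

Keeping both helpers keyed on a namespace means two users running long operations at the same time would not overwrite each other's progress.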
2. Modified: tools.py
- Location: agents/telco-agent-multi/tools.py
- Changes:
  - Added imports for progress tracking (time, get_store, get_stream_writer, etc.)
  - Updated 4 long-running tools with progress reporting:
    - close_contract_tool (10 seconds, 5 steps)
    - purchase_roaming_pass_tool (8 seconds, 4 steps)
    - change_package_tool (10 seconds, 5 steps)
    - get_billing_summary_tool (6 seconds, 3 steps)
  - Added check_status() tool for secondary thread
  - Marked all tools with is_long_running attribute (True/False)
  - Tools now send immediate feedback via get_stream_writer()
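The progress-reporting pattern described above can be sketched roughly as follows. This is a simplified illustration, not the code in tools.py: the `report` callback stands in for whatever the real tools use to publish progress (the store and the stream writer), and `run_with_progress` and `close_contract_sketch` are hypothetical names. The step count and total duration match the figures quoted for close_contract_tool.

```python
import time
from typing import Callable

# Callback that receives (percent_complete, human_readable_message).
ProgressFn = Callable[[int, str], None]


def run_with_progress(tool_name: str, total_steps: int,
                      step_seconds: float, report: ProgressFn) -> str:
    """Simulate a long operation, reporting progress after every step."""
    for step in range(1, total_steps + 1):
        time.sleep(step_seconds)  # stands in for the real work of this step
        percent = step * 100 // total_steps
        report(percent, f"{tool_name}: step {step} of {total_steps}")
    return f"{tool_name} completed"


def close_contract_sketch(report: ProgressFn, step_seconds: float = 2.0) -> str:
    """5 steps of 2 s each, i.e. the 10 seconds quoted for close_contract_tool."""
    return run_with_progress("close_contract_tool", 5, step_seconds, report)


# Assumed marking convention: a plain attribute read by the routing code.
close_contract_sketch.is_long_running = True
```

Passing `step_seconds=0` makes the same function usable in unit tests without waiting for the simulated delay.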
3. Modified: react_agent.py
- Location: agents/telco-agent-multi/react_agent.py
- Major Changes:
Imports
- Added: time, ChatPromptTemplate, BaseStore, RunnableConfig, ensure_config, get_store
- Imported helper functions
System Prompts
- Added SECONDARY_SYSTEM_PROMPT for interim conversations
- Kept original SYSTEM_PROMPT for main operations
LLM Configuration
- Split tools into:
  - _MAIN_TOOLS: All 13 telco tools
  - _SECONDARY_TOOLS: 6 safe, quick tools + check_status
- Created dual LLM bindings:
  - _LLM_WITH_TOOLS: Main thread (temp 0.3)
  - _HELPER_LLM_WITH_TOOLS: Secondary thread (temp 0.7)
- Added _ALL_TOOLS_BY_NAME dictionary
Synthesis Chain
- Added _SYNTHESIS_PROMPT and _SYNTHESIS_CHAIN
- Merges tool results with interim conversation
Agent Function (Complete Refactor)
Signature Changed:

    # Before
    def agent(messages: List[BaseMessage], previous: List[BaseMessage] | None, config: Dict[str, Any] | None = None)

    # After
    def agent(input_dict: dict, previous: Any = None, config: RunnableConfig | None = None, store: BaseStore | None = None)

Input Dictionary:

    input_dict = {
        "messages": List[BaseMessage],
        "thread_type": "main" | "secondary",
        "interim_messages_reset": bool
    }

State Format:

    # Before: List[BaseMessage]
    # After: Dict[str, List[BaseMessage]]
    {
        "messages": [...],          # Full conversation
        "interim_messages": [...]   # Interim conversation during long ops
    }

New Features:
- Thread Type Routing: Choose LLM/tools based on thread type
- Processing Locks: Secondary thread sets lock at start
- Abort Handling: Main can signal secondary to abort
- Wait & Synthesize: Main waits for secondary (15s timeout) and synthesizes
- Progress Tracking: Reset status after main thread completion
- Store Coordination: Uses namespace for thread coordination
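The routing and state handling listed above can be pictured with the following sketch. It is an assumption-laden simplification, not the real agent: messages are plain strings, the LLM choice is returned as a label, and `normalize_state` and `route` are hypothetical names. In particular, appending secondary-thread messages to interim_messages is inferred from the state format description rather than confirmed by the source.

```python
from typing import Any, Dict, List, Tuple

State = Dict[str, List[str]]


def normalize_state(previous: Any) -> State:
    """Accept the old list state or the new dict state; return the dict form."""
    if previous is None:
        return {"messages": [], "interim_messages": []}
    if isinstance(previous, list):  # pre-refactor state: a bare message list
        return {"messages": list(previous), "interim_messages": []}
    return {
        "messages": list(previous.get("messages", [])),
        "interim_messages": list(previous.get("interim_messages", [])),
    }


def route(input_dict: Dict[str, Any], previous: Any = None) -> Tuple[str, State]:
    """Pick the LLM label for this thread type and update the matching history."""
    state = normalize_state(previous)
    if input_dict.get("interim_messages_reset"):
        state["interim_messages"] = []
    if input_dict.get("thread_type", "main") == "secondary":
        # Assumed behaviour: interim chatter is kept apart from the main history.
        state["interim_messages"].extend(input_dict["messages"])
        return "helper", state
    state["messages"].extend(input_dict["messages"])
    return "main", state
```

Accepting the old list form in `normalize_state` is one way to let checkpoints written before the refactor keep working.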
4. Created: MULTI_THREAD_README.md
- Comprehensive documentation of:
  - Architecture overview
  - Long-running vs quick tools
  - Usage examples
  - Coordination mechanism
  - Safety features
  - Migration guide
5. Created: example_multi_thread.py
- Executable demo script
- Three scenarios:
  - Long operation with status checks
  - Quick synchronous query
  - Interactive mode
- Shows proper threading and routing logic
Architecture
    ┌─────────────────────────────────────────────────────────────┐
    │                        User Request                         │
    └─────────────────────┬───────────────────────────────────────┘
                          │
             ┌────────────┴────────────┐
             │                         │
        Long Tool?                Quick Tool?
             │                         │
             ▼                         ▼
        Main Thread               Main Thread
             │                   (synchronous)
             ├─ Execute Tool           │
             │  (with progress)        └─ Return
             │
             ├─ Store progress
             │
             ├─ Check Secondary
             │
             └─ Synthesize Result

        While Main Running:
             ▼
        Secondary Thread
             │
             ├─ Handle interim queries
             │  (limited tools)
             │
             └─ Store interim messages
Store Keys Used
| Key | Purpose | Lifecycle |
|---|---|---|
| working-tool-status-update | Tool progress (0-100%) | Set by long tools, cleared by main |
| secondary_status | Secondary thread lock | Set at start, cleared at end |
| secondary_abort | Abort signal | Set by main on timeout, cleared by secondary |
| secondary_interim_messages | Interim conversation | Set by secondary, read/cleared by main |
Tool Classification
Long-Running Tools (4 tools)
- close_contract_tool: 10 seconds
  - Simulates contract closure processing
  - Reports 5 progress steps
- purchase_roaming_pass_tool: 8 seconds
  - Simulates payment processing and activation
  - Reports 4 progress steps
- change_package_tool: 10 seconds
  - Simulates package provisioning
  - Reports 5 progress steps
- get_billing_summary_tool: 6 seconds
  - Simulates multi-system billing queries
  - Reports 3 progress steps
Quick Tools (9 tools)
- start_login_tool, verify_login_tool (auth)
- get_current_package_tool (lookup)
- get_data_balance_tool (lookup)
- list_available_packages_tool (catalog)
- recommend_packages_tool (computation)
- get_roaming_info_tool (reference data)
- list_addons_tool (lookup)
- set_data_alerts_tool (config update)
Helper Tool
- check_status (progress query for secondary thread)
Safety Features
Processing Locks
- Secondary thread sets secondary_status.processing = True at start
- Released when complete or aborted
- Prevents race conditions
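One way to guarantee that the lock is released on completion, abort, or an unexpected error is a context manager with a `finally` clause. The sketch below assumes a plain dict as the store and a hypothetical `secondary_lock` helper. The real agent's store API and release mechanics may differ (for instance, it may delete the key rather than set the flag to False).

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator

LOCK_KEY = "secondary_status"  # key name as listed under "Store Keys Used"


@contextmanager
def secondary_lock(store: Dict[str, Any]) -> Iterator[None]:
    """Mark the secondary thread as processing for the duration of the block."""
    store[LOCK_KEY] = {"processing": True}
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions, so the lock cannot leak.
        store[LOCK_KEY] = {"processing": False}
```

With this shape, the main thread only ever has to read one flag to know whether the secondary thread is still working.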
Abort Signals
- Main thread can set secondary_abort flag
- Secondary checks flag at start and before writing results
- Graceful termination without corrupting state
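The two abort checkpoints (at start, and again just before writing results) might look like the sketch below. The dict-based store, the `run_secondary` name, and the `answer_fn` callback are illustrative assumptions. The key names and the rule that the secondary thread clears the flag come from this summary.

```python
from typing import Any, Callable, Dict, Optional


def run_secondary(store: Dict[str, Any], answer_fn: Callable[[], str]) -> Optional[str]:
    """Produce an interim reply unless the main thread has asked us to abort."""
    # Checkpoint 1: abort requested before we started. pop() also clears the flag.
    if store.pop("secondary_abort", False):
        return None

    reply = answer_fn()  # stands in for the helper LLM call

    # Checkpoint 2: abort requested while we were working; discard the reply
    # so nothing is written after the main thread has moved on.
    if store.pop("secondary_abort", False):
        return None

    store.setdefault("secondary_interim_messages", []).append(reply)
    return reply
```

The second check is what keeps a slow secondary call from appending a stale interim message after the main thread has already given up waiting.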
Timeouts
- Main thread waits max 15 seconds for secondary
- Prevents indefinite blocking
- Sets abort flag on timeout
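The wait-then-abort behaviour can be sketched as a bounded polling loop. The 15-second limit and 0.5-second interval are the values quoted in this summary. The dict-based store, the `wait_for_secondary` name, and the injectable `sleep` parameter are assumptions made to keep the example self-contained and testable.

```python
import time
from typing import Any, Callable, Dict


def wait_for_secondary(store: Dict[str, Any], timeout_s: float = 15.0,
                       poll_s: float = 0.5,
                       sleep: Callable[[float], None] = time.sleep) -> bool:
    """Return True if the secondary thread finished in time.

    On timeout, set the abort flag and return False.
    """
    max_checks = int(timeout_s / poll_s)  # 15 / 0.5 = 30 checks
    for _ in range(max_checks):
        if not store.get("secondary_status", {}).get("processing", False):
            return True  # lock released: safe to read interim messages
        sleep(poll_s)
    store["secondary_abort"] = True  # tell the secondary thread to stop
    return False
```

Injecting `sleep` lets a test exercise the full 30-check timeout path without actually waiting 15 seconds.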
Message Sanitization
- Removes orphan ToolMessage instances
- Prevents OpenAI API 400 errors
- Maintains conversation coherence
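As a rough illustration of orphan removal, the sketch below filters a conversation so that a tool result is kept only if an earlier assistant message requested it. It works on plain dicts in the OpenAI chat format rather than on ToolMessage objects, and `drop_orphan_tool_messages` is a hypothetical name, so treat it as a picture of the idea rather than the agent's actual sanitizer.

```python
from typing import Any, Dict, List, Set


def drop_orphan_tool_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep a tool message only if a preceding assistant message issued its call id."""
    seen_call_ids: Set[str] = set()
    cleaned: List[Dict[str, Any]] = []
    for msg in messages:
        if msg.get("role") == "assistant":
            # Remember every tool call this assistant turn requested.
            for call in msg.get("tool_calls") or []:
                seen_call_ids.add(call["id"])
        if msg.get("role") == "tool" and msg.get("tool_call_id") not in seen_call_ids:
            continue  # orphan: no matching request earlier in the history
        cleaned.append(msg)
    return cleaned
```

Orphans can arise when the two threads' histories are merged or truncated, which is why the check runs over the combined message list.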
State Isolation
- Separate thread IDs for main and secondary
- Namespace-based store isolation
- No cross-contamination
Testing Recommendations
Unit Tests
- Test long tools report progress correctly
- Test check_status returns accurate status
- Test message sanitization removes orphans
- Test state merging (messages + interim_messages)
Integration Tests
- Single long operation: Verify completion and status reset
- Long operation + status check: Verify secondary can query progress
- Long operation + multiple queries: Verify multiple secondary calls
- Synthesis: Verify main synthesizes interim conversation
- Timeout: Verify main aborts secondary after 15s
- Quick operation: Verify no multi-threading overhead
Load Tests
- Multiple concurrent users with different namespaces
- Rapid main/secondary alternation
- Store performance under load
Performance Considerations
Store Access: Each coordination point hits the store
- Consider caching for high-frequency access
- Monitor store latency
Synthesis LLM Call: Additional API call for merging
- Only happens when interim conversation exists
- Uses temperature 0.7 for natural language
Thread Overhead: Secondary thread runs synchronously
- No actual parallelism for safety
- Consider async/await for true concurrency
Timeout Waiting: Main thread sleeps in 0.5s intervals
- 15 seconds max = 30 checks
- Minimal CPU usage
Migration Path
For existing deployments:
- Update client code to use new input format
- Add namespace_for_memory to config
- Provide store instance to agent calls
- Update state handling to expect dict instead of list
- Test backward compatibility with quick tools (should work seamlessly)
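For client code, the first two migration steps could be wrapped in a small helper like the one sketched here. The helper name `build_agent_call` is hypothetical, and placing namespace_for_memory under a "configurable" key is an assumption about the config layout that should be checked against the agent code. The input dictionary fields are the ones listed in this summary.

```python
from typing import Any, Dict, List, Tuple


def build_agent_call(messages: List[Any], thread_type: str = "main",
                     namespace: Tuple[str, ...] = ("user-1",),
                     reset_interim: bool = False) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Build the new-style input dict and a config carrying the memory namespace."""
    if thread_type not in ("main", "secondary"):
        raise ValueError(f"unknown thread_type: {thread_type!r}")
    input_dict = {
        "messages": list(messages),
        "thread_type": thread_type,
        "interim_messages_reset": reset_interim,
    }
    # Assumed layout: the namespace lives under the "configurable" section.
    config = {"configurable": {"namespace_for_memory": namespace}}
    return input_dict, config
```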
Future Enhancements
- Dynamic Tool Marking: Tool duration could be learned/adjusted
- Priority Queue: Multiple long operations could queue
- True Async: Replace synchronous secondary with async/await
- Progress UI: Stream progress updates to frontend
- Cancellation: User-initiated cancellation of long operations
- Retry Logic: Automatic retry for failed long operations
- Telemetry: Track success rates, durations, timeout frequency
Credits
Implementation based on the multi-threaded agent pattern with:
- Tool-level designation (Option 1)
- Store-based coordination
- Progress tracking and streaming
- Conversation synthesis
- Race condition handling
Date: September 30, 2025