fciannella's picture
Added the healthcare example
2f49513

Multi-Threaded Telco Agent - Implementation Summary

Overview

Successfully implemented Option 1: Tool-Level Designation for the telco agent, enabling non-blocking multi-threaded execution with intelligent routing based on whether operations are long-running or quick.

Files Modified/Created

1. Created: helper_functions.py

  • Location: agents/helper_functions.py
  • Purpose: Shared utilities for progress tracking and coordination
  • Functions:
    • write_status(): Write tool execution progress to store
    • reset_status(): Clear tool execution status

2. Modified: tools.py

  • Location: agents/telco-agent-multi/tools.py
  • Changes:
    • Added imports for progress tracking (time, get_store, get_stream_writer, etc.)
    • Updated 4 long-running tools with progress reporting:
      1. close_contract_tool (10 seconds, 5 steps)
      2. purchase_roaming_pass_tool (8 seconds, 4 steps)
      3. change_package_tool (10 seconds, 5 steps)
      4. get_billing_summary_tool (6 seconds, 3 steps)
    • Added check_status() tool for secondary thread
    • Marked all tools with is_long_running attribute (True/False)
    • Tools now send immediate feedback via get_stream_writer()

3. Modified: react_agent.py

  • Location: agents/telco-agent-multi/react_agent.py
  • Major Changes:

Imports

  • Added: time, ChatPromptTemplate, BaseStore, RunnableConfig, ensure_config, get_store
  • Imported helper functions

System Prompts

  • Added SECONDARY_SYSTEM_PROMPT for interim conversations
  • Kept original SYSTEM_PROMPT for main operations

LLM Configuration

  • Split tools into:
    • _MAIN_TOOLS: All 13 telco tools
    • _SECONDARY_TOOLS: 6 safe, quick tools + check_status
  • Created dual LLM bindings:
    • _LLM_WITH_TOOLS: Main thread (temp 0.3)
    • _HELPER_LLM_WITH_TOOLS: Secondary thread (temp 0.7)
  • Added _ALL_TOOLS_BY_NAME dictionary

Synthesis Chain

  • Added _SYNTHESIS_PROMPT and _SYNTHESIS_CHAIN
  • Merges tool results with interim conversation

Agent Function (Complete Refactor)

  • Signature Changed:

    # Before
    def agent(messages: List[BaseMessage], previous: List[BaseMessage] | None, config: Dict[str, Any] | None = None)
    
    # After
    def agent(input_dict: dict, previous: Any = None, config: RunnableConfig | None = None, store: BaseStore | None = None)
    
  • Input Dictionary:

    input_dict = {
        "messages": List[BaseMessage],
        "thread_type": "main" | "secondary",
        "interim_messages_reset": bool
    }
    
  • State Format:

    # Before: List[BaseMessage]
    # After: Dict[str, List[BaseMessage]]
    {
        "messages": [...],           # Full conversation
        "interim_messages": [...]    # Interim conversation during long ops
    }
    
  • New Features:

    1. Thread Type Routing: Choose LLM/tools based on thread type
    2. Processing Locks: Secondary thread sets lock at start
    3. Abort Handling: Main can signal secondary to abort
    4. Wait & Synthesize: Main waits for secondary (15s timeout) and synthesizes
    5. Progress Tracking: Reset status after main thread completion
    6. Store Coordination: Uses namespace for thread coordination

4. Created: MULTI_THREAD_README.md

  • Comprehensive documentation of:
    • Architecture overview
    • Long-running vs quick tools
    • Usage examples
    • Coordination mechanism
    • Safety features
    • Migration guide

5. Created: example_multi_thread.py

  • Executable demo script
  • Three scenarios:
    1. Long operation with status checks
    2. Quick synchronous query
    3. Interactive mode
  • Shows proper threading and routing logic

Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                         User Request                         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                      β”‚
         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
         β”‚                         β”‚
    Long Tool?               Quick Tool?
         β”‚                         β”‚
         β–Ό                         β–Ό
    Main Thread              Main Thread
         β”‚                    (synchronous)
         β”œβ”€ Execute Tool           β”‚
         β”‚  (with progress)        └─ Return
         β”‚                              
         β”œβ”€ Store progress               
         β”‚                              
         β”œβ”€ Check Secondary              
         β”‚                              
         └─ Synthesize Result            
                                         
    While Main Running:                  
         β–Ό                               
    Secondary Thread                     
         β”‚                               
         β”œβ”€ Handle interim queries       
         β”‚  (limited tools)              
         β”‚                               
         └─ Store interim messages       

Store Keys Used

Key Purpose Lifecycle
working-tool-status-update Tool progress (0-100%) Set by long tools, cleared by main
secondary_status Secondary thread lock Set at start, cleared at end
secondary_abort Abort signal Set by main on timeout, cleared by secondary
secondary_interim_messages Interim conversation Set by secondary, read/cleared by main

Tool Classification

Long-Running Tools (4 tools)

  1. close_contract_tool - 10 seconds

    • Simulates contract closure processing
    • Reports 5 progress steps
  2. purchase_roaming_pass_tool - 8 seconds

    • Simulates payment processing and activation
    • Reports 4 progress steps
  3. change_package_tool - 10 seconds

    • Simulates package provisioning
    • Reports 5 progress steps
  4. get_billing_summary_tool - 6 seconds

    • Simulates multi-system billing queries
    • Reports 3 progress steps

Quick Tools (9 tools)

  • start_login_tool, verify_login_tool (auth)
  • get_current_package_tool (lookup)
  • get_data_balance_tool (lookup)
  • list_available_packages_tool (catalog)
  • recommend_packages_tool (computation)
  • get_roaming_info_tool (reference data)
  • list_addons_tool (lookup)
  • set_data_alerts_tool (config update)

Helper Tool

  • check_status (progress query for secondary thread)

Safety Features

  1. Processing Locks

    • Secondary thread sets secondary_status.processing = True at start
    • Released when complete or aborted
    • Prevents race conditions
  2. Abort Signals

    • Main thread can set secondary_abort flag
    • Secondary checks flag at start and before writing results
    • Graceful termination without corrupting state
  3. Timeouts

    • Main thread waits max 15 seconds for secondary
    • Prevents indefinite blocking
    • Sets abort flag on timeout
  4. Message Sanitization

    • Removes orphan ToolMessage instances
    • Prevents OpenAI API 400 errors
    • Maintains conversation coherence
  5. State Isolation

    • Separate thread IDs for main and secondary
    • Namespace-based store isolation
    • No cross-contamination

Testing Recommendations

Unit Tests

  • Test long tools report progress correctly
  • Test check_status returns accurate status
  • Test message sanitization removes orphans
  • Test state merging (messages + interim_messages)

Integration Tests

  1. Single long operation: Verify completion and status reset
  2. Long operation + status check: Verify secondary can query progress
  3. Long operation + multiple queries: Verify multiple secondary calls
  4. Synthesis: Verify main synthesizes interim conversation
  5. Timeout: Verify main aborts secondary after 15s
  6. Quick operation: Verify no multi-threading overhead

Load Tests

  • Multiple concurrent users with different namespaces
  • Rapid main/secondary alternation
  • Store performance under load

Performance Considerations

  1. Store Access: Each coordination point hits the store

    • Consider caching for high-frequency access
    • Monitor store latency
  2. Synthesis LLM Call: Additional API call for merging

    • Only happens when interim conversation exists
    • Uses temperature 0.7 for natural language
  3. Thread Overhead: Secondary thread runs synchronously

    • No actual parallelism for safety
    • Consider async/await for true concurrency
  4. Timeout Waiting: Main thread sleeps in 0.5s intervals

    • 15 seconds max = 30 checks
    • Minimal CPU usage

Migration Path

For existing deployments:

  1. Update client code to use new input format
  2. Add namespace_for_memory to config
  3. Provide store instance to agent calls
  4. Update state handling to expect dict instead of list
  5. Test backward compatibility with quick tools (should work seamlessly)

Future Enhancements

  1. Dynamic Tool Marking: Tool duration could be learned/adjusted
  2. Priority Queue: Multiple long operations could queue
  3. True Async: Replace synchronous secondary with async/await
  4. Progress UI: Stream progress updates to frontend
  5. Cancellation: User-initiated cancellation of long operations
  6. Retry Logic: Automatic retry for failed long operations
  7. Telemetry: Track success rates, durations, timeout frequency

Credits

Implementation based on the multi-threaded agent pattern with:

  • Tool-level designation (Option 1)
  • Store-based coordination
  • Progress tracking and streaming
  • Conversation synthesis
  • Race condition handling

Date: September 30, 2025