# behavior_model.py -- REPLACED with "Neural Structure / MoE-style Dispatcher"
"""
Large, modular "neural structure" dispatcher (a software MoE) for intent/complexity routing.

How to use:
- Replace your existing behavior_model.py with this file.
- app.py expects analyze_flow(messages) -> dict with keys:
    { route: "direct"|"planning", is_complex: bool, flow_label: str,
      confidence: float, explanation: str, experts: [...] }

Design:
- Feature extractor -> gating network (scoring) -> top-K expert selection -> combine/explain decision.
- Experts are modular callables; by default they are heuristic "experts".
- To scale: implement Expert.run(...) to call real submodels/endpoints
  (local small models, remote microservices).
"""
from typing import Any, Dict, List, Optional, Tuple

import json
import math
import os
import re

# ---------- Configurable constants ----------
TOP_K = int(os.environ.get("NS_TOP_K", "2"))  # how many experts to activate per request
SOFTMAX_TEMPERATURE = float(os.environ.get("NS_TEMP", "1.0"))
MIN_COMPLEX_CONF_FOR_PLANNING = float(os.environ.get("NS_MIN_COMPLEX_CONF", "0.56"))
MAX_EXPERTS = int(os.environ.get("NS_MAX_EXPERTS", "12"))  # cap on the expert pool

# Weights (tunables)
WEIGHT_LENGTH = float(os.environ.get("NS_W_LENGTH", "1.0"))
WEIGHT_KEYWORD = float(os.environ.get("NS_W_KEYWORD", "1.9"))
WEIGHT_CODE = float(os.environ.get("NS_W_CODE", "2.4"))
WEIGHT_NUMERIC = float(os.environ.get("NS_W_NUMERIC", "1.2"))
WEIGHT_QUESTION = float(os.environ.get("NS_W_QUESTION", "0.6"))
WEIGHT_URGENT = float(os.environ.get("NS_W_URGENT", "2.2"))
WEIGHT_HISTORY = float(os.environ.get("NS_W_HISTORY", "0.8"))

# ---------- Regex / keyword lists ----------
_code_fence_re = re.compile(r"```.+?```", flags=re.DOTALL | re.IGNORECASE)
_inline_code_re = re.compile(r"`[^`]+`")
_number_re = re.compile(r"\b\d+(\.\d+)?\b")
_list_marker_re = re.compile(r"(^\s*[-*•]\s+)|(^\s*\d+\.\s+)", flags=re.MULTILINE)
_url_re = re.compile(r"https?://\S+")
_question_word_re = re.compile(r"^\s*(who|what|why|how|when|which|where)\b", flags=re.IGNORECASE)
_question_mark_re = re.compile(r"\?$")

_task_keywords = set(k.lower() for k in [
    "build", "create", "implement", "develop", "deploy", "install", "setup", "configure",
    "optimi", "debug", "fix", "error", "crash", "stacktrace", "exception", "traceback",
    "code", "script", "function", "api", "endpoint", "database", "sql", "mongodb", "mysql",
    "docker", "deno", "node", "express", "php", "python", "java", "rust", "golang", "compile",
    "performance", "latency", "bandwidth", "optimization", "optimize", "algorithm",
    "complexity", "big o", "time complexity", "space complexity",
    "report", "plan", "design", "architecture", "integration", "migrate", "refactor",
    "test case", "unit test", "e2e test",
    "prove", "derive", "integral", "differentiate", "matrix", "neural network", "train", "model",
])

_urgent_words = set(w.lower() for w in [
    "urgent", "asap", "immediately", "now", "critical", "important", "priority", "must",
])
_short_chat_terms = set(w.lower() for w in [
    "hi", "hello", "thanks", "thank you", "bye", "ok", "okay", "nice", "cool", "🙂", "😊",
])

# ---------- Utility functions ----------
def _word_count(text: str) -> int:
    return len(re.findall(r"\w+", text)) if text else 0

def _has_code(text: str) -> bool:
    if not text:
        return False
    # Fenced blocks, inline backticks, Python-style defs, or trailing semicolons.
    # (re.MULTILINE lets ";\s*$" match at the end of any line, not just the string.)
    return bool(
        _code_fence_re.search(text)
        or _inline_code_re.search(text)
        or re.search(r"\bdef\s+\w+\(|;\s*$", text, flags=re.IGNORECASE | re.MULTILINE)
    )

def _has_list(text: str) -> bool:
    return bool(_list_marker_re.search(text))

def _keyword_matches(text: str) -> int:
    if not text:
        return 0
    t = text.lower()
    cnt = 0
    for kw in _task_keywords:
        if kw in t:  # substring match, so "optimi" also fires on "optimize"
            cnt += 1
    return cnt
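# Illustrative behaviour of the heuristics above (hypothetical inputs; exact
# counts depend on _task_keywords, and matching is substring-based):
#
#   _keyword_matches("please debug this python function")  -> 3   ("debug", "python", "function")
#   _has_code("run `pip install flask` first")             -> True  (inline backticks)
#   _has_code("I like apples")                             -> False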
def _numeric_count(text: str) -> int:
    return len(_number_re.findall(text or ""))

def _is_urgent(text: str) -> bool:
    t = (text or "").lower()
    return any(w in t for w in _urgent_words)

def _short_chat_score(text: str) -> bool:
    t = (text or "").strip().lower()
    if len(t.split()) > 2:
        return False
    # Match whole tokens (and multi-word phrases) so that e.g. "hi" does not
    # fire on "this".
    tokens = set(re.findall(r"\w+|\S", t))
    return any((" " in term and term in t) or term in tokens for term in _short_chat_terms)

def _question_score(text: str) -> float:
    s = 0.0
    stripped = (text or "").strip()
    if _question_mark_re.search(stripped):
        s += 1.0
    if _question_word_re.match(stripped):
        s += 0.6
    return s

def _history_signal(messages: List[Dict[str, str]]) -> float:
    # Simple heuristic: if recent previous user messages contained technical
    # keywords, boost the signal (capped at 3 matches -> 0.6).
    if not messages or len(messages) < 2:
        return 0.0
    prev = " ".join(m.get("content", "") for m in messages[-4:-1] if isinstance(m, dict))
    return float(min(3, _keyword_matches(prev))) * 0.2

# ---------- Softmax helper ----------
def _softmax(scores: List[float], temp: float = 1.0) -> List[float]:
    if not scores:
        return []
    # Subtract the max before exponentiating for numerical stability.
    m = max(scores)
    exps = [math.exp((s - m) / temp) for s in scores]
    total = sum(exps)
    if total == 0:
        return [1.0 / len(scores)] * len(scores)
    return [e / total for e in exps]
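# Worked example of the gating math (illustrative numbers): raw expert scores
# [5.0, 0.4, 0.1, 0.0, 0.0] at temperature 1.0 softmax to roughly
# [0.970, 0.010, 0.007, 0.007, 0.007], so with the default TOP_K=2 the first
# two experts would be activated. Raising NS_TEMP flattens the distribution
# and lets lower-scoring experts through more often; lowering it sharpens it.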
# ---------- Expert base class ----------
class Expert:
    name: str
    description: str

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    def score(self, features: Dict[str, Any]) -> float:
        """Return a heuristic affinity score (higher = more relevant)."""
        return 0.0  # default: neutral

    def run(self, messages: List[Dict[str, str]], features: Dict[str, Any]) -> Dict[str, Any]:
        """
        Optionally run expert-specific logic (synchronously). For now this
        returns metadata only; in production it could call a model endpoint.
        """
        return {"expert": self.name, "action": "noop", "note": "heuristic-only"}

# ---------- Concrete experts ----------
class ShortChatExpert(Expert):
    def __init__(self):
        super().__init__("short_chat", "Handles greetings/short conversational turns")

    def score(self, f):
        if f.get("short_chat"):
            return 5.0
        return 0.1

    def run(self, messages, features):
        return {"expert": self.name, "action": "short_reply", "note": "Use concise response template."}

class CodeExpert(Expert):
    def __init__(self):
        super().__init__("code_expert", "Handles code, stacktraces, debugging tasks")

    def score(self, f):
        sc = 0.0
        if f.get("has_code"):
            sc += 4.0
        sc += 0.8 * f.get("kw_count", 0)
        sc += 0.6 * f.get("numeric_count", 0)
        return sc

    def run(self, messages, features):
        # Placeholder: in production, call a code-specialized model or analyzer endpoint.
        return {"expert": self.name, "action": "analyze_code", "note": "Run code LLM or static-checker (not implemented)."}

class NLUExpert(Expert):
    def __init__(self):
        super().__init__("nlu_expert", "Deep intent and slot extraction / classification")

    def score(self, f):
        sc = 1.0 * f.get("kw_count", 0)
        sc += 0.8 * f.get("question_score", 0)
        sc += 0.4 * (f.get("word_count", 0) / 30.0)
        sc += 0.6 * f.get("history_signal", 0)
        return sc

    def run(self, messages, features):
        # Example: return intent classification tags (heuristic).
        intent = "general"
        if features.get("kw_count", 0) >= 2 or features.get("has_code"):
            intent = "technical_task"
        elif features.get("short_chat"):
            intent = "social"
        return {"expert": self.name, "action": "classify_intent", "intent": intent}

class RAGExpert(Expert):
    def __init__(self):
        super().__init__("rag_expert", "Handles retrieval-augmented requests (RAG/agent)")

    def score(self, f):
        sc = 0.0
        # URLs, technical keywords, or long context -> retrieval is useful.
        if f.get("has_url"):
            sc += 2.0
        sc += 1.2 * f.get("kw_count", 0)
        sc += 0.9 * f.get("numeric_count", 0)
        if f.get("word_count", 0) > 60:
            sc += 1.5
        return sc

    def run(self, messages, features):
        # Placeholder: should trigger a retrieval job or agent.
        return {"expert": self.name, "action": "retrieve", "note": "Trigger RAG pipeline or agent (not implemented)."}

class SafetyExpert(Expert):
    def __init__(self):
        super().__init__("safety_expert", "Safety checks, identity questions, hallucination guards")

    def score(self, f):
        sc = 0.0
        txt = (f.get("last_text") or "").lower()
        if any(w in txt for w in ["who created you", "who made you", "identity", "where are you from"]):
            sc += 3.0
        # Suspicious tokens (e.g. credit-card-like number groups) -> safety.
        if re.search(r"\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b", txt):
            sc += 4.0
        return sc

    def run(self, messages, features):
        return {"expert": self.name, "action": "safety_check", "note": "Run policy checks."}

# Add more experts as needed...
_DEFAULT_EXPERTS: List[Expert] = [
    ShortChatExpert(),
    NLUExpert(),
    CodeExpert(),
    RAGExpert(),
    SafetyExpert(),
]
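# --- Scaling sketch (not wired in by default) -------------------------------
# The module docstring suggests scaling by making Expert.run(...) call real
# submodels. A minimal sketch, assuming a hypothetical HTTP microservice whose
# URL comes from an (equally hypothetical) NS_CODE_EXPERT_URL env var and that
# accepts {"messages": [...]} as JSON; adjust both to your deployment.
import urllib.request

class RemoteCodeExpert(CodeExpert):
    """Example subclass that delegates run() to a remote code-analysis service."""

    def run(self, messages: List[Dict[str, str]], features: Dict[str, Any]) -> Dict[str, Any]:
        url = os.environ.get("NS_CODE_EXPERT_URL")  # hypothetical endpoint
        if not url:
            # Fall back to the heuristic behaviour when no endpoint is configured.
            return super().run(messages, features)
        req = urllib.request.Request(
            url,
            data=json.dumps({"messages": messages}).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
        return {"expert": self.name, "action": "analyze_code", "result": payload}

# To try it, swap the CodeExpert() entry in _DEFAULT_EXPERTS above for
# RemoteCodeExpert().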
# ---------- Core gating/routing functions ----------
def _extract_features(messages: List[Dict[str, str]]) -> Dict[str, Any]:
    if not messages:
        return {
            "last_text": "", "word_count": 0, "total_word_count": 0, "kw_count": 0,
            "has_code": False, "has_list": False, "numeric_count": 0,
            "question_score": 0.0, "short_chat": False, "has_url": False,
            "is_urgent": False, "history_signal": 0.0,
        }
    last = messages[-1].get("content", "") if isinstance(messages[-1], dict) else str(messages[-1])
    prev = " ".join(m.get("content", "") for m in messages[:-1] if isinstance(m, dict))
    full = (prev + "\n" + last).strip()

    features: Dict[str, Any] = {}
    features["last_text"] = last
    features["word_count"] = _word_count(last)
    features["total_word_count"] = _word_count(full)
    features["kw_count"] = _keyword_matches(full)
    features["has_code"] = _has_code(full)
    features["has_list"] = _has_list(full)
    features["numeric_count"] = _numeric_count(full)
    features["question_score"] = _question_score(last)
    features["short_chat"] = _short_chat_score(last)
    features["has_url"] = bool(_url_re.search(full))
    features["is_urgent"] = _is_urgent(full)
    features["history_signal"] = _history_signal(messages)
    return features

def _gate_select_experts(features: Dict[str, Any], experts: List[Expert]) -> Tuple[List[Tuple[Expert, float]], List[float]]:
    # Compute raw scores per expert (clamped to be non-negative).
    raw_scores = [max(0.0, e.score(features)) for e in experts]
    if not raw_scores:
        return [], []
    # Normalize via softmax for relative weighting.
    probs = _softmax(raw_scores, temp=SOFTMAX_TEMPERATURE)
    # Select the top-K experts by probability.
    indexed = list(enumerate(probs))
    indexed.sort(key=lambda x: x[1], reverse=True)
    top = indexed[:TOP_K]
    chosen = [(experts[i], probs[i]) for i, _ in top]
    return chosen, probs

# ---------- Public API: analyze_flow ----------
def analyze_flow(messages: List[Dict[str, str]]) -> Dict[str, Any]:
    """
    Returns:
    {
        "route": "direct" | "planning",
        "is_complex": bool,
        "flow_label": str,
        "confidence": float,
        "explanation": str,
        "experts": [ {"name": .., "prob": .., "meta": {...}}, ... ]
    }
    """
    features = _extract_features(messages)
    experts = _DEFAULT_EXPERTS[:MAX_EXPERTS]  # cap the pool at MAX_EXPERTS

    # Gating
    chosen, probs = _gate_select_experts(features, experts)

    # Decide flow_label from feature heuristics (first match wins).
    flow_label = "general"
    if features.get("has_code") or features.get("kw_count", 0) >= 2:
        flow_label = "coding_request"
    elif features.get("is_urgent"):
        flow_label = "escalation"
    elif features.get("kw_count", 0) >= 1 and features.get("word_count", 0) >= 25:
        flow_label = "task_request"
    elif features.get("short_chat"):
        flow_label = "short_chat"
    elif features.get("question_score", 0) > 0.9 and features.get("word_count", 0) < 25:
        flow_label = "short_question"

    # Compute a complexity scalar from the weighted features.
    feature_score = (
        WEIGHT_LENGTH * (features.get("word_count", 0) / 30.0)
        + WEIGHT_KEYWORD * features.get("kw_count", 0)
        + WEIGHT_CODE * (4.0 if features.get("has_code") else 0.0)
        + WEIGHT_NUMERIC * features.get("numeric_count", 0)
        + WEIGHT_QUESTION * features.get("question_score", 0)
        + WEIGHT_URGENT * (1.0 if features.get("is_urgent") else 0.0)
        + WEIGHT_HISTORY * features.get("history_signal", 0)
    )
    # Map to 0..1 via a logistic centered at feature_score = 2.0.
    conf = 1.0 / (1.0 + math.exp(-0.45 * (feature_score - 2.0)))
    conf = max(0.0, min(1.0, conf))

    # Route decision.
    is_complex = (
        conf >= MIN_COMPLEX_CONF_FOR_PLANNING
        or features.get("has_code")
        or features.get("kw_count", 0) >= 2
    )
    # Short-chat override: always direct.
    if features.get("short_chat"):
        route = "direct"
        is_complex = False
    else:
        route = "planning" if is_complex else "direct"

    # Build explanation and expert list.
    expert_list = []
    for e, p in chosen:
        # run() here returns metadata only; it does not execute heavy ops.
        meta = e.run(messages, features)
        expert_list.append({"name": e.name, "prob": round(float(p), 4), "meta": meta})

    explanation = (
        "features=" + json.dumps(features)
        + f" | feature_score={feature_score:.2f} | conf={conf:.3f}"
        + f" | chosen={[e.name for e, _ in chosen]}"
    )

    return {
        "route": route,
        "is_complex": bool(is_complex),
        "flow_label": flow_label,
        "confidence": round(float(conf), 3),
        "explanation": explanation,
        "experts": expert_list,
    }

# ---------- Debug helper ----------
def debug_flow(text: str, history: Optional[List[str]] = None):
    hist_msgs = [{"role": "user", "content": h} for h in (history or [])]
    hist_msgs.append({"role": "user", "content": text})
    return analyze_flow(hist_msgs)

# Example self-test when run directly
if __name__ == "__main__":
    tests = [
        "Hi 🙂",
        "What is your name?",
        "What is neural network",
        "My app crashes with TypeError: undefined is not a function. Stacktrace: ```TypeError: ...``` How to fix?",
        "Deploy my node app to Docker with Nginx and SSL — step-by-step please.",
        "Quick: 2+2?",
    ]
    for t in tests:
        print("----")
        print("MSG:", t)
        out = debug_flow(t)
        print(json.dumps(out, indent=2))
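# --- Integration sketch ------------------------------------------------------
# How a caller such as app.py (per the contract in the module docstring) might
# branch on the result. The handler names below are hypothetical placeholders,
# not functions defined anywhere in this module:
#
#   result = analyze_flow([{"role": "user", "content": "Deploy my node app"}])
#   if result["route"] == "planning":
#       answer = run_planning_pipeline(result)   # hypothetical
#   else:
#       answer = answer_directly(result)         # hypothetical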