Spaces:
Sleeping
Sleeping
# behavior_model.py -- REPLACED with "Neural Structure / MoE-style Dispatcher"
"""
Large, modular 'neural structure' dispatcher (software MoE) for intent/complexity routing.

How to use:
- Replace your existing behavior_model.py with this file.
- app.py expects analyze_flow(messages) -> dict with keys:
  { route: "direct"|"planning", is_complex: bool, flow_label: str, confidence: float, explanation: str, experts: [...] }

Design:
- Feature extractor -> gating network (scoring) -> top-K expert selection -> combine/explain decision
- Experts are modular callables; by default they are heuristic "experts".
- To scale: implement Expert.run(...) to call real submodels/endpoints (local small models, remote microservices).
"""
import json
import math
import os
import re
import statistics
from typing import Any, Callable, Dict, List, Optional, Tuple
| # ---------- Configurable constants ---------- | |
| TOP_K = int(os.environ.get("NS_TOP_K", "2")) # how many experts to activate per request | |
| SOFTMAX_TEMPERATURE = float(os.environ.get("NS_TEMP", "1.0")) | |
| MIN_COMPLEX_CONF_FOR_PLANNING = float(os.environ.get("NS_MIN_COMPLEX_CONF", "0.56")) | |
| MAX_EXPERTS = int(os.environ.get("NS_MAX_EXPERTS", "12")) | |
| # Weights (tunables) | |
| WEIGHT_LENGTH = float(os.environ.get("NS_W_LENGTH", "1.0")) | |
| WEIGHT_KEYWORD = float(os.environ.get("NS_W_KEYWORD", "1.9")) | |
| WEIGHT_CODE = float(os.environ.get("NS_W_CODE", "2.4")) | |
| WEIGHT_NUMERIC = float(os.environ.get("NS_W_NUMERIC", "1.2")) | |
| WEIGHT_QUESTION = float(os.environ.get("NS_W_QUESTION", "0.6")) | |
| WEIGHT_URGENT = float(os.environ.get("NS_W_URGENT", "2.2")) | |
| WEIGHT_HISTORY = float(os.environ.get("NS_W_HISTORY", "0.8")) | |
| # ---------- Regex / keyword lists ---------- | |
| _code_fence_re = re.compile(r"```.+?```", flags=re.DOTALL | re.IGNORECASE) | |
| _inline_code_re = re.compile(r"`[^`]+`") | |
| _number_re = re.compile(r"\b\d+(\.\d+)?\b") | |
| _list_marker_re = re.compile(r"(^\s*[-*•]\s+)|(^\s*\d+\.\s+)", flags=re.MULTILINE) | |
| _url_re = re.compile(r"https?://\S+") | |
| _question_word_re = re.compile(r"^\s*(who|what|why|how|when|which|where)\b", flags=re.IGNORECASE) | |
| _question_mark_re = re.compile(r"\?$") | |
| _task_keywords = set(k.lower() for k in [ | |
| "build", "create", "implement", "develop", "deploy", "install", "setup", "configure", | |
| "optimi", "debug", "fix", "error", "crash", "stacktrace", "exception", "traceback", | |
| "code", "script", "function", "api", "endpoint", "database", "sql", "mongodb", "mysql", | |
| "docker", "deno", "node", "express", "php", "python", "java", "rust", "golang", "compile", | |
| "performance", "latency", "bandwidth", "optimization", "optimize", | |
| "algorithm", "complexity", "big o", "time complexity", "space complexity", | |
| "report", "plan", "design", "architecture", "integration", "migrate", "refactor", | |
| "test case", "unit test", "e2e test", | |
| "prove", "derive", "integral", "differentiate", "matrix", "neural network", "train", "model", | |
| ]) | |
| _urgent_words = set(w.lower() for w in ["urgent", "asap", "immediately", "now", "critical", "important", "priority", "must"]) | |
| _short_chat_terms = set(w.lower() for w in ["hi", "hello", "thanks", "thank you", "bye", "ok", "okay", "nice", "cool", "🙂", "😊"]) | |
| # ---------- Utility functions ---------- | |
| def _word_count(text: str) -> int: | |
| return len(re.findall(r"\w+", text)) if text else 0 | |
| def _has_code(text: str) -> bool: | |
| if not text: return False | |
| return bool(_code_fence_re.search(text) or _inline_code_re.search(text) or re.search(r"\bdef\s+\w+\(|;\s*$", text, flags=re.IGNORECASE)) | |
| def _has_list(text: str) -> bool: | |
| return bool(_list_marker_re.search(text)) | |
| def _keyword_matches(text: str) -> int: | |
| if not text: return 0 | |
| t = text.lower() | |
| cnt = 0 | |
| for kw in _task_keywords: | |
| if kw in t: | |
| cnt += 1 | |
| return cnt | |
| def _numeric_count(text: str) -> int: | |
| return len(_number_re.findall(text or "")) | |
| def _is_urgent(text: str) -> bool: | |
| t = (text or "").lower() | |
| return any(w in t for w in _urgent_words) | |
| def _short_chat_score(text: str) -> bool: | |
| t = (text or "").strip().lower() | |
| if len(t.split()) <= 2 and any(tok in t for tok in _short_chat_terms): | |
| return True | |
| return False | |
| def _question_score(text: str) -> float: | |
| s = 0.0 | |
| if _question_mark_re.search(text or ""): s += 1.0 | |
| if _question_word_re.match((text or "").strip()): s += 0.6 | |
| return s | |
| def _history_signal(messages: List[Dict[str,str]]) -> float: | |
| # simple heuristic: if previous user messages contained technical keywords recently, boost | |
| if not messages or len(messages) < 2: return 0.0 | |
| prev = " ".join(m.get("content","") for m in messages[-4:-1] if isinstance(m, dict)) | |
| return float(min(3, _keyword_matches(prev))) * 0.2 | |
| # ---------- Softmax helper ---------- | |
| def _softmax(scores: List[float], temp: float = 1.0) -> List[float]: | |
| if not scores: | |
| return [] | |
| exps = [math.exp(s / temp) for s in scores] | |
| s = sum(exps) | |
| if s == 0: return [1.0/len(scores)]*len(scores) | |
| return [e/s for e in exps] | |
| # ---------- Expert base classes ---------- | |
| class Expert: | |
| name: str | |
| description: str | |
| def __init__(self, name:str, description:str): | |
| self.name = name | |
| self.description = description | |
| def score(self, features: Dict[str,Any]) -> float: | |
| """Return a heuristic affinity score (higher = more relevant).""" | |
| # default neutral | |
| return 0.0 | |
| def run(self, messages: List[Dict[str,str]], features: Dict[str,Any]) -> Dict[str,Any]: | |
| """ | |
| Optionally run expert-specific logic (synchronously). | |
| For now return metadata only. In production this could call a model endpoint. | |
| """ | |
| return {"expert": self.name, "action": "noop", "note": "heuristic-only"} | |
| # ---------- Concrete experts ---------- | |
| class ShortChatExpert(Expert): | |
| def __init__(self): | |
| super().__init__("short_chat", "Handles greetings/short conversational turns") | |
| def score(self, f): | |
| if f.get("short_chat"): return 5.0 | |
| return 0.1 | |
| def run(self, messages, features): | |
| return {"expert": self.name, "action": "short_reply", "note": "Use concise response template."} | |
| class CodeExpert(Expert): | |
| def __init__(self): | |
| super().__init__("code_expert", "Handles code, stacktraces, debugging tasks") | |
| def score(self, f): | |
| sc = 0.0 | |
| if f.get("has_code"): sc += 4.0 | |
| sc += 0.8 * f.get("kw_count",0) | |
| sc += 0.6 * f.get("numeric_count",0) | |
| return sc | |
| def run(self, messages, features): | |
| # Placeholder: in production call a code-specialized model or analyzer endpoint | |
| return {"expert": self.name, "action": "analyze_code", "note": "Run code LLM or static-checker (not implemented)."} | |
| class NLUExpert(Expert): | |
| def __init__(self): | |
| super().__init__("nlu_expert", "Deep intent and slot extraction / classification") | |
| def score(self, f): | |
| sc = 1.0 * f.get("kw_count",0) | |
| sc += 0.8 * f.get("question_score",0) | |
| sc += 0.4 * (f.get("word_count",0) / 30.0) | |
| sc += 0.6 * f.get("history_signal",0) | |
| return sc | |
| def run(self, messages, features): | |
| # Example: return intent classification tags (heuristic) | |
| intent = "general" | |
| if features.get("kw_count",0) >= 2 or features.get("has_code"): | |
| intent = "technical_task" | |
| elif features.get("short_chat"): | |
| intent = "social" | |
| return {"expert": self.name, "action": "classify_intent", "intent": intent} | |
| class RAGExpert(Expert): | |
| def __init__(self): | |
| super().__init__("rag_expert", "Handles retrieval-augmented requests (RAG/agent)") | |
| def score(self, f): | |
| sc = 0.0 | |
| # if user mentions 'search', 'latest', has urls, or long context -> RAG useful | |
| if f.get("has_url"): sc += 2.0 | |
| sc += 1.2 * f.get("kw_count",0) | |
| sc += 0.9 * f.get("numeric_count",0) | |
| if f.get("word_count",0) > 60: sc += 1.5 | |
| return sc | |
| def run(self, messages, features): | |
| # Placeholder: should trigger a retrieval job or agent | |
| return {"expert": self.name, "action": "retrieve", "note": "Trigger RAG pipeline or agent (not implemented)."} | |
| class SafetyExpert(Expert): | |
| def __init__(self): | |
| super().__init__("safety_expert", "Safety checks, identity questions, hallucination guards") | |
| def score(self, f): | |
| sc = 0.0 | |
| txt = f.get("last_text","").lower() if f.get("last_text") else "" | |
| if any(w in txt for w in ["who created you","who made you","identity","where are you from"]): | |
| sc += 3.0 | |
| # any suspicious tokens (email, ssn, credit card-like) -> safety | |
| if re.search(r"\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b", txt): | |
| sc += 4.0 | |
| return sc | |
| def run(self, messages, features): | |
| return {"expert": self.name, "action": "safety_check", "note": "Run policy checks."} | |
| # Add more experts as needed... | |
| _DEFAULT_EXPERTS: List[Expert] = [ | |
| ShortChatExpert(), | |
| NLUExpert(), | |
| CodeExpert(), | |
| RAGExpert(), | |
| SafetyExpert(), | |
| ] | |
| # ---------- Core gating/routing function ---------- | |
| def _extract_features(messages: List[Dict[str,str]]) -> Dict[str,Any]: | |
| if not messages: | |
| return {"word_count": 0, "kw_count":0, "has_code": False, "numeric_count":0, "question_score":0.0, "short_chat": False, "has_url": False, "history_signal":0.0, "last_text":""} | |
| last = messages[-1].get("content","") if isinstance(messages[-1], dict) else str(messages[-1]) | |
| prev = " ".join(m.get("content","") for m in messages[:-1] if isinstance(m, dict)) | |
| full = (prev + "\n" + last).strip() | |
| features = {} | |
| features["last_text"] = last | |
| features["word_count"] = _word_count(last) | |
| features["total_word_count"] = _word_count(full) | |
| features["kw_count"] = _keyword_matches(full) | |
| features["has_code"] = _has_code(full) | |
| features["has_list"] = _has_list(full) | |
| features["numeric_count"] = _numeric_count(full) | |
| features["question_score"] = _question_score(last) | |
| features["short_chat"] = _short_chat_score(last) | |
| features["has_url"] = bool(_url_re.search(full)) | |
| features["is_urgent"] = _is_urgent(full) | |
| features["history_signal"] = _history_signal(messages) | |
| return features | |
| def _gate_select_experts(features: Dict[str,Any], experts: List[Expert]) -> Tuple[List[Tuple[Expert,float]], List[float]]: | |
| # compute raw scores per expert | |
| raw_scores = [max(0.0, e.score(features)) for e in experts] | |
| if not raw_scores: | |
| return [], [] | |
| # normalize via softmax for relative weighting | |
| probs = _softmax(raw_scores, temp=SOFTMAX_TEMPERATURE) | |
| # select top-K experts by probability | |
| indexed = list(enumerate(probs)) | |
| indexed.sort(key=lambda x: x[1], reverse=True) | |
| top = indexed[:TOP_K] | |
| chosen = [(experts[i], probs[i]) for i, _ in top] | |
| return chosen, probs | |
| # ---------- Public API: analyze_flow ---------- | |
| def analyze_flow(messages: List[Dict[str,str]]) -> Dict[str,Any]: | |
| """ | |
| Returns: | |
| { | |
| "route": "direct" / "planning", | |
| "is_complex": bool, | |
| "flow_label": str, | |
| "confidence": float, | |
| "explanation": str, | |
| "experts": [ {"name":.., "score":.., "note":..}, ... ] | |
| } | |
| """ | |
| features = _extract_features(messages) | |
| experts = _DEFAULT_EXPERTS.copy() | |
| # gating | |
| chosen, probs = _gate_select_experts(features, experts) | |
| # Decide flow_label heuristics based on features | |
| flow_label = "general" | |
| if features.get("has_code") or features.get("kw_count",0) >= 2: | |
| flow_label = "coding_request" | |
| elif features.get("is_urgent"): | |
| flow_label = "escalation" | |
| elif features.get("kw_count",0) >= 1 and features.get("word_count",0) >= 25: | |
| flow_label = "task_request" | |
| elif features.get("short_chat"): | |
| flow_label = "short_chat" | |
| elif features.get("question_score",0) > 0.9 and features.get("word_count",0) < 25: | |
| flow_label = "short_question" | |
| # compute a complexity/confidence scalar from features + expert probs | |
| feature_score = ( | |
| WEIGHT_LENGTH * (features.get("word_count",0) / 30.0) + | |
| WEIGHT_KEYWORD * features.get("kw_count",0) + | |
| WEIGHT_CODE * (4.0 if features.get("has_code") else 0.0) + | |
| WEIGHT_NUMERIC * features.get("numeric_count",0) + | |
| WEIGHT_QUESTION * features.get("question_score",0) + | |
| WEIGHT_URGENT * (1.0 if features.get("is_urgent") else 0.0) + | |
| WEIGHT_HISTORY * features.get("history_signal",0) | |
| ) | |
| # Map to 0..1 via logistic | |
| conf = 1.0 / (1.0 + math.exp(-0.45 * (feature_score - 2.0))) | |
| conf = max(0.0, min(1.0, conf)) | |
| # route decision | |
| is_complex = conf >= MIN_COMPLEX_CONF_FOR_PLANNING or features.get("has_code") or features.get("kw_count",0) >= 2 | |
| # short-chat override: always direct | |
| if features.get("short_chat"): | |
| route = "direct" | |
| is_complex = False | |
| else: | |
| route = "planning" if is_complex else "direct" | |
| # Build explanation and expert list | |
| expert_list = [] | |
| for e, p in chosen: | |
| # we can call run() here for metadata without actually executing heavy ops | |
| meta = e.run(messages, features) | |
| expert_list.append({"name": e.name, "prob": round(float(p),4), "meta": meta}) | |
| explanation = ("features=" + json.dumps(features) + f" | feature_score={feature_score:.2f} | conf={conf:.3f} | chosen={[e.name for e,_ in chosen]}") | |
| return { | |
| "route": route, | |
| "is_complex": bool(is_complex), | |
| "flow_label": flow_label, | |
| "confidence": round(float(conf), 3), | |
| "explanation": explanation, | |
| "experts": expert_list | |
| } | |
| # ---------- Debug helper ---------- | |
| def debug_flow(text: str, history: List[str] = None): | |
| hist_msgs = [{"role":"user","content":h} for h in (history or [])] | |
| hist_msgs.append({"role":"user","content": text}) | |
| return analyze_flow(hist_msgs) | |
| # Example self-test when run directly | |
| if __name__ == "__main__": | |
| tests = [ | |
| "Hi 🙂", | |
| "What is your name?", | |
| "What is neural network", | |
| "My app crashes with TypeError: undefined is not a function. Stacktrace: ```TypeError: ...``` How to fix?", | |
| "Deploy my node app to Docker with Nginx and SSL — step-by-step please.", | |
| "Quick: 2+2?" | |
| ] | |
| for t in tests: | |
| print("----") | |
| print("MSG:", t) | |
| out = debug_flow(t) | |
| print(json.dumps(out, indent=2)) |