# Nexari-Server / behavior_model.py
# Provenance: uploaded by Nexari-Research, commit 9d1f57d (verified) — "Update behavior_model.py"
# behavior_model.py -- REPLACED with "Neural Structure / MoE-style Dispatcher"
"""
Large, modular 'neural structure' dispatcher (software MoE) for intent/complexity routing.
How to use:
- Replace your existing behavior_model.py with this file.
- app.py expects analyze_flow(messages) -> dict with keys:
{ route: "direct"|"planning", is_complex: bool, flow_label: str, confidence: float, explanation: str, experts: [...] }
Design:
- Feature extractor -> gating network (scoring) -> top-K expert selection -> combine/explain decision
- Experts are modular callables; by default they are heuristic "experts".
- To scale: implement Expert.run(...) to call real submodels/endpoints (local small models, remote microservices).
"""
import json
import math
import os
import re
import statistics
from typing import Any, Callable, Dict, List, Optional, Tuple
# ---------- Configurable constants ----------
# Every tunable can be overridden through an NS_* environment variable, so a
# deployment can retune the gate without code changes.
TOP_K = int(os.environ.get("NS_TOP_K", "2"))  # how many experts to activate per request
SOFTMAX_TEMPERATURE = float(os.environ.get("NS_TEMP", "1.0"))  # temperature for the gating softmax (lower = sharper)
MIN_COMPLEX_CONF_FOR_PLANNING = float(os.environ.get("NS_MIN_COMPLEX_CONF", "0.56"))  # confidence cutoff for the "planning" route
MAX_EXPERTS = int(os.environ.get("NS_MAX_EXPERTS", "12"))  # NOTE(review): defined but never enforced in this file — confirm intent
# Weights (tunables) — per-feature multipliers used by analyze_flow's feature_score
WEIGHT_LENGTH = float(os.environ.get("NS_W_LENGTH", "1.0"))
WEIGHT_KEYWORD = float(os.environ.get("NS_W_KEYWORD", "1.9"))
WEIGHT_CODE = float(os.environ.get("NS_W_CODE", "2.4"))
WEIGHT_NUMERIC = float(os.environ.get("NS_W_NUMERIC", "1.2"))
WEIGHT_QUESTION = float(os.environ.get("NS_W_QUESTION", "0.6"))
WEIGHT_URGENT = float(os.environ.get("NS_W_URGENT", "2.2"))
WEIGHT_HISTORY = float(os.environ.get("NS_W_HISTORY", "0.8"))
# ---------- Regex / keyword lists ----------
_code_fence_re = re.compile(r"```.+?```", flags=re.DOTALL | re.IGNORECASE)  # fenced ``` code blocks
_inline_code_re = re.compile(r"`[^`]+`")  # `inline code` spans
_number_re = re.compile(r"\b\d+(\.\d+)?\b")  # standalone integers / decimals
_list_marker_re = re.compile(r"(^\s*[-*•]\s+)|(^\s*\d+\.\s+)", flags=re.MULTILINE)  # bullet or numbered list lines
_url_re = re.compile(r"https?://\S+")  # http(s) URLs
_question_word_re = re.compile(r"^\s*(who|what|why|how|when|which|where)\b", flags=re.IGNORECASE)  # leading wh-word
_question_mark_re = re.compile(r"\?$")  # trailing question mark
# Task keywords are matched as SUBSTRINGS, so stems like "optimi" also catch
# "optimize" / "optimisation"; multiword entries match whole phrases.
_task_keywords = set(k.lower() for k in [
    "build", "create", "implement", "develop", "deploy", "install", "setup", "configure",
    "optimi", "debug", "fix", "error", "crash", "stacktrace", "exception", "traceback",
    "code", "script", "function", "api", "endpoint", "database", "sql", "mongodb", "mysql",
    "docker", "deno", "node", "express", "php", "python", "java", "rust", "golang", "compile",
    "performance", "latency", "bandwidth", "optimization", "optimize",
    "algorithm", "complexity", "big o", "time complexity", "space complexity",
    "report", "plan", "design", "architecture", "integration", "migrate", "refactor",
    "test case", "unit test", "e2e test",
    "prove", "derive", "integral", "differentiate", "matrix", "neural network", "train", "model",
])
# Urgency markers (substring match against the full conversation text).
_urgent_words = set(w.lower() for w in ["urgent", "asap", "immediately", "now", "critical", "important", "priority", "must"])
# Tokens that flag a short conversational turn (greetings, acks, emoji).
_short_chat_terms = set(w.lower() for w in ["hi", "hello", "thanks", "thank you", "bye", "ok", "okay", "nice", "cool", "🙂", "😊"])
# ---------- Utility functions ----------
def _word_count(text: str) -> int:
return len(re.findall(r"\w+", text)) if text else 0
def _has_code(text: str) -> bool:
    """Detect code-like content: fenced blocks, inline backticks, `def name(`,
    or a trailing semicolon at the very end of the text."""
    if not text:
        return False
    if _code_fence_re.search(text) or _inline_code_re.search(text):
        return True
    return re.search(r"\bdef\s+\w+\(|;\s*$", text, flags=re.IGNORECASE) is not None
def _has_list(text: str) -> bool:
    """Return True if *text* contains bullet ('-', '*', '•') or numbered list markers.

    Fix: guard against None/falsy input (``text or ""``) for consistency with
    the other feature helpers — previously ``_has_list(None)`` raised
    TypeError inside ``re.search``.
    """
    return bool(_list_marker_re.search(text or ""))
def _keyword_matches(text: str) -> int:
    """Count how many task keywords occur (as substrings) in *text*."""
    if not text:
        return 0
    lowered = text.lower()
    return sum(1 for kw in _task_keywords if kw in lowered)
def _numeric_count(text: str) -> int:
    """Number of standalone numeric literals (ints/decimals) in *text*."""
    found = _number_re.findall(text or "")
    return len(found)
def _is_urgent(text: str) -> bool:
    """True when any urgency marker occurs (substring) in *text*."""
    lowered = (text or "").lower()
    for word in _urgent_words:
        if word in lowered:
            return True
    return False
def _short_chat_score(text: str) -> bool:
    """True for very short turns (<= 2 words) containing a greeting/ack token.

    Despite the name, this is a boolean flag, not a numeric score.
    """
    normalized = (text or "").strip().lower()
    return len(normalized.split()) <= 2 and any(term in normalized for term in _short_chat_terms)
def _question_score(text: str) -> float:
    """Heuristic question-ness: +1.0 for a trailing '?', +0.6 for a leading wh-word."""
    safe = text or ""
    score = 0.0
    if _question_mark_re.search(safe):
        score += 1.0
    if _question_word_re.match(safe.strip()):
        score += 0.6
    return score
def _history_signal(messages: List[Dict[str, str]]) -> float:
    """Boost (0.0–0.6) when the previous few turns mention technical keywords.

    Looks at up to three messages before the last one; each keyword hit adds
    0.2, capped at 3 hits.
    """
    if not messages or len(messages) < 2:
        return 0.0
    recent = [m.get("content", "") for m in messages[-4:-1] if isinstance(m, dict)]
    hits = _keyword_matches(" ".join(recent))
    return min(3, hits) * 0.2
# ---------- Softmax helper ----------
def _softmax(scores: List[float], temp: float = 1.0) -> List[float]:
if not scores:
return []
exps = [math.exp(s / temp) for s in scores]
s = sum(exps)
if s == 0: return [1.0/len(scores)]*len(scores)
return [e/s for e in exps]
# ---------- Expert base classes ----------
class Expert:
    """Base class for routing experts.

    An expert exposes two hooks: ``score`` (affinity for the current request;
    higher = more relevant) and ``run`` (expert-specific action metadata —
    in production this could call a real model endpoint).
    """
    name: str
    description: str

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    def score(self, features: Dict[str, Any]) -> float:
        """Heuristic affinity score; the neutral default is 0.0."""
        return 0.0

    def run(self, messages: List[Dict[str, str]], features: Dict[str, Any]) -> Dict[str, Any]:
        """Return metadata only — no heavy work; subclasses override."""
        return {"expert": self.name, "action": "noop", "note": "heuristic-only"}
# ---------- Concrete experts ----------
class ShortChatExpert(Expert):
    """Expert for greetings and other very short conversational turns."""

    def __init__(self):
        super().__init__("short_chat", "Handles greetings/short conversational turns")

    def score(self, f):
        # Dominant when the gate flagged a short-chat turn; near-silent otherwise.
        return 5.0 if f.get("short_chat") else 0.1

    def run(self, messages, features):
        return {"expert": self.name, "action": "short_reply", "note": "Use concise response template."}
class CodeExpert(Expert):
    """Expert for code snippets, stacktraces and debugging requests."""

    def __init__(self):
        super().__init__("code_expert", "Handles code, stacktraces, debugging tasks")

    def score(self, f):
        # Strong boost for literal code, plus smaller keyword/number signals.
        base = 4.0 if f.get("has_code") else 0.0
        return base + 0.8 * f.get("kw_count", 0) + 0.6 * f.get("numeric_count", 0)

    def run(self, messages, features):
        # Placeholder: in production call a code-specialized model or analyzer endpoint
        return {"expert": self.name, "action": "analyze_code", "note": "Run code LLM or static-checker (not implemented)."}
class NLUExpert(Expert):
    """Expert for deep intent classification and slot extraction."""

    def __init__(self):
        super().__init__("nlu_expert", "Deep intent and slot extraction / classification")

    def score(self, f):
        return (
            1.0 * f.get("kw_count", 0)
            + 0.8 * f.get("question_score", 0)
            + 0.4 * (f.get("word_count", 0) / 30.0)
            + 0.6 * f.get("history_signal", 0)
        )

    def run(self, messages, features):
        # Heuristic intent tag: technical if keyword-dense or code-bearing,
        # social for short chat, otherwise general.
        if features.get("kw_count", 0) >= 2 or features.get("has_code"):
            intent = "technical_task"
        elif features.get("short_chat"):
            intent = "social"
        else:
            intent = "general"
        return {"expert": self.name, "action": "classify_intent", "intent": intent}
class RAGExpert(Expert):
    """Expert for retrieval-augmented (RAG / agent) requests."""

    def __init__(self):
        super().__init__("rag_expert", "Handles retrieval-augmented requests (RAG/agent)")

    def score(self, f):
        # URLs, task keywords, numbers and long prompts all hint that
        # retrieval would help.
        total = 2.0 if f.get("has_url") else 0.0
        total += 1.2 * f.get("kw_count", 0)
        total += 0.9 * f.get("numeric_count", 0)
        if f.get("word_count", 0) > 60:
            total += 1.5
        return total

    def run(self, messages, features):
        # Placeholder: should trigger a retrieval job or agent
        return {"expert": self.name, "action": "retrieve", "note": "Trigger RAG pipeline or agent (not implemented)."}
class SafetyExpert(Expert):
    """Expert for safety checks: identity probes and sensitive-looking tokens."""

    # Four groups of four digits (optionally dash/space separated) — looks
    # like a payment-card number. Compiled once rather than per call.
    _card_like_re = re.compile(r"\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b")

    def __init__(self):
        super().__init__("safety_expert", "Safety checks, identity questions, hallucination guards")

    def score(self, f):
        last = f.get("last_text")
        txt = last.lower() if last else ""
        total = 0.0
        if any(w in txt for w in ["who created you","who made you","identity","where are you from"]):
            total += 3.0
        if self._card_like_re.search(txt):
            total += 4.0
        return total

    def run(self, messages, features):
        return {"expert": self.name, "action": "safety_check", "note": "Run policy checks."}
# Add more experts as needed...
# Registry of experts the gate scores on every request; selection is by
# score, so list order does not matter.
# NOTE(review): MAX_EXPERTS (defined above) is never applied to this list — confirm intent.
_DEFAULT_EXPERTS: List[Expert] = [
    ShortChatExpert(),
    NLUExpert(),
    CodeExpert(),
    RAGExpert(),
    SafetyExpert(),
]
# ---------- Core gating/routing function ----------
def _extract_features(messages: List[Dict[str, str]]) -> Dict[str, Any]:
    """Extract routing signals from a chat transcript.

    Per-turn signals (question-ness, short-chat, word_count) come from the
    last message only; keyword/code/numeric/url signals look at the full
    concatenated conversation text.
    """
    if not messages:
        return {
            "word_count": 0,
            "kw_count": 0,
            "has_code": False,
            "numeric_count": 0,
            "question_score": 0.0,
            "short_chat": False,
            "has_url": False,
            "history_signal": 0.0,
            "last_text": "",
        }
    tail = messages[-1]
    last = tail.get("content", "") if isinstance(tail, dict) else str(tail)
    earlier = " ".join(m.get("content", "") for m in messages[:-1] if isinstance(m, dict))
    full = (earlier + "\n" + last).strip()
    return {
        "last_text": last,
        "word_count": _word_count(last),
        "total_word_count": _word_count(full),
        "kw_count": _keyword_matches(full),
        "has_code": _has_code(full),
        "has_list": _has_list(full),
        "numeric_count": _numeric_count(full),
        "question_score": _question_score(last),
        "short_chat": _short_chat_score(last),
        "has_url": bool(_url_re.search(full)),
        "is_urgent": _is_urgent(full),
        "history_signal": _history_signal(messages),
    }
def _gate_select_experts(features: Dict[str, Any], experts: List[Expert]) -> Tuple[List[Tuple[Expert, float]], List[float]]:
    """Gate: score every expert, softmax-normalise, keep the TOP_K.

    Returns ``(chosen, probs)`` where *chosen* pairs each selected expert
    with its probability and *probs* is the full distribution over *experts*.
    Negative raw scores are clamped to 0 before normalisation.
    """
    raw = [max(0.0, expert.score(features)) for expert in experts]
    if not raw:
        return [], []
    probs = _softmax(raw, temp=SOFTMAX_TEMPERATURE)
    # Stable sort of indices: ties keep original expert order.
    ranked = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)
    chosen = [(experts[i], probs[i]) for i in ranked[:TOP_K]]
    return chosen, probs
# ---------- Public API: analyze_flow ----------
def analyze_flow(messages: List[Dict[str,str]]) -> Dict[str,Any]:
    """
    Returns:
        {
          "route": "direct" / "planning",
          "is_complex": bool,
          "flow_label": str,
          "confidence": float,
          "explanation": str,
          "experts": [ {"name":.., "score":.., "note":..}, ... ]
        }
    """
    features = _extract_features(messages)
    # Gate over a copy of the default expert registry.
    chosen, probs = _gate_select_experts(features, _DEFAULT_EXPERTS.copy())

    # Flow label: first matching heuristic wins.
    if features.get("has_code") or features.get("kw_count", 0) >= 2:
        flow_label = "coding_request"
    elif features.get("is_urgent"):
        flow_label = "escalation"
    elif features.get("kw_count", 0) >= 1 and features.get("word_count", 0) >= 25:
        flow_label = "task_request"
    elif features.get("short_chat"):
        flow_label = "short_chat"
    elif features.get("question_score", 0) > 0.9 and features.get("word_count", 0) < 25:
        flow_label = "short_question"
    else:
        flow_label = "general"

    # Weighted feature score, squashed to [0, 1] with a logistic curve.
    feature_score = sum((
        WEIGHT_LENGTH * (features.get("word_count", 0) / 30.0),
        WEIGHT_KEYWORD * features.get("kw_count", 0),
        WEIGHT_CODE * (4.0 if features.get("has_code") else 0.0),
        WEIGHT_NUMERIC * features.get("numeric_count", 0),
        WEIGHT_QUESTION * features.get("question_score", 0),
        WEIGHT_URGENT * (1.0 if features.get("is_urgent") else 0.0),
        WEIGHT_HISTORY * features.get("history_signal", 0),
    ))
    conf = 1.0 / (1.0 + math.exp(-0.45 * (feature_score - 2.0)))
    conf = max(0.0, min(1.0, conf))

    # Route decision: confidence threshold OR hard complexity signals.
    is_complex = (
        conf >= MIN_COMPLEX_CONF_FOR_PLANNING
        or features.get("has_code")
        or features.get("kw_count", 0) >= 2
    )
    if features.get("short_chat"):
        # Short-chat override: always answer directly.
        route, is_complex = "direct", False
    else:
        route = "planning" if is_complex else "direct"

    # Gather metadata from the chosen experts; run() does no heavy work here.
    expert_list = []
    for expert, prob in chosen:
        meta = expert.run(messages, features)
        expert_list.append({"name": expert.name, "prob": round(float(prob), 4), "meta": meta})

    explanation = ("features=" + json.dumps(features) + f" | feature_score={feature_score:.2f} | conf={conf:.3f} | chosen={[e.name for e,_ in chosen]}")
    return {
        "route": route,
        "is_complex": bool(is_complex),
        "flow_label": flow_label,
        "confidence": round(float(conf), 3),
        "explanation": explanation,
        "experts": expert_list,
    }
# ---------- Debug helper ----------
def debug_flow(text: str, history: Optional[List[str]] = None):
    """Build a single-user transcript from *history* + *text* and route it.

    Args:
        text: the current user message.
        history: optional earlier user messages, oldest first.

    Returns:
        The analyze_flow() result dict for the assembled transcript.
    """
    hist_msgs = [{"role":"user","content":h} for h in (history or [])]
    hist_msgs.append({"role":"user","content": text})
    return analyze_flow(hist_msgs)
# Example self-test when run directly
if __name__ == "__main__":
    # Smoke-test the router across a spread of message styles, from
    # greetings to debugging requests.
    samples = [
        "Hi 🙂",
        "What is your name?",
        "What is neural network",
        "My app crashes with TypeError: undefined is not a function. Stacktrace: ```TypeError: ...``` How to fix?",
        "Deploy my node app to Docker with Nginx and SSL — step-by-step please.",
        "Quick: 2+2?",
    ]
    for sample in samples:
        print("----")
        print("MSG:", sample)
        print(json.dumps(debug_flow(sample), indent=2))