# NOTE: removed non-code page-scrape artifacts that were accidentally captured
# with this file (site navigation text, blob hashes, and a line-number gutter).
# behavior_model.py -- REPLACED with "Neural Structure / MoE-style Dispatcher"
"""
Large, modular 'neural structure' dispatcher (software MoE) for intent/complexity routing.
How to use:
- Replace your existing behavior_model.py with this file.
- app.py expects analyze_flow(messages) -> dict with keys:
{ route: "direct"|"planning", is_complex: bool, flow_label: str, confidence: float, explanation: str, experts: [...] }
Design:
- Feature extractor -> gating network (scoring) -> top-K expert selection -> combine/explain decision
- Experts are modular callables; by default they are heuristic "experts".
- To scale: implement Expert.run(...) to call real submodels/endpoints (local small models, remote microservices).
"""
from typing import List, Dict, Any, Callable, Tuple
import re
import math
import json
import os
import statistics
# ---------- Configurable constants ----------
# All tunables are overridable via environment variables so the router can be
# re-tuned per deployment without a code change.
TOP_K = int(os.environ.get("NS_TOP_K", "2")) # how many experts to activate per request
SOFTMAX_TEMPERATURE = float(os.environ.get("NS_TEMP", "1.0"))  # gating softmax temperature (lower = sharper selection)
MIN_COMPLEX_CONF_FOR_PLANNING = float(os.environ.get("NS_MIN_COMPLEX_CONF", "0.56"))  # confidence at/above which route flips to "planning"
MAX_EXPERTS = int(os.environ.get("NS_MAX_EXPERTS", "12"))  # NOTE(review): not referenced anywhere in this file -- confirm before relying on it
# Weights (tunables)
# Per-feature multipliers summed into feature_score inside analyze_flow().
WEIGHT_LENGTH = float(os.environ.get("NS_W_LENGTH", "1.0"))    # last-message word count (scaled by /30)
WEIGHT_KEYWORD = float(os.environ.get("NS_W_KEYWORD", "1.9"))  # technical-keyword hits
WEIGHT_CODE = float(os.environ.get("NS_W_CODE", "2.4"))        # presence of code (binary, x4 inside the sum)
WEIGHT_NUMERIC = float(os.environ.get("NS_W_NUMERIC", "1.2"))  # count of numeric literals
WEIGHT_QUESTION = float(os.environ.get("NS_W_QUESTION", "0.6"))  # question-likeness score
WEIGHT_URGENT = float(os.environ.get("NS_W_URGENT", "2.2"))    # urgency keywords (binary)
WEIGHT_HISTORY = float(os.environ.get("NS_W_HISTORY", "0.8"))  # technical signal from recent history
# ---------- Regex / keyword lists ----------
_code_fence_re = re.compile(r"```.+?```", flags=re.DOTALL | re.IGNORECASE)
_inline_code_re = re.compile(r"`[^`]+`")
_number_re = re.compile(r"\b\d+(\.\d+)?\b")
_list_marker_re = re.compile(r"(^\s*[-*β’]\s+)|(^\s*\d+\.\s+)", flags=re.MULTILINE)
_url_re = re.compile(r"https?://\S+")
_question_word_re = re.compile(r"^\s*(who|what|why|how|when|which|where)\b", flags=re.IGNORECASE)
_question_mark_re = re.compile(r"\?$")
_task_keywords = set(k.lower() for k in [
"build", "create", "implement", "develop", "deploy", "install", "setup", "configure",
"optimi", "debug", "fix", "error", "crash", "stacktrace", "exception", "traceback",
"code", "script", "function", "api", "endpoint", "database", "sql", "mongodb", "mysql",
"docker", "deno", "node", "express", "php", "python", "java", "rust", "golang", "compile",
"performance", "latency", "bandwidth", "optimization", "optimize",
"algorithm", "complexity", "big o", "time complexity", "space complexity",
"report", "plan", "design", "architecture", "integration", "migrate", "refactor",
"test case", "unit test", "e2e test",
"prove", "derive", "integral", "differentiate", "matrix", "neural network", "train", "model",
])
_urgent_words = set(w.lower() for w in ["urgent", "asap", "immediately", "now", "critical", "important", "priority", "must"])
_short_chat_terms = set(w.lower() for w in ["hi", "hello", "thanks", "thank you", "bye", "ok", "okay", "nice", "cool", "π", "π"])
# ---------- Utility functions ----------
def _word_count(text: str) -> int:
return len(re.findall(r"\w+", text)) if text else 0
def _has_code(text: str) -> bool:
    """Heuristic: does *text* contain a code fence, inline code, or code-like syntax?"""
    if not text:
        return False
    if _code_fence_re.search(text) or _inline_code_re.search(text):
        return True
    # "def name(" or a trailing semicolon are treated as code-ish signals
    return bool(re.search(r"\bdef\s+\w+\(|;\s*$", text, flags=re.IGNORECASE))
def _has_list(text: str) -> bool:
    """True when *text* contains a bullet or numbered list marker at a line start."""
    return _list_marker_re.search(text) is not None
def _keyword_matches(text: str) -> int:
    """Count how many task keywords occur as substrings of *text* (case-insensitive).

    Substring matching is intentional ("optimi" catches optimise/optimize/...),
    but note it also means e.g. "code" matches inside "encoded".
    """
    if not text:
        return 0
    lowered = text.lower()
    # idiomatic generator-sum replaces the manual counter loop
    return sum(1 for kw in _task_keywords if kw in lowered)
def _numeric_count(text: str) -> int:
    """Number of standalone integer/decimal literals in *text* (None -> 0)."""
    matches = _number_re.findall(text or "")
    return len(matches)
def _is_urgent(text: str) -> bool:
    """True when any urgency keyword occurs as a substring of *text*."""
    lowered = (text or "").lower()
    for word in _urgent_words:
        if word in lowered:
            return True
    return False
def _short_chat_score(text: str) -> bool:
    """True for very short conversational turns ("hi", "thanks", "bye").

    Fix: match whole tokens instead of raw substrings, so "hi" no longer
    fires inside "this"/"china" and "ok" no longer fires inside "look".
    Multi-word terms ("thank you") still use phrase containment.
    """
    t = (text or "").strip().lower()
    tokens = t.split()
    if not tokens or len(tokens) > 2:
        return False
    # exact match on punctuation-stripped tokens for single-word terms
    stripped = {tok.strip(".,!?") for tok in tokens}
    if stripped & _short_chat_terms:
        return True
    return any(" " in term and term in t for term in _short_chat_terms)
def _question_score(text: str) -> float:
    """Score question-likeness: +1.0 for a trailing '?', +0.6 for a leading wh-word."""
    src = text or ""
    score = 0.0
    if _question_mark_re.search(src):
        score += 1.0
    if _question_word_re.match(src.strip()):
        score += 0.6
    return score
def _history_signal(messages: List[Dict[str, str]]) -> float:
    """Small boost (0.0-0.6) when recent prior user turns mention technical keywords."""
    if not messages or len(messages) < 2:
        return 0.0
    # look at up to three messages preceding the latest one
    recent = [m.get("content", "") for m in messages[-4:-1] if isinstance(m, dict)]
    hits = _keyword_matches(" ".join(recent))
    return float(min(3, hits)) * 0.2
# ---------- Softmax helper ----------
def _softmax(scores: List[float], temp: float = 1.0) -> List[float]:
if not scores:
return []
exps = [math.exp(s / temp) for s in scores]
s = sum(exps)
if s == 0: return [1.0/len(scores)]*len(scores)
return [e/s for e in exps]
# ---------- Expert base classes ----------
class Expert:
    """Base class for dispatcher experts.

    Each expert exposes two hooks: score() reports affinity for the current
    request's features, and run() performs (or describes) expert-specific
    work. Subclasses override both.
    """
    name: str
    description: str

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    def score(self, features: Dict[str, Any]) -> float:
        """Heuristic affinity for *features*; higher means more relevant."""
        return 0.0  # neutral by default

    def run(self, messages: List[Dict[str, str]], features: Dict[str, Any]) -> Dict[str, Any]:
        """Return expert metadata only; production versions may call a model endpoint."""
        return {"expert": self.name, "action": "noop", "note": "heuristic-only"}
# ---------- Concrete experts ----------
class ShortChatExpert(Expert):
    """Expert for greetings and other very short conversational turns."""
    def __init__(self):
        super().__init__("short_chat", "Handles greetings/short conversational turns")

    def score(self, f):
        # strong claim on short-chat turns, near-zero otherwise
        return 5.0 if f.get("short_chat") else 0.1

    def run(self, messages, features):
        return {"expert": self.name, "action": "short_reply", "note": "Use concise response template."}
class CodeExpert(Expert):
    """Expert for code snippets, stacktraces and debugging tasks."""
    def __init__(self):
        super().__init__("code_expert", "Handles code, stacktraces, debugging tasks")

    def score(self, f):
        # code presence dominates; keywords and numbers add smaller boosts
        base = 4.0 if f.get("has_code") else 0.0
        return base + 0.8 * f.get("kw_count", 0) + 0.6 * f.get("numeric_count", 0)

    def run(self, messages, features):
        # Placeholder: in production call a code-specialized model or analyzer endpoint
        return {"expert": self.name, "action": "analyze_code", "note": "Run code LLM or static-checker (not implemented)."}
class NLUExpert(Expert):
    """Expert for intent classification and slot extraction."""
    def __init__(self):
        super().__init__("nlu_expert", "Deep intent and slot extraction / classification")

    def score(self, f):
        return (
            1.0 * f.get("kw_count", 0)
            + 0.8 * f.get("question_score", 0)
            + 0.4 * (f.get("word_count", 0) / 30.0)
            + 0.6 * f.get("history_signal", 0)
        )

    def run(self, messages, features):
        # Heuristic intent tagging: technical beats social beats general
        if features.get("kw_count", 0) >= 2 or features.get("has_code"):
            intent = "technical_task"
        elif features.get("short_chat"):
            intent = "social"
        else:
            intent = "general"
        return {"expert": self.name, "action": "classify_intent", "intent": intent}
class RAGExpert(Expert):
    """Expert for retrieval-augmented requests (RAG / agent flows)."""
    def __init__(self):
        super().__init__("rag_expert", "Handles retrieval-augmented requests (RAG/agent)")

    def score(self, f):
        # URLs, technical keywords, numbers and long contexts all suggest retrieval
        total = 2.0 if f.get("has_url") else 0.0
        total += 1.2 * f.get("kw_count", 0)
        total += 0.9 * f.get("numeric_count", 0)
        if f.get("word_count", 0) > 60:
            total += 1.5
        return total

    def run(self, messages, features):
        # Placeholder: should trigger a retrieval job or agent
        return {"expert": self.name, "action": "retrieve", "note": "Trigger RAG pipeline or agent (not implemented)."}
class SafetyExpert(Expert):
    """Expert for identity probes and PII-like pattern detection."""
    def __init__(self):
        super().__init__("safety_expert", "Safety checks, identity questions, hallucination guards")

    def score(self, f):
        txt = (f.get("last_text") or "").lower()
        total = 0.0
        identity_probes = ("who created you", "who made you", "identity", "where are you from")
        if any(phrase in txt for phrase in identity_probes):
            total += 3.0
        # 16-digit groups that look like a payment-card number
        if re.search(r"\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b", txt):
            total += 4.0
        return total

    def run(self, messages, features):
        return {"expert": self.name, "action": "safety_check", "note": "Run policy checks."}
# Add more experts as needed...
# Registry of experts consulted by the gating network, instantiated once at
# import time. Order is irrelevant: selection is purely by softmaxed score.
_DEFAULT_EXPERTS: List[Expert] = [
    ShortChatExpert(),
    NLUExpert(),
    CodeExpert(),
    RAGExpert(),
    SafetyExpert(),
]
# ---------- Core gating/routing function ----------
def _extract_features(messages: List[Dict[str,str]]) -> Dict[str,Any]:
    """Turn a chat transcript into the flat feature dict consumed by the experts.

    Some signals are computed over the latest message only, others over the
    whole concatenated conversation. Key insertion order is preserved because
    analyze_flow() serializes this dict into its explanation string.
    """
    if not messages:
        return {
            "word_count": 0, "kw_count": 0, "has_code": False, "numeric_count": 0,
            "question_score": 0.0, "short_chat": False, "has_url": False,
            "history_signal": 0.0, "last_text": "",
        }
    tail = messages[-1]
    last = tail.get("content", "") if isinstance(tail, dict) else str(tail)
    prev = " ".join(m.get("content", "") for m in messages[:-1] if isinstance(m, dict))
    full = (prev + "\n" + last).strip()
    return {
        "last_text": last,
        "word_count": _word_count(last),
        "total_word_count": _word_count(full),
        "kw_count": _keyword_matches(full),
        "has_code": _has_code(full),
        "has_list": _has_list(full),
        "numeric_count": _numeric_count(full),
        "question_score": _question_score(last),
        "short_chat": _short_chat_score(last),
        "has_url": _url_re.search(full) is not None,
        "is_urgent": _is_urgent(full),
        "history_signal": _history_signal(messages),
    }
def _gate_select_experts(features: Dict[str,Any], experts: List[Expert]) -> Tuple[List[Tuple[Expert,float]], List[float]]:
    """Score every expert, softmax-normalize, and pick the TOP_K most probable.

    Returns (chosen, probs): *chosen* is a list of (expert, probability)
    pairs for the winners; *probs* is the full probability vector aligned
    with *experts*. Negative raw scores are clamped to zero.
    """
    raw = [max(0.0, expert.score(features)) for expert in experts]
    if not raw:
        return [], []
    probs = _softmax(raw, temp=SOFTMAX_TEMPERATURE)
    # rank indices by probability; stable sort keeps registry order on ties
    ranked = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)
    chosen = [(experts[i], probs[i]) for i in ranked[:TOP_K]]
    return chosen, probs
# ---------- Public API: analyze_flow ----------
def analyze_flow(messages: List[Dict[str,str]]) -> Dict[str,Any]:
    """Route a conversation through the software-MoE gate.

    Returns a dict with keys:
      route       -- "direct" or "planning"
      is_complex  -- whether the request warrants a planning pass
      flow_label  -- coarse category of the latest turn
      confidence  -- 0..1 complexity confidence (logistic of the feature score)
      explanation -- human-readable trace of the decision
      experts     -- [{"name":.., "prob":.., "meta":..}, ...] for chosen experts
    """
    features = _extract_features(messages)
    # gate over a copy of the registry so the module-level list is untouched
    chosen, probs = _gate_select_experts(features, _DEFAULT_EXPERTS.copy())

    # Coarse flow label; first matching rule wins, so order matters.
    if features.get("has_code") or features.get("kw_count", 0) >= 2:
        flow_label = "coding_request"
    elif features.get("is_urgent"):
        flow_label = "escalation"
    elif features.get("kw_count", 0) >= 1 and features.get("word_count", 0) >= 25:
        flow_label = "task_request"
    elif features.get("short_chat"):
        flow_label = "short_chat"
    elif features.get("question_score", 0) > 0.9 and features.get("word_count", 0) < 25:
        flow_label = "short_question"
    else:
        flow_label = "general"

    # Weighted sum of raw feature signals ...
    feature_score = (
        WEIGHT_LENGTH * (features.get("word_count", 0) / 30.0)
        + WEIGHT_KEYWORD * features.get("kw_count", 0)
        + WEIGHT_CODE * (4.0 if features.get("has_code") else 0.0)
        + WEIGHT_NUMERIC * features.get("numeric_count", 0)
        + WEIGHT_QUESTION * features.get("question_score", 0)
        + WEIGHT_URGENT * (1.0 if features.get("is_urgent") else 0.0)
        + WEIGHT_HISTORY * features.get("history_signal", 0)
    )
    # ... squashed into 0..1 by a logistic centered at feature_score == 2.0
    conf = 1.0 / (1.0 + math.exp(-0.45 * (feature_score - 2.0)))
    conf = min(1.0, max(0.0, conf))

    is_complex = (
        conf >= MIN_COMPLEX_CONF_FOR_PLANNING
        or features.get("has_code")
        or features.get("kw_count", 0) >= 2
    )
    # Short chit-chat always goes direct, regardless of score.
    if features.get("short_chat"):
        route, is_complex = "direct", False
    else:
        route = "planning" if is_complex else "direct"

    expert_list = []
    for expert, prob in chosen:
        # run() only returns metadata here; no heavy work is executed
        meta = expert.run(messages, features)
        expert_list.append({"name": expert.name, "prob": round(float(prob), 4), "meta": meta})

    explanation = (
        "features=" + json.dumps(features)
        + f" | feature_score={feature_score:.2f} | conf={conf:.3f}"
        + f" | chosen={[e.name for e, _ in chosen]}"
    )
    return {
        "route": route,
        "is_complex": bool(is_complex),
        "flow_label": flow_label,
        "confidence": round(float(conf), 3),
        "explanation": explanation,
        "experts": expert_list,
    }
# ---------- Debug helper ----------
def debug_flow(text: str, history: List[str] = None):
    """Convenience wrapper: wrap *text* (plus optional prior *history* strings)
    as user-role messages and run analyze_flow() on the result.

    NOTE(review): the annotation should be Optional[List[str]] (Optional is not
    currently imported). The None default is safe here -- it is never mutated.
    """
    hist_msgs = [{"role":"user","content":h} for h in (history or [])]
    hist_msgs.append({"role":"user","content": text})
    return analyze_flow(hist_msgs)
# Example self-test when run directly
if __name__ == "__main__":
    # Representative messages spanning the routing categories.
    # Fix: removed a stray "|" scrape artifact fused onto the final line,
    # which made the file a syntax error. The "π"/"β" byte sequences are
    # preserved as-is (they mirror entries in _short_chat_terms).
    tests = [
        "Hi π",
        "What is your name?",
        "What is neural network",
        "My app crashes with TypeError: undefined is not a function. Stacktrace: ```TypeError: ...``` How to fix?",
        "Deploy my node app to Docker with Nginx and SSL β step-by-step please.",
        "Quick: 2+2?"
    ]
    for t in tests:
        print("----")
        print("MSG:", t)
        out = debug_flow(t)
        print(json.dumps(out, indent=2))