@tool
def sql_investigate(query: str) -> dict:
try:
df = con.execute(query).df()
head = df.head(30)
return {
"rows": int(len(df)),
"columns": list(df.columns),
"preview": head.to_dict(orient="records")
}
except Exception as e:
return {"error": str(e)}
@tool
def log_pattern_scan(window_start_iso: str, window_end_iso: str, top_k: int = 8) -> dict:
ws = pd.to_datetime(window_start_iso)
we = pd.to_datetime(window_end_iso)
df = logs_df[(logs_df["ts"] >= ws) & (logs_df["ts"] <= we)].copy()
if df.empty:
return {"rows": 0, "top_error_kinds": [], "top_services": [], "top_endpoints": []}
df["error_kind_norm"] = df["error_kind"].fillna("").replace("", "NONE")
err = df[df["level"].isin(["WARN","ERROR"])].copy()
top_err = err["error_kind_norm"].value_counts().head(int(top_k)).to_dict()
top_svc = err["service"].value_counts().head(int(top_k)).to_dict()
top_ep = err["endpoint"].value_counts().head(int(top_k)).to_dict()
by_region = err.groupby("region").size().sort_values(ascending=False).head(int(top_k)).to_dict()
p95_latency = float(np.percentile(df["latency_ms"].values, 95))
return {
"rows": int(len(df)),
"warn_error_rows": int(len(err)),
"p95_latency_ms": p95_latency,
"top_error_kinds": top_err,
"top_services": top_svc,
"top_endpoints": top_ep,
"error_by_region": by_region
}
@tool
def propose_mitigations(hypothesis: str) -> dict:
h = hypothesis.lower()
mitigations = []
if "conn" in h or "pool" in h or "db" in h:
mitigations += [
{"action": "Increase DB connection pool size (bounded) and add backpressure at db-proxy", "owner": "Platform", "eta_days": 3},
{"action": "Add circuit breaker + adaptive timeouts between api-gateway and db-proxy", "owner": "Backend", "eta_days": 5},
{"action": "Tune query hotspots; add indexes for top offending endpoints", "owner": "Data/DBA", "eta_days": 7},
]
if "timeout" in h or "upstream" in h:
mitigations += [
{"action": "Implement hedged requests for idempotent calls (carefully) and tighten retry budgets", "owner": "Backend", "eta_days": 6},
{"action": "Add upstream SLO-aware load shedding at api-gateway", "owner": "Platform", "eta_days": 7},
]
if "cache" in h:
mitigations += [
{"action": "Add request coalescing and negative caching to prevent cache-miss storms", "owner": "Backend", "eta_days": 6},
{"action": "Prewarm cache for top endpoints during deploys", "owner": "SRE", "eta_days": 4},
]
if not mitigations:
mitigations += [
{"action": "Add targeted dashboards and alerts for the suspected bottleneck metric", "owner": "SRE", "eta_days": 3},
{"action": "Run controlled load test to reproduce and validate the hypothesis", "owner": "Perf Eng", "eta_days": 5},
]
mitigations = mitigations[:10]
return {"hypothesis": hypothesis, "mitigations": mitigations}
@tool
def draft_postmortem(title: str, window_start_iso: str, window_end_iso: str, customer_impact: str, suspected_root_cause: str, key_facts_json: str, mitigations_json: str) -> dict:
try:
facts = json.loads(key_facts_json)
except Exception:
facts = {"note": "key_facts_json was not valid JSON"}
try:
mits = json.loads(mitigations_json)
except Exception:
mits = {"note": "mitigations_json was not valid JSON"}
doc = {
"title": title,
"date_utc": datetime.utcnow().strftime("%Y-%m-%d"),
"incident_window_utc": {"start": window_start_iso, "end": window_end_iso},
"customer_impact": customer_impact,
"suspected_root_cause": suspected_root_cause,
"detection": {
"how_detected": "Automated anomaly detection + error-rate spike triage",
"gaps": ["Add earlier saturation alerting", "Improve symptom-to-cause correlation dashboards"]
},
"timeline": [
{"t": window_start_iso, "event": "Symptoms begin (latency/error anomalies)"},
{"t": "T+10m", "event": "On-call begins triage; identifies top services/endpoints"},
{"t": "T+25m", "event": "Mitigation actions initiated (throttling/backpressure)"},
{"t": window_end_iso, "event": "Customer impact ends; metrics stabilize"},
],
"key_facts": facts,
"corrective_actions": mits.get("mitigations", mits),
"followups": [
{"area": "Reliability", "task": "Add saturation signals + budget-based retries", "priority": "P1"},
{"area": "Observability", "task": "Add golden signals per service/endpoint", "priority": "P1"},
{"area": "Performance", "task": "Reproduce with load test and validate fix", "priority": "P2"},
],
"appendix": {"notes": "Generated by a Haystack multi-agent workflow (non-RAG)."}
}
return {"postmortem_json": doc}
llm = OpenAIChatGenerator(model="gpt-4o-mini")
state_schema = {
"metrics_csv_path": {"type": str},
"logs_csv_path": {"type": str},
"metrics_summary": {"type": dict},
"logs_summary": {"type": dict},
"incident_window": {"type": dict},
"investigation_notes": {"type": list, "handler": merge_lists},
"hypothesis": {"type": str},
"key_facts": {"type": dict},
"mitigation_plan": {"type": dict},
"postmortem": {"type": dict},
}
profiler_prompt = """You are a specialist incident profiler.
Goal: turn raw metrics/log summaries into crisp, high-signal findings.
Rules:
- Prefer calling tools over guessing.
- Output must be a JSON object with keys: window, symptoms, top_contributors, hypothesis, key_facts.
- Hypothesis must be falsifiable and mention at least one specific service and mechanism.
"""
writer_prompt = """You are a specialist postmortem writer.
Goal: produce a high-quality postmortem JSON (not prose) using the provided evidence and mitigation plan.
Rules:
- Call tools only if needed.
- Keep 'suspected_root_cause' specific and not generic.
- Ensure corrective actions have owners and eta_days.
"""
coordinator_prompt = """You are an incident commander coordinating a non-RAG multi-agent workflow.
You must:
1) Load inputs
2) Find an incident window (use p95_ms or error_rate)
3) Investigate with targeted SQL and log pattern scan
4) Ask the specialist profiler to synthesize evidence
5) Propose mitigations
6) Ask the specialist writer to draft a postmortem JSON
Return a final response with:
- A short executive summary (max 10 lines)
- The postmortem JSON
- A compact runbook checklist (bulleted)
"""
profiler_agent = Agent(
chat_generator=llm,
tools=[load_inputs, detect_incident_window, sql_investigate, log_pattern_scan],
system_prompt=profiler_prompt,
exit_conditions=["text"],
state_schema=state_schema
)
writer_agent = Agent(
chat_generator=llm,
tools=[draft_postmortem],
system_prompt=writer_prompt,
exit_conditions=["text"],
state_schema=state_schema
)
profiler_tool = ComponentTool(
component=profiler_agent,
name="profiler_specialist",
description="Synthesizes incident evidence into a falsifiable hypothesis and key facts (JSON output).",
outputs_to_string={"source": "last_message"}
)
writer_tool = ComponentTool(
component=writer_agent,
name="postmortem_writer_specialist",
description="Drafts a postmortem JSON using title/window/impact/rca/facts/mitigations.",
outputs_to_string={"source": "last_message"}
)
coordinator_agent = Agent(
chat_generator=llm,
tools=[
load_inputs,
detect_incident_window,
sql_investigate,
log_pattern_scan,
propose_mitigations,
profiler_tool,
writer_tool,
draft_postmortem
],
system_prompt=coordinator_prompt,
exit_conditions=["text"],
state_schema=state_schema
)












