LangGraph Agent Security · Part 12 of 13

12. Observability and Monitoring: Detecting What Your Defenses Miss


Security controls prevent attacks. Observability detects them.

That distinction matters more for agents than for most systems I’ve worked with. No matter how carefully you implement input validation, output guardrails, tool security, and authentication, some attacks will succeed. An injection payload that evades the guard model. A novel exfiltration technique the output pipeline hasn’t seen. A slow privilege escalation unfolding across dozens of sessions over weeks. The question isn’t whether these happen — it’s whether you’ll detect them in time to contain them.

For conventional software, observability is mature infrastructure: structured logs, distributed traces, metrics dashboards, alerting pipelines. Applied to LangGraph agents, these tools are necessary but not sufficient. An agent that’s been manipulated into exfiltrating data through carefully chosen API call parameters — or executing unauthorized actions through legitimate tool calls — may produce logs that look completely normal to a system designed to detect application errors.

Security observability for agents requires purpose-built instrumentation that captures what agents decide and why, not just what they do.


The Four-Layer Observability Stack

Security observability for LangGraph agents requires four distinct layers, each addressing a different aspect of the threat model:

┌──────────────────────────────────────────────────────────┐
│ Layer 4: Security Intelligence                           │
│ Threat hunting, behavioral baselines, incident forensics │
├──────────────────────────────────────────────────────────┤
│ Layer 3: Alerting and Response                           │
│ Real-time anomaly detection, automated response triggers │
├──────────────────────────────────────────────────────────┤
│ Layer 2: Metrics and Aggregation                         │
│ Time-series data, dashboards, trend analysis             │
├──────────────────────────────────────────────────────────┤
│ Layer 1: Structured Logging                              │
│ Event-level records of every agent action and decision   │
└──────────────────────────────────────────────────────────┘

Each layer depends on the one below it. The correct construction order is bottom-up. I’ve learned this the hard way: jumping straight to alerting before establishing solid structured logging produces alerts that are either too noisy to act on or too coarse to be useful.


Layer 1: Structured Logging

Structured logging is the foundation. Unlike free-form text logs, structured logs emit machine-parseable records where every field has a defined name and type. Consistency is what makes them useful — a query that finds all tool calls with anomalous parameters only works if every tool call emits the same fields with the same names.

The Audit Event Schema

Every security-relevant event in a LangGraph agent should emit a record conforming to a consistent schema:

from dataclasses import dataclass
from enum import Enum
from typing import Optional


class EventType(str, Enum):
    # Session lifecycle
    SESSION_START    = "session.start"
    SESSION_END      = "session.end"
    # Normal agent activity
    LLM_INFERENCE_COMPLETE = "llm.inference.complete"
    TOOL_CALL_AUTHORIZED   = "tool.call.authorized"
    TOOL_CALL_DENIED       = "tool.call.denied"
    # Security detections
    INPUT_VALIDATION_FAILED = "security.input.validation_failed"
    INJECTION_SUSPECTED     = "security.injection.suspected"
    OUTPUT_BLOCKED          = "security.output.blocked"
    SENSITIVE_DATA_DETECTED = "security.sensitive_data.detected"
    EXFILTRATION_DETECTED   = "security.exfiltration.detected"
    AUTH_FAILED             = "security.auth.failed"
    PERMISSION_DENIED       = "security.permission.denied"
    STATE_ANOMALY           = "security.state.anomaly"

@dataclass
class AgentAuditEvent:
    event_id:     str
    event_type:   EventType
    timestamp:    str            # ISO 8601
    agent_id:     str
    agent_version: str
    session_id:   Optional[str]  # Hashed for privacy
    thread_id:    Optional[str]  # Hashed for privacy
    user_id:      Optional[str]  # Hashed for privacy
    user_role:    Optional[str]
    node_name:    Optional[str]
    step_count:   Optional[int]
    event_data:   dict
    severity:     str            # info, warning, error, critical
    security_relevant: bool      # Routes to SIEM when True
    duration_ms:  Optional[float]

Logging Tool Calls: The Right Way

Tool call logging is one of the places where subtle decisions matter most. The logs need to be useful for security analysis, but they absolutely cannot contain raw argument values — those may include credentials, PII, or the injection payload itself.

def log_tool_call(self, session_id, user_id, tool_name,
                   argument_keys, argument_hash, authorized,
                   denial_reason=None, ...):
    self.log_event(AgentAuditEvent.create(
        event_type=(EventType.TOOL_CALL_AUTHORIZED
                    if authorized else EventType.TOOL_CALL_DENIED),
        event_data={
            "tool_name": tool_name,
            "argument_keys": argument_keys,    # Keys only: which parameters were passed
            "argument_hash": argument_hash,    # Hash for correlation
            "denial_reason": denial_reason,    # None when the call was authorized
        },
        ...
    ))

Log the argument keys (which tell you what parameters were passed) and a hash of the argument values (which lets you correlate events without exposing content). Never log the argument values themselves.
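One way to derive those two fields is sketched below. This is a minimal example, not the series' implementation: `summarize_arguments` and `HASH_KEY` are hypothetical names, and the choice of a keyed hash (HMAC-SHA256) over a plain hash is an assumption made because short or guessable argument values can otherwise be recovered by brute force.

```python
import hashlib
import hmac
import json

# Assumption: a per-deployment secret loaded from a secrets manager.
# It is hard-coded here only so the sketch runs on its own.
HASH_KEY = b"replace-with-a-real-secret"


def summarize_arguments(arguments: dict, key: bytes = HASH_KEY) -> tuple[list[str], str]:
    """Return (sorted argument keys, keyed hash of the full argument dict)."""
    # Canonical JSON (sorted keys, fixed separators) so the same arguments
    # produce the same digest regardless of dict ordering.
    canonical = json.dumps(arguments, sort_keys=True, separators=(",", ":"), default=str)
    digest = hmac.new(key, canonical.encode("utf-8"), hashlib.sha256).hexdigest()
    return sorted(arguments), digest


argument_keys, argument_hash = summarize_arguments(
    {"to": "alice@example.com", "subject": "Q3 report"}
)
```

Two calls with identical arguments yield the same `argument_hash`, which is what lets you correlate repeated or replayed tool calls across sessions without storing the values.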

The same principle applies to LLM inference:

def log_llm_inference(self, session_id, user_id, model,
                       input_tokens, output_tokens, tool_calls_made, ...):
    self.log_event(AgentAuditEvent.create(
        event_type=EventType.LLM_INFERENCE_COMPLETE,
        event_data={
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "tool_calls_count": len(tool_calls_made),
            "tool_calls_names": tool_calls_made,   # Tool names only, no arguments
            # NEVER log actual LLM input/output content:
            # it is too large and may contain sensitive data
        },
        ...
    ))

What Must Never Be Logged

This list is as important as what to log:

  • LLM input/output content — may contain injected secrets, user PII, confidential retrieved documents
  • Tool argument values — may contain credentials, PII, sensitive parameters
  • System prompt contents — confidential business logic
  • User message content — may contain sensitive information
  • Retrieved document content — may contain confidential data
  • OAuth tokens or any credential values
  • Full stack traces from production — may leak internal structure

Log metadata about these things (hashes, lengths, categories) but not the things themselves.
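As a rough illustration of "metadata, not content", the sketch below reduces a piece of text to a length, a digest, and a list of detected categories. The function name `describe_content` and the two regular expressions are assumptions for illustration only; a production system would rely on a maintained secret and PII detector, and would likely use a keyed hash rather than plain SHA-256 for short inputs.

```python
import hashlib
import re

# Assumption: two illustrative patterns, not a complete detector.
CATEGORY_PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "aws_access_key": re.compile(r"\bAKIA[0-9A-Z]{16}\b"),
}


def describe_content(text: str) -> dict:
    """Return loggable metadata about text without including the text itself."""
    return {
        "length_chars": len(text),
        "sha256": hashlib.sha256(text.encode("utf-8")).hexdigest(),
        # Category names only: the matched substrings are never returned.
        "categories": sorted(
            name for name, pattern in CATEGORY_PATTERNS.items() if pattern.search(text)
        ),
    }
```

The resulting dict can go into `event_data` as-is: it tells an analyst that a message of a given size contained something that looked like an email address, without reproducing the address.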


Layer 2: Metrics and Aggregation

Metrics aggregate log events into quantitative time-series data. Where logs tell you what happened, metrics tell you whether what happened is normal.

The core security metrics to track:

class AgentMetricsCollector:
    # Session lifecycle: starts, ends, durations, outcomes
    def record_session_end(self, duration_seconds, step_count,
                            total_tokens, outcome, user_role): ...

    # LLM inference: token counts, latency, tool call rates
    def record_llm_inference(self, model, input_tokens, output_tokens,
                              duration_ms, tool_calls_count): ...

    # Tool calls: by tool name, success/failure, denial count
    def record_tool_call(self, tool_name, authorized,
                          duration_ms, error=None): ...

    # Security events: injection detections, output blocks, auth failures
    def record_injection_detection(self, source, risk_level, action): ...
    def record_output_guardrail(self, disposition, detection_types, ...): ...
    def record_auth_event(self, event_type, user_role, reason): ...

    # Cost tracking: token consumption per session, user, role
    def record_cost(self, tokens_used, model, user_role, ...): ...

A security dashboard built on these metrics gives you at-a-glance visibility into what’s normal and what isn’t. The panels I’ve found most useful: active sessions, security events, cost anomalies, injection detection rate by source, tool call patterns, output guardrail outcomes, and top anomalies.
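To make the tool-call portion of that interface concrete, here is a small in-memory sketch using only the standard library. `InMemoryToolMetrics` and `denial_rate` are hypothetical names; a real deployment would more likely export these counters to a metrics backend instead of holding them in process memory.

```python
from collections import Counter
from typing import Optional


class InMemoryToolMetrics:
    """Hypothetical stand-in for a real metrics backend."""

    def __init__(self) -> None:
        # Keyed by (tool_name, outcome), where outcome is ok, error, or denied.
        self.calls: Counter = Counter()

    def record_tool_call(self, tool_name: str, authorized: bool,
                         error: Optional[str] = None) -> None:
        if not authorized:
            outcome = "denied"
        elif error is not None:
            outcome = "error"
        else:
            outcome = "ok"
        self.calls[(tool_name, outcome)] += 1

    def denial_rate(self, tool_name: str) -> float:
        """Fraction of recorded calls to tool_name that were denied."""
        total = sum(n for (name, _), n in self.calls.items() if name == tool_name)
        if total == 0:
            return 0.0
        return self.calls[(tool_name, "denied")] / total


metrics = InMemoryToolMetrics()
metrics.record_tool_call("send_email", authorized=True)
metrics.record_tool_call("send_email", authorized=False)
```

A per-tool denial rate is the kind of derived series that feeds the "tool call patterns" panel and, later, the anomaly detectors.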


Layer 3: Anomaly Detection and Alerting

Raw metrics become security signals through anomaly detection — identifying patterns that deviate from established baselines in ways that suggest malicious activity.

Statistical Baseline Monitoring

Statistical anomaly detection using z-scores works well for individual metrics:

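import statistics
import time
from collections import defaultdict, deque


class StatisticalAnomalyDetector:
    def __init__(self, window_size=100, alert_threshold_sigma=3.0, min_samples=20):
        self.threshold_sigma = alert_threshold_sigma
        self.min_samples = min_samples  # Observations required before scoring
        # One sliding window of (value, timestamp) pairs per metric key
        self._windows: dict[str, deque] = defaultdict(
            lambda: deque(maxlen=window_size)
        )

    def _make_key(self, metric_name, dimensions=None):
        # Assumed helper: one window per metric and dimension combination
        dims = ",".join(f"{k}={v}" for k, v in sorted((dimensions or {}).items()))
        return f"{metric_name}|{dims}"

    def record_observation(self, metric_name, value, dimensions=None):
        key = self._make_key(metric_name, dimensions)
        window = self._windows[key]

        anomaly = None
        if len(window) >= self.min_samples:
            values = [v for v, _ in window]
            mean = statistics.mean(values)
            std_dev = statistics.stdev(values)
            if std_dev > 0:  # A constant baseline has no meaningful z-score
                z_score = abs((value - mean) / std_dev)
                if z_score > self.threshold_sigma:
                    anomaly = {
                        "metric": metric_name,
                        "value": value,
                        "mean": round(mean, 2),
                        "z_score": round(z_score, 2),
                        "severity": "critical" if z_score > 6.0 else
                                   "high" if z_score > 4.5 else "medium",
                    }

        # Score against the existing baseline first, then add the new value
        window.append((value, time.time()))
        return anomaly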
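A short worked example may help calibrate intuition for those thresholds. The sketch below reuses the same z-score arithmetic and severity cut-offs (3.0, 4.5, 6.0) in a standalone function; `classify` and the baseline numbers are made up for illustration.

```python
import statistics
from typing import Optional


def classify(value: float, window: list[float], threshold_sigma: float = 3.0) -> Optional[str]:
    """Return a severity label if value is anomalous relative to window, else None."""
    mean = statistics.mean(window)
    std_dev = statistics.stdev(window)  # sample standard deviation
    if std_dev == 0:
        return None
    z_score = abs((value - mean) / std_dev)
    if z_score <= threshold_sigma:
        return None
    return "critical" if z_score > 6.0 else "high" if z_score > 4.5 else "medium"


# Hypothetical baseline: tool calls per session alternating between 9 and 11.
# Mean is 10; sample standard deviation is sqrt(20 / 19), roughly 1.03.
baseline = [9.0, 11.0] * 10

severity_for_13 = classify(13.0, baseline)  # z is about 2.9, below the 3-sigma threshold
severity_for_15 = classify(15.0, baseline)  # z is about 4.9
severity_for_17 = classify(17.0, baseline)  # z is about 6.8
```

With such a tight baseline, a session making 15 tool calls already lands in the "high" band, which is a reminder that z-score alerts are only as meaningful as the variance of the window behind them.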

Multi-Signal Behavioral Detection

Beyond single-metric anomalies, behavioral patterns that span multiple metrics often indicate actual attacks better than any individual signal:

import time


class BehavioralAnomalyDetector:
    # Assumption: how profiles, the step baseline, and the domain allowlist
    # are populated is deployment-specific; this constructor is one plausible shape.
    def __init__(self, baseline_steps=20, known_domains=None):
        self.baseline_steps = baseline_steps
        self.known_domains = set(known_domains or [])
        self.profiles = {}  # session_id -> accumulated per-session profile

    def analyze_session(self, session_id, current_state):
        anomalies = []
        profile = self.profiles.get(session_id, {})
        baseline_steps = self.baseline_steps
        known_domains = self.known_domains

        # Pattern 1: Rapid tool denial escalation
        # Multiple tool denials in short succession suggest capability probing
        recent_denials = [
            t for t in profile.get("denied_tools", [])
            if time.time() - t.get("timestamp", 0) < 300
        ]
        if len(recent_denials) >= 5:
            anomalies.append({
                "pattern": "rapid_tool_denial_escalation",
                "severity": "high",
                "detail": f"{len(recent_denials)} tool denials in 5 min. Possible capability probing.",
            })

        # Pattern 2: Injection flag + exfiltration tool = likely exfiltration attempt
        if (profile.get("injection_flags", 0) > 0
                and any(t in profile.get("recent_tools", [])
                        for t in ["send_email", "fetch_url", "upload_file"])):
            anomalies.append({
                "pattern": "injection_then_exfiltration_tool",
                "severity": "critical",
                "detail": "Injection flagged earlier, followed by data-transmitting tool use.",
            })

        # Pattern 3: Token consumption spike per step
        tokens_per_step = (current_state.get("tokens_used", 0) /
                          max(current_state.get("step_count", 1), 1))
        if tokens_per_step > 5000:
            anomalies.append({
                "pattern": "token_consumption_spike",
                "severity": "medium",
                "detail": f"High tokens/step: {tokens_per_step:.0f}. Possible context flooding.",
            })

        # Pattern 4: Session running far longer than baseline
        if current_state.get("step_count", 0) > baseline_steps * 3:
            anomalies.append({
                "pattern": "excessive_step_count",
                "severity": "high",
                "detail": "Session far exceeding baseline step count. Possible loop induction.",
            })

        # Pattern 5: Novel external domain contact
        novel_domains = set(current_state.get("contacted_domains", [])) - known_domains
        if novel_domains:
            anomalies.append({
                "pattern": "novel_external_domain_contact",
                "severity": "medium",
                "detail": f"Session contacted unknown domain(s): {sorted(novel_domains)}",
            })

        # Pattern 6: Bulk user data access across many distinct user IDs
        if len(set(current_state.get("accessed_user_ids", []))) > 10:
            anomalies.append({
                "pattern": "bulk_user_data_access",
                "severity": "high",
                "detail": "Session accessed data for many distinct user IDs. Possible bulk exfiltration.",
            })

        return anomalies

The injection-then-exfiltration pattern is the one I find most valuable in practice. An injection detection on its own is often noise: the user may simply have phrased something awkwardly. An injection detection followed in the same session by an email or HTTP tool call is a much stronger signal of an exfiltration attempt.
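The word "followed" matters here: the tool call has to come after the flag, not merely appear in the same session. A minimal sketch of an order-aware check is shown below, assuming each session keeps a list of timestamped events. `SessionEvent`, `DATA_TRANSMITTING_TOOLS`, and `injection_then_exfiltration` are hypothetical names, and the tool list simply mirrors the illustrative one used in the detector.

```python
from dataclasses import dataclass

# Assumption: the same illustrative tool names as the detector above.
DATA_TRANSMITTING_TOOLS = {"send_email", "fetch_url", "upload_file"}


@dataclass
class SessionEvent:
    kind: str            # "injection_flag" or "tool_call"
    timestamp: float     # seconds since epoch
    tool_name: str = ""  # set only for tool calls


def injection_then_exfiltration(events: list[SessionEvent]) -> bool:
    """True if a data-transmitting tool call occurs after the first injection flag."""
    flag_times = [e.timestamp for e in events if e.kind == "injection_flag"]
    if not flag_times:
        return False
    first_flag = min(flag_times)
    return any(
        e.kind == "tool_call"
        and e.tool_name in DATA_TRANSMITTING_TOOLS
        and e.timestamp > first_flag
        for e in events
    )
```

A session that sent an email and only later tripped the injection detector does not match, which keeps this critical-severity pattern from firing on ordinary usage.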

Alert Definitions

Alerts need careful calibration. Too few and incidents go undetected. Too many and alert fatigue sets in: responders start acknowledging alerts without reading them, which is arguably worse than having no alerting at all.

My alert structure: critical alerts that require immediate response and trigger automatic session termination; high-severity alerts that require response within 15 minutes; medium alerts that require response within an hour.

The most important critical alerts:

  • Exfiltration attempt detected — output guardrail blocked a likely exfiltration → auto-terminate session and flag user
  • Cross-tenant access attempt — agent tried to access another tenant’s data → always alert, no cooldown
  • Credential detected in output — agent output contained what looks like a credential → block output and trigger credential rotation
  • Injection followed by exfiltration tool — the pattern described above → require human approval for all subsequent tool calls

Layer 4: Automated Response

For the highest-confidence, highest-impact threats, automated response beats waiting for human intervention:

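import structlog

# Assumption: the keyword-argument logging style below matches structlog,
# not the standard library logging module.
logger = structlog.get_logger()


class AutomatedSecurityResponder:
    async def execute(self, response_type, session_id, context):
        # Map each response type to the coroutine that carries it out
        response_handlers = {
            "terminate_session": self._terminate_session,
            "terminate_session_and_flag_user": self._terminate_and_flag,
            "require_human_approval_for_tool": self._require_approval,
            "insert_human_checkpoint": self._insert_checkpoint,
            "block_output_and_rotate_credentials": self._block_and_rotate,
        }
        handler = response_handlers.get(response_type)
        if handler is None:
            # An unknown response type should be visible, not silently ignored
            logger.error("automated_security_response_unknown",
                         response_type=response_type, session_id=session_id)
            return
        logger.warning("automated_security_response_executing",
                      response_type=response_type, session_id=session_id)
        await handler(session_id, context)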

The key constraint: automated responses must be carefully scoped. A false-positive auto-termination is disruptive. Reserve fully automated hard responses (session termination, blocking) for the highest-confidence signals. Use softer responses (requiring approval for subsequent tool calls, inserting a checkpoint) for medium-confidence signals.
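One way to encode that scoping is a small selection function that maps a signal to one of the response types handled above. This is only a sketch: the numeric `confidence` score and every threshold in it are assumptions, and would need tuning against real false-positive rates.

```python
from typing import Optional


def choose_response(severity: str, confidence: float) -> Optional[str]:
    """Pick a response type for a signal; returns None for log-only handling."""
    # Hard response: only for critical signals with very high confidence.
    if severity == "critical" and confidence >= 0.9:
        return "terminate_session"
    # Softer response: keep the session alive but gate further tool calls.
    if severity in ("critical", "high") and confidence >= 0.6:
        return "require_human_approval_for_tool"
    # Softest response: pause for a human checkpoint.
    if confidence >= 0.3:
        return "insert_human_checkpoint"
    return None
```

Keeping this policy in one function makes the trade-off reviewable: anyone can see at a glance which signals are allowed to terminate a session.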


SIEM Integration

Enterprise security teams operate SIEM platforms — Splunk, Elastic Security, Microsoft Sentinel — that aggregate security events from across the entire technology stack. Integrating agent logs means agent security events become visible alongside network, authentication, and application events, enabling cross-system attack detection.

CEF (Common Event Format) is a text format that most SIEM platforms can ingest:

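from datetime import datetime


def _cef_escape_header(value) -> str:
    # Header fields: escape backslash first, then the pipe delimiter
    return str(value).replace("\\", "\\\\").replace("|", "\\|")


def _cef_escape_extension(value) -> str:
    # Extension values: escape backslash and "=", and encode line breaks
    return (str(value).replace("\\", "\\\\").replace("=", "\\=")
            .replace("\r", "\\r").replace("\n", "\\n"))


def format_as_cef(self, event: AgentAuditEvent) -> str:
    severity_map = {"info": 3, "warning": 6, "error": 8, "critical": 10}
    cef_severity = severity_map.get(event.severity, 3)

    extensions = {
        # Milliseconds since epoch; fromisoformat() rejects a trailing "Z" before Python 3.11
        "rt": int(datetime.fromisoformat(event.timestamp).timestamp() * 1000),
        "deviceExternalId": event.agent_id,
        "duid": event.user_id or "unknown",
        "cs1": event.session_id or "unknown",
        "cs1Label": "sessionId",
    }

    # Escaping lives in helpers: backslashes inside f-string expressions
    # are a syntax error before Python 3.12
    ext_str = " ".join(
        f"{k}={_cef_escape_extension(v)}" for k, v in extensions.items()
    )
    # Use .value: newer Python versions may render a str-mixin Enum as "EventType.X" in f-strings
    event_name = _cef_escape_header(event.event_type.value)

    return (
        f"CEF:0|Company|LangGraphAgent|{_cef_escape_header(event.agent_version)}|"
        f"{event_name}|{event_name}|{cef_severity}|"
        f"{ext_str}"
    )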

SIEM forwarding should be best-effort with a short timeout — don’t let a SIEM outage block agent operations.
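A bounded queue in front of the sender is one way to get that behavior: the agent's call never blocks, and events are dropped (and counted) when the SIEM is slow or unreachable. The sketch below assumes a caller-supplied `send` function that performs the actual network call with its own short timeout; `BestEffortForwarder` and its method names are hypothetical.

```python
import queue
from typing import Callable


class BestEffortForwarder:
    """Queue events for a background sender; drop them instead of blocking the agent."""

    def __init__(self, send: Callable[[str], None], max_pending: int = 1000) -> None:
        # Assumption: send() does the network I/O and enforces a short timeout.
        self._send = send
        self._pending: queue.Queue = queue.Queue(maxsize=max_pending)
        self.dropped = 0

    def submit(self, cef_line: str) -> bool:
        """Called from the agent path. Never blocks; returns False if dropped."""
        try:
            self._pending.put_nowait(cef_line)
            return True
        except queue.Full:
            self.dropped += 1
            return False

    def _deliver(self, line: str) -> None:
        try:
            self._send(line)
        except Exception:
            # Count the loss, but never propagate a SIEM failure to the agent.
            self.dropped += 1

    def drain_once(self) -> None:
        """Send one queued event if there is one."""
        try:
            line = self._pending.get_nowait()
        except queue.Empty:
            return
        self._deliver(line)

    def run_forever(self) -> None:
        """Worker loop, intended as the target of a daemon thread."""
        while True:
            self._deliver(self._pending.get())  # blocks the worker, not the agent
```

In use, `run_forever` would typically be started with `threading.Thread(target=forwarder.run_forever, daemon=True).start()`. Because dropped security events are themselves a signal, it is worth exporting the `dropped` counter as a metric and keeping the local structured log as the system of record.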


What Changes When You Build This

When I first added proper observability to my agents, two things became immediately apparent.

The first was how much I hadn’t been able to see. Injection attempts I hadn’t known about. Tool denial spikes that looked like bugs but were probably probing attempts. Sessions running far longer than they should have. All invisible without the right instrumentation.

The second was how different security observability is from operational observability. Operational monitoring asks: is the system healthy? Is it fast? Are errors acceptable? Security monitoring asks: is the system doing what its users intended? Are there patterns that look like manipulation? Are anomalies concentrated in ways that suggest specific users are under attack?

These are different questions, and they require different instruments. Building operational observability and calling it security observability is one of the more common gaps I see in production agent deployments.


Observability and Monitoring Checklist

Logging:

  • All security-relevant events emit structured logs with consistent schema
  • Logs contain hashed (not plaintext) user IDs and session IDs
  • LLM content, tool argument values, and credentials never logged
  • Log writes validated against sensitive data patterns before emission
  • Security logs in tamper-evident, append-only store
  • Retention policy covers minimum required for forensic analysis

Metrics:

  • Session lifecycle metrics collected (start, end, duration, outcome)
  • LLM inference metrics: token counts, latency, tool call rates
  • Tool call metrics: by tool name, success/failure, denial count
  • Security event metrics: injection detection, output blocks, auth failures
  • Cost metrics: token consumption per session, user, role

Anomaly detection:

  • Statistical baselines established for all key metrics
  • Per-session behavioral profiling detects multi-signal patterns
  • Coverage includes: injection rates, step count, token consumption, tool denials, novel domains, bulk data access

Alerting:

  • Alert definitions exist for all critical and high-severity events
  • Cooldown periods prevent alert fatigue
  • Automated responses defined for highest-confidence threats
  • Alert channels tested regularly

Integration:

  • Security events forwarded to enterprise SIEM in standard format
  • LangSmith tracing enabled for all production executions
  • On-call rotation defined for alerts requiring human response

This is Part 12 of an ongoing series on LangGraph agent security. Previous posts: Part 1: Introduction · Part 2: Architecture Primer · Part 3: Attack Surface Analysis · Part 4: Core Threat Categories · Part 5: Threat Modeling · Part 6: Input Validation · Part 7: Tool Security · Part 8: State and Memory Security · Part 9: Multi-Agent Trust Boundaries · Part 10: Output Guardrails · Part 11: Authentication and Authorization. Next: Part 13: Human-in-the-Loop.