10. Output Guardrails: The Last Line of Defense
Part 10 of the LangGraph Agent Security series
Every defensive control we’ve built so far operates on the input side of the execution pipeline. Validating what enters the system. Restricting what the LLM reads. Constraining tool access. Securing state. These are the defenses that intercept attacks before they do damage.
Output guardrails are different. They operate at the very end — examining what the agent is about to produce before it leaves the secure processing boundary. They’re the last thing that stands between the agent’s outputs and the world.
This matters because some things slip through upstream controls. A guard model that correctly classifies 95% of injections still misses 5%. A sophisticated indirect injection can survive content sanitization. A novel exfiltration technique might not match any existing pattern. Output guardrails catch what the upstream defenses missed.
But because they operate at the final stage, they face a pressure the other layers don’t: they must make accurate decisions at low latency on every output. Too aggressive and they block legitimate responses, erode user trust, and create operational friction. Too permissive and they don’t actually protect anything. The calibration problem is real.
This section covers what to scan for, how to build the detection pipeline, and how to tune it for production.
What the Output Layer Must Defend Against
Agent outputs aren’t homogeneous. The output guardrail architecture needs to cover all of them:
Natural language responses may contain sensitive data from the context — system prompt contents, API keys, user data from other sessions, proprietary business logic — either from a deliberate injection attack or simply because the LLM included context material it should have omitted.
Tool call arguments constructed by the LLM may contain injection payloads — SQL strings, shell commands, encoded exfiltration data, path traversal sequences.
Structured outputs written to databases may contain data from outside the authorized scope — other users’ records, fields the requesting user shouldn’t see, data modified by a goal hijacking attack.
Inter-agent messages may carry injection payloads from compromised retrieval sources, propagating to downstream agents with broader permissions.
Files and reports may contain sensitive information extracted from context, or content that constitutes a policy violation regardless of how it was produced.
A guardrail that only looks at natural language responses is incomplete. The exfiltration that matters most happens through tool calls, not chat responses.
Sensitive Data Detection and Redaction
The most universally applicable guardrail: scan every output for patterns that indicate credentials, personal information, or other sensitive content that should not leave the processing boundary.
Pattern-Based Detection
class SensitiveDataDetector:
    # Each entry: (regex, category, pattern_name, severity, redact_by_default)
    PATTERNS = [
        # ── Credentials ──────────────────────────────────────────────
        (r'sk-[a-zA-Z0-9]{20,}',
         SensitivityCategory.CREDENTIAL, "openai_api_key", "critical", True),
        (r'(AKIA|ABIA|ACCA)[A-Z0-9]{16}',
         SensitivityCategory.CREDENTIAL, "aws_access_key", "critical", True),
        (r'(?i)(api[_\-\s]?key|secret[_\-\s]?key|access[_\-\s]?token)'
         r'\s*[:=]\s*["\']?([a-zA-Z0-9\-_\.]{16,})["\']?',
         SensitivityCategory.CREDENTIAL, "generic_api_key", "critical", True),
        (r'(?i)(password|passwd|pwd)\s*[:=]\s*["\']?(\S{6,})["\']?',
         SensitivityCategory.CREDENTIAL, "password", "critical", True),
        (r'-----BEGIN (RSA |EC |OPENSSH )?PRIVATE KEY-----',
         SensitivityCategory.CREDENTIAL, "private_key", "critical", True),
        # ── PII ───────────────────────────────────────────────────────
        (r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b',
         SensitivityCategory.PII, "email_address", "medium", False),
        (r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',
         SensitivityCategory.PII, "ssn", "critical", True),
        # ── Financial ─────────────────────────────────────────────────
        (r'\b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|'
         r'3[47][0-9]{13}|6(?:011|5[0-9]{2})[0-9]{12})\b',
         SensitivityCategory.FINANCIAL, "credit_card", "critical", True),
        # ── System Config ─────────────────────────────────────────────
        (r'(?i)system\s+prompt\s*[:=\n]',
         SensitivityCategory.SYSTEM_CONFIG, "system_prompt_label", "high", False),
    ]
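    def scan_and_redact(self, text, policy=None):
        """Split detections into redact/flag sets (policy may override), then redact."""
        policy = policy or {}
        detections = self.scan(text)
        to_redact = []
        to_flag = []
        for detection in detections:
            should_redact = detection.redact
            # A per-category policy entry overrides the pattern's default
            if detection.category in policy:
                should_redact = policy[detection.category].get('redact', should_redact)
            (to_redact if should_redact else to_flag).append(detection)
        # redact_all=True: the redact/flag decision was made above, so a policy
        # override to redact must not be undone by the pattern's own default
        processed_text, applied = self.redact(text, to_redact, redact_all=True)
        return processed_text, to_flag, applied

    def redact(self, text, detections, redact_all=False):
        """Replace each selected match with a [REDACTED:<NAME>] placeholder."""
        selected = [d for d in detections if d.redact or redact_all]
        if not selected:
            return text, []
        # Merge overlapping matches (e.g. generic_api_key wrapping an sk- key)
        # so one placeholder covers the whole overlapping region
        spans = []
        for d in sorted(selected, key=lambda d: d.match_start):
            if spans and d.match_start < spans[-1][1]:
                spans[-1][1] = max(spans[-1][1], d.match_end)
            else:
                spans.append([d.match_start, d.match_end, d.pattern_name])
        redacted = list(text)
        # Replace from the end backwards so earlier offsets stay valid
        for start, end, name in reversed(spans):
            redacted[start:end] = list(f"[REDACTED:{name.upper()}]")
        return ''.join(redacted), selected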
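The scan() method that scan_and_redact() calls is not shown above. A minimal sketch of what it might look like follows, assuming each detection records the pattern metadata plus the match offsets that redact() relies on. The Detection dataclass, the EXAMPLE_PATTERNS list, and the use of plain strings for categories are illustrative assumptions, not the series' actual definitions.

```python
import re
from dataclasses import dataclass


@dataclass
class Detection:
    # Hypothetical record type; field names mirror those used by redact()
    category: str
    pattern_name: str
    severity: str
    redact: bool
    match_start: int
    match_end: int


# Two patterns in the same tuple layout as PATTERNS; categories are plain
# strings here instead of SensitivityCategory members (an assumption).
EXAMPLE_PATTERNS = [
    (r'sk-[a-zA-Z0-9]{20,}', "credential", "openai_api_key", "critical", True),
    (r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b',
     "pii", "email_address", "medium", False),
]


def scan(text, patterns=EXAMPLE_PATTERNS):
    """Return every pattern match as a Detection, ordered by position."""
    detections = []
    for regex, category, name, severity, redact in patterns:
        for m in re.finditer(regex, text):
            detections.append(
                Detection(category, name, severity, redact, m.start(), m.end())
            )
    detections.sort(key=lambda d: (d.match_start, d.match_end))
    return detections
```

Sorting by position keeps the offsets easy to reason about when several patterns match the same text.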
Context-Aware Assessment
Pattern matching catches structured sensitive data. Context-aware assessment catches contextually sensitive content — a response that summarizes a confidential document verbatim, or reveals information outside the authorized scope, even though it contains no regex-detectable patterns:
CONTEXT_SENSITIVITY_PROMPT = """You are a data sensitivity reviewer for an AI agent system.
Review the agent's proposed output against the authorized scope of the current task.
Identify any content that:
1. Reveals information from outside the authorized scope
2. Contains data belonging to users other than the requesting user
3. Discloses system configuration, prompts, or internal logic
4. Could facilitate social engineering if sent to users
5. Reproduces confidential source material beyond what is needed
Authorized scope: {task_scope}
Requesting user ID: {user_id}
Requesting user role: {user_role}
Proposed output:
<output>{proposed_output}</output>
Respond with JSON only:
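{{
  "is_appropriate": true | false,
  "concerns": ["list of specific concerns if any"],
  "severity": "none" | "low" | "medium" | "high" | "critical",
  "recommended_action": "approve" | "redact_and_approve" | "block" | "escalate"
}}"""
# Assuming the template is filled with str.format(): the JSON braces are doubled
# so that only the named placeholders are substituted.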
Context-aware assessment is expensive — it’s an additional LLM call. Apply it selectively: large responses, flagged sessions, or cases where other indicators have already fired.
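One way to sketch both halves of that advice: a cheap gate that decides whether the extra LLM call is worth making, and a parser that treats any malformed reviewer reply as a reason to escalate rather than approve. The function names, the 2000-character threshold, and the fallback verdict are assumptions chosen for illustration.

```python
import json

VALID_ACTIONS = {"approve", "redact_and_approve", "block", "escalate"}


def should_run_context_assessment(output_text, session_flagged,
                                  prior_indicators, min_chars=2000):
    """Gate the extra LLM call: large outputs, flagged sessions, earlier hits."""
    return bool(
        len(output_text) >= min_chars or session_flagged or prior_indicators
    )


def parse_assessment(raw_reply):
    """Parse the reviewer's JSON reply; anything malformed becomes 'escalate'."""
    fallback = {
        "is_appropriate": False,
        "concerns": ["unparseable reviewer reply"],
        "severity": "high",
        "recommended_action": "escalate",
    }
    try:
        verdict = json.loads(raw_reply)
    except (TypeError, ValueError):
        return fallback
    if not isinstance(verdict, dict):
        return fallback
    if verdict.get("recommended_action") not in VALID_ACTIONS:
        return fallback
    return verdict
```

Falling back to escalation keeps a broken or manipulated reviewer reply from being read as an approval.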
Exfiltration Detection
Exfiltration detection identifies structural patterns characteristic of data exfiltration attempts, regardless of what specific data is being transmitted:
import re


class ExfiltrationDetector:
    def check_output(self, output_text, output_type, session_context):
        indicators = []
        # External URLs not present in the original query
        # (_extract_external_urls is expected to return a set)
        external_urls = self._extract_external_urls(output_text)
        original_urls = self._extract_external_urls(
            session_context.get('original_query', '')
        )
        new_external_urls = external_urls - original_urls
        if new_external_urls:
            indicators.append({
                "type": "unexpected_external_urls", "severity": "high",
                "detail": f"Output contains {len(new_external_urls)} external URLs not in original request",
            })
        # Large Base64 blocks: possible encoded data exfiltration.
        # Payloads of inline data: URIs ("...;base64,<payload>") are skipped by
        # checking the text just before the match; the match itself can never
        # start with "data:" because ':' is outside the Base64 alphabet.
        suspicious_b64 = [
            m.group(0)
            for m in re.finditer(r'[A-Za-z0-9+/]{40,}={0,2}', output_text)
            if len(m.group(0)) > 100
            and output_text[max(0, m.start() - 7):m.start()] != 'base64,'
        ]
        if suspicious_b64:
            indicators.append({
                "type": "encoded_content", "severity": "medium",
                "detail": f"Output contains {len(suspicious_b64)} encoded data block(s)",
            })
        # Webhook/collection endpoints: classic exfiltration destinations.
        # [^\s/]* allows subdomains, since tunnel and collector services usually
        # hand out hosts of the form <id>.<service-domain>.
        webhook_patterns = [
            r'https?://[^\s]+/webhook', r'https?://[^\s]+/collect',
            r'https?://[^\s/]*requestbin\.', r'https?://[^\s/]*ngrok[\w\-]*\.',
            r'https?://[^\s/]*webhook\.site', r'https?://[^\s/]*pipedream\.net',
        ]
        for pattern in webhook_patterns:
            if re.search(pattern, output_text, re.IGNORECASE):
                indicators.append({
                    "type": "webhook_reference", "severity": "critical",
                    "detail": "Output references webhook/collection endpoint",
                })
                break
        # Output disproportionately large relative to query
        query_length = len(session_context.get('original_query', ''))
        if len(output_text) > max(query_length * 20, 5000):
            indicators.append({
                "type": "excessive_output_volume", "severity": "medium",
                "detail": f"Output ({len(output_text)} chars) disproportionately large",
            })
        # For tool calls: check email recipients against authorized list
        if output_type == "tool_arg":
            output_emails = self._extract_emails(output_text)
            authorized = set(session_context.get('authorized_email_recipients', []))
            unauthorized = output_emails - authorized
            if unauthorized:
                indicators.append({
                    "type": "unauthorized_email_recipients", "severity": "critical",
                    "detail": f"Tool args contain {len(unauthorized)} unauthorized recipients",
                })
        return indicators
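The detector relies on two helpers, _extract_external_urls and _extract_emails, that are not shown. A rough sketch of how they could work is below, written as standalone functions. The regexes, the internal_hosts parameter, and the lowercasing of emails are assumptions; a production version would need a more careful notion of what counts as "external".

```python
import re
from urllib.parse import urlsplit

URL_RE = re.compile(r'https?://[^\s<>"\')\]]+', re.IGNORECASE)
EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b')


def extract_external_urls(text, internal_hosts=frozenset()):
    """Return the set of http(s) URLs whose host is not in internal_hosts."""
    urls = set()
    for match in URL_RE.findall(text):
        url = match.rstrip('.,;:!?')  # drop trailing sentence punctuation
        host = (urlsplit(url).hostname or '').lower()
        if host and host not in internal_hosts:
            urls.add(url)
    return urls


def extract_emails(text):
    """Return the set of email addresses in text, lowercased for comparison."""
    return {m.lower() for m in EMAIL_RE.findall(text)}
```

Returning sets is what makes the `external_urls - original_urls` and `output_emails - authorized` differences in check_output work.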
Screening Tool Call Arguments Before Execution
Tool call arguments deserve special attention — they’re how exfiltration actually happens:
import re


class ToolCallExfiltrationScreener:
    def __init__(self, sensitive_detector):
        # Injected rather than read from an undefined module-level name
        self.sensitive_detector = sensitive_detector

    def screen_email_args(self, args, session_context):
        concerns = []
        recipient = args.get('to', '')
        # Same session key as ExfiltrationDetector uses for its recipient check
        authorized = set(session_context.get('authorized_email_recipients', []))
        if recipient and recipient not in authorized:
            concerns.append(f"Recipient '{recipient}' not in authorized list")
        body = args.get('body', '')
        critical_detections = [d for d in self.sensitive_detector.scan(body)
                               if d.severity == "critical"]
        if critical_detections:
            concerns.append(f"Email body contains critical sensitive data: "
                            f"{[d.pattern_name for d in critical_detections]}")
        # Base64 in email body
        if re.findall(r'[A-Za-z0-9+/]{100,}={0,2}', body):
            concerns.append("Email body contains potentially encoded content")
        # Suspicious subject line (whole words only, so "copyright" does not match "copy")
        subject = args.get('subject', '')
        if re.search(r'\b(data|export|dump|exfil|backup|copy)\b', subject, re.IGNORECASE):
            concerns.append(f"Suspicious subject line: '{subject}'")
        return len(concerns) == 0, concerns
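To show where a screener like this sits relative to execution, here is a small sketch of a gate that runs the screener first and only then invokes the tool. The guarded_tool_call function, the screeners mapping, the execute callable, and the allow_unscreened flag are all hypothetical names introduced for illustration; how tools are actually dispatched depends on the surrounding agent framework.

```python
def guarded_tool_call(tool_name, args, session_context, screeners, execute,
                      allow_unscreened=False):
    """Run a tool only if its screener (when one exists) raises no concerns.

    screeners maps tool names to callables returning (ok, concerns), the same
    shape screen_email_args returns. execute(tool_name, args) performs the call.
    """
    screener = screeners.get(tool_name)
    if screener is None:
        if not allow_unscreened:
            # Conservative default: tools without a screener do not run
            return {"executed": False,
                    "concerns": [f"No screener registered for '{tool_name}'"]}
        return {"executed": True, "result": execute(tool_name, args), "concerns": []}
    ok, concerns = screener(args, session_context)
    if not ok:
        return {"executed": False, "concerns": concerns}
    return {"executed": True, "result": execute(tool_name, args), "concerns": []}
```

Whether unscreened tools should be refused by default is a policy choice; the flag makes that choice explicit rather than implicit.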
Structured Output Enforcement
When agents produce structured outputs — JSON, typed reports, formatted data — enforcing the schema serves two security purposes: it ensures the agent is doing what it was designed to do, and it prevents a manipulated agent from embedding arbitrary content in a response that a downstream system will parse and act on.
import json
import re


class StructuredOutputError(Exception):
    """Raised when no schema-valid output is produced within the retry budget."""


class StructuredOutputEnforcer:
    async def generate_structured(self, prompt, output_schema,
                                  system_prompt, max_retries=2):
        # output_schema is a Pydantic model class; .schema() is the v1-style
        # call (Pydantic v2 names it model_json_schema())
        schema_json = json.dumps(output_schema.schema(), indent=2)
        structured_system = (
            f"{system_prompt}\n\n"
            f"You MUST respond with a JSON object exactly matching this schema. "
            f"No additional text, no markdown code blocks:\n{schema_json}"
        )
        for attempt in range(max_retries + 1):
            response = await self.client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=4096,
                system=structured_system,
                messages=[{"role": "user", "content": prompt}]
            )
            raw_text = response.content[0].text.strip()
            # Strip markdown code blocks if present
            if raw_text.startswith("```"):
                raw_text = re.sub(r'^```(?:json)?\n?', '', raw_text).rstrip('`').strip()
            try:
                # Parsing and schema validation both happen here
                validated = output_schema(**json.loads(raw_text))
                return validated
            except Exception as e:
                if attempt < max_retries:
                    # Feed the error back so the next attempt can correct it
                    prompt += f"\n\n[Previous attempt failed: {e}. Ensure valid JSON matching schema.]"
                else:
                    # Fail secure: no unvalidated output is ever returned
                    raise StructuredOutputError(
                        f"Failed after {max_retries + 1} attempts: {e}"
                    ) from e
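The second security purpose, keeping a manipulated agent from smuggling extra content into a parsed response, depends on the validator rejecting anything the schema does not name. The sketch below illustrates that idea with the standard library only; validate_strict and TICKET_SCHEMA are hypothetical, and the flat name-to-type schema is far simpler than a real Pydantic model. With Pydantic, unknown fields are ignored by default, so a similar effect would come from configuring the model to forbid extras.

```python
import json


def validate_strict(raw_text, schema):
    """Parse JSON and require exactly the fields in schema, each of the given type."""
    data = json.loads(raw_text)
    if not isinstance(data, dict):
        raise ValueError("top-level JSON value must be an object")
    unexpected = set(data) - set(schema)
    missing = set(schema) - set(data)
    if unexpected or missing:
        raise ValueError(
            f"unexpected fields {sorted(unexpected)}, missing fields {sorted(missing)}"
        )
    for field_name, expected_type in schema.items():
        value = data[field_name]
        # bool is a subclass of int in Python, so reject it where int is expected
        if isinstance(value, bool) and expected_type is not bool:
            raise ValueError(f"field '{field_name}' must be {expected_type.__name__}")
        if not isinstance(value, expected_type):
            raise ValueError(f"field '{field_name}' must be {expected_type.__name__}")
    return data


# Hypothetical schema for illustration: field name -> required Python type
TICKET_SCHEMA = {"ticket_id": int, "summary": str, "escalate": bool}
```

A response carrying an extra key such as "next_action" is rejected outright instead of being passed along for a downstream consumer to interpret.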
The Complete Response Filtering Pipeline
All the detection and enforcement logic runs as a unified pipeline that every output passes through before delivery:
import time


class OutputGuardrailPipeline:
    def _finish(self, result, start, session_context):
        # Every exit path records timing and is logged, including blocks
        result.processing_time_ms = (time.monotonic() - start) * 1000
        self._log_result(result, session_context)
        return result

    def _block(self, result, reason, start, session_context):
        result.disposition = OutputDisposition.BLOCK
        result.block_reason = reason
        return self._finish(result, start, session_context)

    async def process(self, raw_output, output_type, session_context,
                      skip_stages=None) -> OutputGuardrailResult:
        start = time.monotonic()
        skip_stages = skip_stages or []
        result = OutputGuardrailResult(
            disposition=OutputDisposition.APPROVE,
            processed_output=raw_output,
        )
        # Stage 1: Sensitive Data Detection
        if "sensitive_data" not in skip_stages:
            processed_text, flagged, redacted = self.sensitive_detector.scan_and_redact(
                raw_output, policy=self.policy.get('redaction_policy', {})
            )
            result.sensitive_data_flagged = flagged
            result.sensitive_data_redacted = redacted
            if redacted:
                result.processed_output = processed_text
                result.disposition = OutputDisposition.REDACT_APPROVE
            # Critical data that was flagged but left unredacted → block immediately.
            # `flagged` already holds only unredacted detections, so severity is
            # the only condition needed.
            critical_unredacted = [d for d in flagged if d.severity == "critical"]
            if critical_unredacted:
                return self._block(
                    result,
                    f"Critical sensitive data: {[d.pattern_name for d in critical_unredacted]}",
                    start, session_context)
        # Stage 2: Exfiltration Detection
        if "exfiltration" not in skip_stages:
            indicators = self.exfiltration_detector.check_output(
                result.processed_output, output_type, session_context
            )
            result.exfiltration_indicators = indicators
            critical_indicators = [i for i in indicators if i.get('severity') == 'critical']
            if critical_indicators:
                return self._block(
                    result,
                    f"Exfiltration indicators: {[i['type'] for i in critical_indicators]}",
                    start, session_context)
            # Do not overwrite REDACT_APPROVE from Stage 1; the indicators stay
            # available on result.exfiltration_indicators either way
            if indicators and result.disposition == OutputDisposition.APPROVE:
                result.disposition = OutputDisposition.APPROVE_FLAGGED
        # Stage 3: Context Sensitivity (expensive, so opt-in via policy)
        if ("context_sensitivity" not in skip_stages
                and output_type == "response"
                and self.policy.get('enable_context_assessment', False)):
            assessment = await assess_output_sensitivity(
                result.processed_output,
                session_context.get('task_scope', ''),
                session_context.get('user_id', ''),
                session_context.get('user_role', ''),
                self.client
            )
            result.context_sensitivity = assessment
            if assessment.get('recommended_action') == 'block':
                return self._block(
                    result,
                    f"Context sensitivity: {assessment.get('concerns')}",
                    start, session_context)
        return self._finish(result, start, session_context)
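The pipeline uses OutputDisposition and OutputGuardrailResult without defining them. A plausible shape, inferred only from the attributes and members the pipeline touches, might look like the following. The enum values, field types, and defaults are assumptions.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class OutputDisposition(str, Enum):
    # str mixin so members compare equal to plain strings in logs and tags
    APPROVE = "approve"
    REDACT_APPROVE = "redact_approve"
    APPROVE_FLAGGED = "approve_flagged"
    BLOCK = "block"


@dataclass
class OutputGuardrailResult:
    disposition: OutputDisposition
    processed_output: str
    # default_factory gives each result its own list instead of a shared one
    sensitive_data_flagged: list = field(default_factory=list)
    sensitive_data_redacted: list = field(default_factory=list)
    exfiltration_indicators: list = field(default_factory=list)
    context_sensitivity: Optional[dict] = None
    block_reason: Optional[str] = None
    processing_time_ms: float = 0.0
```

Defaulting the list fields to empty means later stages and the incident reporter can iterate over them even when an earlier stage was skipped.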
Handling Blocked Outputs
When the pipeline blocks an output, the user message must be informative without revealing security details that would help an attacker understand what was caught:
class BlockedOutputHandler:
    BLOCK_MESSAGES = {
        "sensitive_data": (
            "I wasn't able to include some information because it may contain "
            "sensitive data. If you need the complete information, please contact "
            "your administrator."
        ),
        "exfiltration": (
            "I encountered an issue completing this request as specified. "
            "Please rephrase your request or contact support."
        ),
        "context_sensitivity": (
            "My response was limited because some information requested falls "
            "outside the scope of what I can share in this context."
        ),
        "policy": (
            "This request cannot be completed as specified due to "
            "organizational policy."
        ),
        "default": (
            "I wasn't able to complete this request. Please try rephrasing "
            "or contact support if the issue persists."
        ),
    }
Two things I’ve been deliberate about in these messages: they name at most a broad category and never say which pattern or rule fired, and they don’t give an attacker enough detail to work out how to reformulate. Security neutrality is the goal.
Every block should also be reported as a security incident — even if it’s a false positive, you want to track what’s getting blocked and why:
    async def handle_blocked_output(self, guardrail_result, session_context,
                                    incident_reporter) -> str:
        await incident_reporter.report(
            incident_type="output_blocked",
            severity=self._get_incident_severity(guardrail_result),
            details={
                "disposition": guardrail_result.disposition,
                "block_reason": guardrail_result.block_reason,
                "sensitive_patterns": [d.pattern_name
                                       for d in guardrail_result.sensitive_data_flagged],
                "exfiltration_indicators": [i['type']
                                            for i in guardrail_result.exfiltration_indicators],
                "user_id": session_context.get('user_id'),
                "session_id": session_context.get('session_id'),
            }
        )
        return self.get_user_message(guardrail_result.block_reason)
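The handler passes the pipeline's free-form block_reason to get_user_message, while BLOCK_MESSAGES is keyed by category, so something has to map one to the other. One possible mapping is sketched below, keyed on the prefixes the pipeline writes into block_reason. The function names and the prefix table are assumptions; a sturdier design would carry an explicit category field on the result instead of parsing the reason string.

```python
# Prefixes of the block_reason strings produced by the pipeline stages
REASON_PREFIXES = {
    "Critical sensitive data": "sensitive_data",
    "Exfiltration indicators": "exfiltration",
    "Context sensitivity": "context_sensitivity",
}


def categorize_block_reason(block_reason):
    """Map a detailed internal reason to a coarse message category."""
    for prefix, category in REASON_PREFIXES.items():
        if block_reason and block_reason.startswith(prefix):
            return category
    return "default"


def get_user_message(block_reason, messages):
    """Pick the neutral user-facing text; the detailed reason stays in the logs."""
    return messages.get(categorize_block_reason(block_reason), messages["default"])
```

Only the category crosses over to the user; the pattern names and indicator types in block_reason never leave the incident report.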
Performance and Tuning
Output guardrails sit in the critical path of every response. Getting the performance and calibration right matters.
Latency considerations: Pattern-based detection (Stages 1-2) typically runs in microseconds to milliseconds. The context sensitivity LLM call (Stage 3) adds far more, on the order of 200-800ms depending on the model and output length. Don’t run Stage 3 universally; apply it selectively to large responses, flagged sessions, or cases where other stages have already fired.
False positive management: A guardrail that blocks legitimate responses erodes user trust. The recommended approach before enabling enforcement:
- Run the pipeline in shadow mode — process all outputs through the guardrails, log the results, but don’t actually block anything.
- Measure the false positive rate on real traffic.
- Tune thresholds based on that data.
- Enable enforcement only after you have confidence in the calibration.
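The steps above hinge on a single switch between observing and enforcing. A minimal sketch of that switch follows, assuming an evaluate callable that stands in for the guardrail pipeline and returns a (disposition, processed_output) pair. Both that callable and the deliver_with_guardrail name are placeholders for illustration.

```python
import logging

logger = logging.getLogger("guardrail.shadow")


def deliver_with_guardrail(raw_output, evaluate, shadow_mode=True):
    """Run the guardrail, then either enforce its decision or only record it.

    evaluate(raw_output) -> (disposition, processed_output) is a stand-in for
    the real pipeline call.
    """
    disposition, processed_output = evaluate(raw_output)
    if shadow_mode:
        # Record what enforcement would have done, but deliver the original
        logger.info("shadow decision=%s", disposition)
        return raw_output
    if disposition == "block":
        return None  # caller substitutes a user-facing block message
    return processed_output
```

Because the same evaluation runs in both modes, the decisions logged during the shadow period are the ones enforcement will later make.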
class GuardrailMetrics:
    def record_pipeline_result(self, result, was_shadow_mode=False):
        self.metrics.increment("guardrail.outcomes",
                               tags={"disposition": result.disposition,
                                     "shadow_mode": was_shadow_mode})
        self.metrics.histogram("guardrail.latency_ms", result.processing_time_ms)
        if result.disposition == OutputDisposition.BLOCK:
            self.metrics.increment("guardrail.blocks",
                                   tags={"reason": self._categorize(result.block_reason)})
The metrics tell you: what’s the block rate? What’s the false positive rate? What’s the latency distribution? What categories of content are being caught? You can’t tune without measurement.
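Block rate and latency fall out of the counters directly, but the false positive rate needs ground truth. As a small worked sketch, assume a sample of shadow-mode decisions has been reviewed by hand and each record is a (would_block, was_malicious) pair; that record format and the function name are assumptions.

```python
def false_positive_rate(records):
    """Share of benign outputs the guardrail would have blocked.

    records: iterable of (would_block, was_malicious) booleans taken from
    manually reviewed shadow-mode logs. Returns None if no benign samples exist.
    """
    benign = [would_block for would_block, was_malicious in records
              if not was_malicious]
    if not benign:
        return None
    return sum(1 for would_block in benign if would_block) / len(benign)
```

Note that this is measured against benign traffic only; dividing blocked-benign outputs by all blocks would give a different quantity (one minus precision).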
What This Section Completes
Output guardrails are the last layer, but they’re not the most important layer. They’re most valuable when upstream defenses have partially failed — catching what the input validator missed, what the tool security controls missed, what the state protection missed.
That’s the nature of defense in depth. No single layer is sufficient. Each layer catches some things that the others miss. The combination is substantially more robust than any individual control.
With input validation, tool security, state protection, multi-agent trust boundaries, and output guardrails in place, we have a reasonably complete picture of the defensive architecture. The remaining sections of this series cover the operational layer — authentication, monitoring, human-in-the-loop controls, rate limiting, and testing — which determine whether all these controls actually function correctly in production.
Output Guardrails Checklist
Detection coverage:
- Pattern-based detection covers credentials, PII, financial data, and system configuration
- Exfiltration patterns checked: external URLs, encoded content, webhook references, recipient anomalies
- Context-aware assessment applied to large or high-risk responses
- Tool call arguments screened for exfiltration before execution
Enforcement:
- Detected sensitive data is redacted, not just flagged
- Critical detections result in block
- Blocked outputs produce informative but security-neutral messages
- Escalation path defined for outputs requiring human review
Structured outputs:
- Schema-constrained generation used where supported
- Post-generation schema validation applied before consuming structured outputs
- Schema violations retried and then failed-secure if unresolvable
Operations:
- All guardrail decisions logged with disposition, stage, and detection details
- Blocks reported as security incidents and trigger alerts
- Guardrail run in shadow mode before enforcement to measure false positive rate
- Performance metrics tracked: latency, false positive rate, block rate per category
- Patterns updated when new sensitive data types are identified
This is Part 10 of an ongoing series on LangGraph agent security, and completes Part III (Defensive Architecture). Previous posts: Part 1: Introduction · Part 2: Architecture Primer · Part 3: Attack Surface Analysis · Part 4: Core Threat Categories · Part 5: Threat Modeling · Part 6: Input Validation · Part 7: Tool Security · Part 8: State and Memory Security · Part 9: Multi-Agent Trust Boundaries. Next: Part 11: Authentication and Authorization.