# Database Guard

Query validation for AI agents to prevent data exfiltration.

Database Guard protects databases from AI agent abuse by validating SQL queries before execution. It addresses OWASP ASI03 (Identity and Privilege Abuse) and helps prevent data exfiltration attacks.
## The Problem

AI agents with database access are vulnerable to prompt injection and privilege abuse. A harmless-looking request can turn into arbitrary SQL:

```
User asks: "What's the weather like?"

Agent (compromised via prompt injection):
  → SELECT * FROM users
  → SELECT password, ssn, credit_card FROM customers
  → DROP TABLE audit_log
```
Impact:
- 23% of organizations have experienced AI agent data leaks
- Financial data, PII, and credentials can be exfiltrated
- Destructive operations can corrupt or destroy data
## The Solution

Database Guard validates every query before execution:

```
Query Validation
──────────────────────────────────────────────────────────────
SELECT name FROM users WHERE id=1    → ALLOWED
SELECT * FROM users                  → BLOCKED (SELECT *)
SELECT password FROM users           → BLOCKED (sensitive)
DELETE FROM users                    → BLOCKED (no WHERE)
1; DROP TABLE users--                → BLOCKED (injection)
```
## Quick Start

```python
from sentinelseed.database import DatabaseGuard

# Create a guard with a policy
guard = DatabaseGuard(
    max_rows_per_query=1000,
    require_where_clause=True,
)

# Validate before executing
query = "SELECT name, email FROM users WHERE active = true"
result = guard.validate(query)

if result.allowed:
    cursor.execute(query)
else:
    log.warning(f"Query blocked: {result.reason}")
    for v in result.violations:
        print(f"  - {v.description}")
```
## Preset Policies

```python
from sentinelseed.database import (
    DatabaseGuard,
    POLICY_STRICT,
    POLICY_MODERATE,
    POLICY_PERMISSIVE,
)

# Strict: block everything risky
guard = DatabaseGuard(policy=POLICY_STRICT)

# Moderate: reasonable defaults (the default policy)
guard = DatabaseGuard(policy=POLICY_MODERATE)

# Permissive: minimal blocking
guard = DatabaseGuard(policy=POLICY_PERMISSIVE)
```
### Policy Comparison

| Feature | STRICT | MODERATE | PERMISSIVE |
|---|---|---|---|
| Max rows/query | 100 | 1,000 | 10,000 |
| Block SELECT * | Yes | Yes | No |
| Block UNION | Yes | Yes | Yes |
| Block DROP/TRUNCATE | Yes | Yes | No |
| Block schema changes | Yes | Yes | No |
| Block sensitive data | Yes | No | No |
| Require WHERE | Yes | Yes | Yes |
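
A preset can serve as a baseline that you tighten per deployment. The sketch below assumes that `DatabaseGuard` accepts the individual keyword options from the Quick Start alongside a `policy` preset; if the installed version does not support combining them, set the options directly instead.

```python
from sentinelseed.database import DatabaseGuard, POLICY_MODERATE

# Assumption: keyword options can override individual settings of a preset.
guard = DatabaseGuard(
    policy=POLICY_MODERATE,
    max_rows_per_query=100,                 # tighten the moderate default of 1,000
    allowed_tables={"products", "orders"},  # restrict which tables the agent may touch
)
```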
## Detection Patterns

### SQL Injection (12 patterns)

| Pattern | Risk | Description |
|---|---|---|
| UNION SELECT | Critical | Classic injection |
| OR 1=1 | Critical | Tautology attack |
| --/# comments | High | Query termination |
| SLEEP/BENCHMARK | Critical | Time-based injection |
| INTO OUTFILE | Critical | File write |
| LOAD_FILE | Critical | File read |
| Stacked queries | Critical | Multiple statements |
| INFORMATION_SCHEMA | High | DB enumeration |
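
As a quick illustration using only the `validate` API shown above (the exact reason strings and risk labels are implementation-defined), a tautology plus stacked-query attempt should come back blocked:

```python
from sentinelseed.database import DatabaseGuard

guard = DatabaseGuard()  # moderate defaults

# Tautology (OR 1=1), stacked statement, and comment termination in one query
result = guard.validate("SELECT name FROM users WHERE id = 1 OR 1=1; DROP TABLE users; --")

print(result.blocked)            # True
print(result.risk_level.value)   # e.g. "critical" (label depends on the implementation)
for v in result.violations:
    print(f"- {v.description}")
```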
### Sensitive Data (14 patterns)

| Category | Columns Detected |
|---|---|
| Authentication | password, token, api_key, private_key |
| Financial | credit_card, cvv, bank_account, iban |
| Legal | ssn, passport, drivers_license |
| PII | dob, address, phone, email |
| Health (HIPAA) | medical_record, diagnosis, patient_id |
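
A short sketch of how a sensitive-column match surfaces on the result object (the fields used are the ones documented under Validation Results below; the table and column names in the query are just examples):

```python
from sentinelseed.database import DatabaseGuard, POLICY_STRICT

# STRICT is the only preset that blocks sensitive data access (see the policy table)
guard = DatabaseGuard(policy=POLICY_STRICT)

result = guard.validate("SELECT email, credit_card FROM customers WHERE id = 42")

print(result.has_sensitive_data)   # True: 'credit_card' and 'email' match the patterns above
print(result.allowed)              # False under STRICT, which blocks sensitive columns
```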
## Validation Results

```python
result = guard.validate(query)

if result.blocked:
    print(f"Blocked: {result.reason}")

if result.has_sensitive_data:
    print("Warning: accessing sensitive columns")

if result.is_destructive:
    print("Warning: destructive operation")

# Detailed information
print(f"Query type: {result.query_type.value}")
print(f"Risk level: {result.risk_level.value}")
print(f"Tables: {result.tables_accessed}")
```
## Strict Mode

With `strict_mode=True`, blocked queries raise `QueryBlocked` instead of returning a blocked result:

```python
from sentinelseed.database import DatabaseGuard, QueryBlocked

guard = DatabaseGuard(strict_mode=True)

try:
    result = guard.validate("SELECT * FROM users")
except QueryBlocked as e:
    print(f"Blocked: {e}")
    for v in e.violations:
        print(f"  - {v.description}")
```
## Framework Integration

### SQLAlchemy

```python
from sqlalchemy import event
from sentinelseed.database import DatabaseGuard, QueryBlocked


class SecurityError(Exception):
    """Raised when Database Guard rejects a statement."""


guard = DatabaseGuard(strict_mode=True)

@event.listens_for(engine, "before_cursor_execute")
def validate_query(conn, cursor, statement, parameters, context, executemany):
    try:
        guard.validate(statement)
    except QueryBlocked as e:
        raise SecurityError(f"Query blocked: {e}") from e
```
### LangChain SQL Agent

```python
from langchain.agents import create_sql_agent
from langchain.sql_database import SQLDatabase  # newer releases: langchain_community.utilities
from sentinelseed.database import DatabaseGuard

guard = DatabaseGuard(
    allowed_tables={"products", "categories"},
    max_rows_per_query=100,
)

class SafeSQLDatabase(SQLDatabase):
    """SQLDatabase wrapper that validates every statement before running it."""

    def run(self, command: str, fetch: str = "all"):
        result = guard.validate(command)
        if result.blocked:
            return f"Query blocked: {result.reason}"
        return super().run(command, fetch)
```
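
One possible way to wire `SafeSQLDatabase` into an agent, assuming an older-style LangChain layout where `create_sql_agent` takes an `SQLDatabaseToolkit` (import paths and agent construction vary across LangChain versions; the connection URI and model are placeholders):

```python
from langchain.agents.agent_toolkits import SQLDatabaseToolkit
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(temperature=0)                     # placeholder model choice
db = SafeSQLDatabase.from_uri("sqlite:///shop.db")  # placeholder connection URI

agent = create_sql_agent(
    llm=llm,
    toolkit=SQLDatabaseToolkit(db=db, llm=llm),
)

# Any SQL the agent generates now passes through guard.validate() first.
agent.run("How many products are in the 'books' category?")
```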
## OWASP Coverage

| OWASP ID | Vulnerability | Coverage |
|---|---|---|
| ASI03 | Identity and Privilege Abuse | Partial |
| ASI01 | Agent Goal Hijack (via SQL) | Partial |
## Best Practices

1. Use table whitelists in production

   ```python
   guard = DatabaseGuard(allowed_tables={"users", "products", "orders"})
   ```

2. Set appropriate row limits

   ```python
   guard = DatabaseGuard(max_rows_per_query=100)
   ```

3. Monitor blocked queries

   ```python
   # "stats" is your own tally of allowed vs. blocked validations
   if stats["block_rate"] > 0.1:
       alert_security_team()
   ```

4. Combine with database permissions for defense in depth (see the sketch below)
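
A minimal sketch of that layering: connect through a least-privilege database role so the guard is not the only control. The role name, connection string, and `psycopg2` usage are illustrative assumptions, not part of Database Guard.

```python
import psycopg2
from sentinelseed.database import DatabaseGuard

# Layer 1: application-level validation
guard = DatabaseGuard(allowed_tables={"users", "products", "orders"})

# Layer 2: a least-privilege database role, created separately, e.g.:
#   CREATE ROLE agent_readonly LOGIN PASSWORD '...';
#   GRANT SELECT ON users, products, orders TO agent_readonly;
conn = psycopg2.connect("dbname=shop user=agent_readonly")  # illustrative DSN

def run_agent_query(sql: str):
    result = guard.validate(sql)
    if not result.allowed:
        raise PermissionError(f"Query blocked: {result.reason}")
    with conn.cursor() as cur:
        cur.execute(sql)        # even if a bad query slips past the guard,
        return cur.fetchall()   # the read-only role cannot modify or drop data
```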