Background Jobs¶
This page documents the background jobs provided by the AI Ops App.
Overview¶
The AI Ops App includes Nautobot Jobs for automated maintenance tasks. Jobs can be run manually or scheduled for automatic execution.
Cleanup Checkpoints Job¶
ai_ops.jobs.checkpoint_cleanup.CleanupCheckpointsJob¶
The Cleanup Checkpoints Job removes old conversation history from Redis to prevent unbounded growth.
Purpose¶
Conversation checkpoints are stored in Redis for maintaining chat history. Over time, these checkpoints accumulate and consume Redis memory. This job periodically cleans up old checkpoints based on a retention policy.
Job Details¶
- Name: Cleanup Old Checkpoints
- Group: AI Agents
- Description: Clean up old LangGraph conversation checkpoints from Redis based on retention policy
- Scheduling: Can be scheduled for automatic execution
- Sensitive Variables: None
How It Works¶
class CleanupCheckpointsJob(Job):
    """Job to clean up old conversation checkpoints from Redis."""

    def run(self):
        """Entry point for the job."""
        # Execute cleanup task
        result = cleanup_old_checkpoints()

        if result.get("success"):
            self.logger.info(
                f"✅ Checkpoint cleanup completed: "
                f"processed {result['processed_count']} keys "
                f"(retention: {result['retention_days']} days)"
            )
        else:
            self.logger.error(f"❌ Checkpoint cleanup failed: {result.get('error')}")
            raise Exception(f"Cleanup failed: {result.get('error')}")

        return result
Cleanup Task¶
The underlying cleanup task is defined in ai_ops/celery_tasks.py:
def cleanup_old_checkpoints(retention_days: int = 30) -> dict:
    """Clean up old LangGraph checkpoints from Redis.

    Args:
        retention_days: Number of days to retain checkpoints

    Returns:
        Dictionary with cleanup results:
        {
            "success": bool,
            "processed_count": int,
            "deleted_count": int,
            "retention_days": int,
            "error": str (if failed)
        }
    """
Retention Policy¶
Default Retention: 30 days
Checkpoints older than the retention period are removed; a checkpoint's age is determined from its timestamp.
Configurable: The retention period can be adjusted by changing the retention_days argument passed to cleanup_old_checkpoints() (see Advanced Configuration below).
Running the Job¶
Manual Execution¶
- Navigate to Jobs > Jobs in Nautobot
- Find AI Agents > Cleanup Old Checkpoints
- Click Run Job Now
- Review the job log for results
Scheduled Execution¶
- Navigate to Jobs > Jobs
- Find AI Agents > Cleanup Old Checkpoints
- Click Schedule Job
- Configure schedule:
- Name: Descriptive name for the schedule
- Interval: How often to run (e.g., daily, weekly)
- Start Time: When to start running
- Enabled: Check to activate the schedule
Recommended Schedule: Daily or weekly, depending on usage volume.
Job Output¶
The job returns a dictionary with cleanup statistics:
{
"success": True,
"processed_count": 150, # Total keys scanned
"deleted_count": 45, # Keys deleted
"retention_days": 30, # Retention period used
"error": None # Error message if failed
}
Example Job Log¶
2024-12-05 10:30:00 INFO Starting checkpoint cleanup task...
2024-12-05 10:30:05 INFO ✅ Checkpoint cleanup completed: processed 150 keys (retention: 30 days)
2024-12-05 10:30:05 INFO Deleted 45 old checkpoint keys
2024-12-05 10:30:05 SUCCESS Job completed successfully
MCP Server Health Check Job¶
ai_ops.jobs.mcp_health_check.MCPServerHealthCheckJob¶
Bases: Job
Job to perform automated health checks on MCP servers.
This job checks all HTTP MCP servers (excluding those with Vulnerable status) and updates their health status based on HTTP health check results.
Features:
- Parallel execution using ThreadPoolExecutor (1 worker per server, max 4 workers)
- Retry logic: 2 verification checks (5s apart) before status change
- Cache invalidation: Clears MCP client cache if any status changes
- Skips servers with "Vulnerable" status
- Skips servers with "stdio" protocol (only checks HTTP servers)
Status change logic:
- Healthy server + successful check = no change
- Unhealthy server + failed check = no change
- Status differs = perform 2 verification checks, then flip if confirmed
The MCP Server Health Check Job performs automated health monitoring of HTTP-based MCP servers to ensure they're operational.
Purpose¶
MCP servers are critical for providing tools and capabilities to AI agents. This job automatically:
- Checks all HTTP MCP servers for availability
- Updates server status based on health check results
- Implements retry logic to avoid false positives
- Invalidates agent cache when server status changes
- Runs in parallel for efficient checking
Job Details¶
- Name: MCP Server Health Check
- Group: AI Agents
- Description: Perform automated health checks on HTTP MCP servers with retry logic and parallel execution
- Scheduling: Can be scheduled for automatic execution
- Hidden: Yes (typically triggered by scheduler, not manually run)
- Sensitive Variables: None
Key Features¶
- Parallel Execution: Uses ThreadPoolExecutor (1 worker per server, max 4 workers)
- Retry Logic: 2 verification checks (5 seconds apart) before status change
- Cache Invalidation: Clears MCP client cache if any status changes
- Protocol Filtering: Only checks HTTP servers, skips STDIO protocol
- Status Filtering: Skips servers with "Vulnerable" status
Health Check Process¶
# For each HTTP MCP server:
1. Send GET request to {server.url}{server.health_check}
2. If response differs from current status:
   a. Wait 5 seconds
   b. Perform verification check
   c. Wait 5 seconds
   d. Perform second verification check
   e. If both verifications confirm: update status
3. If any status changed: clear agent MCP cache
Status Change Logic¶
- Healthy server + successful check = No change
- Unhealthy server + failed check = No change
- Status differs = Perform 2 verification checks, then flip if confirmed
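The following is a minimal sketch of this verification flow, not the app's actual implementation. It assumes the requests library is available, that each MCP server record exposes the url and health_check attributes referenced above, and a hypothetical healthy boolean holding the currently recorded status.

import time

import requests

VERIFICATION_DELAY = 5  # seconds between verification checks (per this page)
HTTP_TIMEOUT = 10       # per-check timeout (see Performance Considerations below)


def is_up(server) -> bool:
    """Single HTTP probe against the server's health endpoint."""
    try:
        response = requests.get(f"{server.url}{server.health_check}", timeout=HTTP_TIMEOUT)
        return response.ok
    except requests.RequestException:
        return False


def check_server(server) -> bool:
    """Return True if the server's recorded status should flip."""
    observed = is_up(server)
    if observed == server.healthy:  # hypothetical field holding the current status
        return False                # healthy + success or unhealthy + failure: no change
    # Status differs: confirm with two verification checks before flipping.
    for _ in range(2):
        time.sleep(VERIFICATION_DELAY)
        if is_up(server) != observed:
            return False            # verification disagrees, keep the current status
    return True                     # both verifications confirm the new state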
Usage Example¶
Manual Execution¶
- Navigate to Jobs > Jobs in Nautobot
- Find AI Agents > MCP Server Health Check
- Click Run Job Now
- Review job log for health check results
Scheduled Execution¶
Recommended schedule: Every 5-15 minutes
# Configure via Nautobot UI or programmatically
from nautobot.extras.models import ScheduledJob
ScheduledJob.objects.create(
name="MCP Health Monitoring",
job_model="ai_ops.jobs.mcp_health_check.MCPServerHealthCheckJob",
interval="crontab",
crontab="*/10 * * * *", # Every 10 minutes
enabled=True
)
Job Output¶
{
"success": True,
"checked_count": 5, # Number of servers checked
"changed_count": 1, # Number of status changes
"failed_count": 1, # Number of servers that failed
"worker_count": 4, # Number of parallel workers used
"cache_cleared": True, # Whether agent cache was invalidated
"error": None # Error message if failed
}
Example Job Log¶
2024-12-18 10:00:00 INFO Starting MCP server health checks...
2024-12-18 10:00:02 INFO ✅ MCP health check completed: 5 server(s) checked using 4 worker(s), 1 status change(s), 1 failure(s)
2024-12-18 10:00:02 INFO ✅ MCP client cache cleared due to status changes
2024-12-18 10:00:02 WARNING ⚠️ 1 server(s) changed status - check logs for details
2024-12-18 10:00:02 SUCCESS Job completed successfully
Performance Considerations¶
- Parallel Workers: Max 4 workers to balance speed vs resource usage
- Timeout: Each health check times out after 10 seconds
- Verification Delay: 5 seconds between verification checks
- Total Time: Typically completes in 10-30 seconds for 5-10 servers
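As a rough sketch of the parallel fan-out described above (illustrative only; it reuses the hypothetical check_server helper sketched earlier and is not the app's code):

from concurrent.futures import ThreadPoolExecutor


def check_all(servers):
    """Check servers in parallel: one worker per server, capped at 4."""
    if not servers:
        return {}
    worker_count = min(4, len(servers))
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        # check_server is the hypothetical per-server helper from the earlier sketch.
        results = list(executor.map(check_server, servers))
    return dict(zip(servers, results))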
Checkpoint Storage¶
Redis Key Structure¶
Checkpoints are stored in Redis under keys that combine a thread/session identifier with a timestamp:
Example keys:
checkpoint:user-session-abc123:2024-12-05T10:30:00
checkpoint:user-session-def456:2024-12-05T11:45:00
Checkpoint Content¶
Each checkpoint stores:
- Messages: Conversation history
- Metadata: Timestamp, user info, etc.
- Agent State: Current state of the agent
Redis Database¶
Checkpoints use a separate Redis database:
- Default Database: DB 2
- Configurable via: LANGGRAPH_REDIS_DB environment variable
- Isolation: Separate from cache (DB 0) and Celery (DB 1)
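A minimal sketch of how such a connection could be built with redis-py; only LANGGRAPH_REDIS_DB is documented on this page, so the other variable names and defaults are assumptions to adapt to your deployment:

import os

import redis

# LANGGRAPH_REDIS_DB is documented above; the host/port variable names are assumptions.
redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    db=int(os.getenv("LANGGRAPH_REDIS_DB", "2")),  # isolated from cache (DB 0) and Celery (DB 1)
)
redis_client.ping()  # raises if the checkpoint database is unreachable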
Cleanup Process¶
Step-by-Step Process¶
- Connect to Redis
- Scan for Checkpoint Keys
- Check Timestamp
- Delete Old Checkpoints
- Return Results
Performance Considerations¶
- Scan vs Keys: Uses SCAN to avoid blocking Redis
- Batch Processing: Processes keys in batches
- Memory Efficient: Doesn't load all keys into memory
- Non-Blocking: Allows Redis to serve other requests
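A simplified sketch of such a SCAN-based loop, assuming the timestamp is the final segment of the key as in the example keys above; the real task lives in ai_ops/celery_tasks.py and may differ:

from datetime import datetime, timedelta, timezone


def delete_old_checkpoints(redis_client, retention_days: int = 30) -> int:
    """Delete checkpoint keys whose embedded timestamp is older than the cutoff."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
    deleted = 0
    # scan_iter pages through keys with SCAN, so Redis is never blocked.
    for key in redis_client.scan_iter(match="checkpoint:*", count=1000):
        raw = key.decode() if isinstance(key, bytes) else key
        try:
            # Key shape assumed here: checkpoint:<thread-id>:<ISO timestamp>
            timestamp = datetime.fromisoformat(raw.split(":", 2)[2])
        except (IndexError, ValueError):
            continue  # key does not match the expected pattern; leave it alone
        if timestamp.tzinfo is None:
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        if timestamp < cutoff:
            redis_client.delete(key)
            deleted += 1
    return deleted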
Monitoring¶
Job Execution Status¶
Monitor job execution through Nautobot:
- Navigate to Jobs > Job Results
- Filter by job name: "Cleanup Old Checkpoints"
- Review execution history:
- Success/failure status
- Execution duration
- Number of keys processed
- Error messages if any
Redis Monitoring¶
Monitor Redis usage:
# Connect to Redis
redis-cli -h localhost -p 6379 -n 2
# Count checkpoint keys
SCAN 0 MATCH checkpoint:* COUNT 1000
# Check memory usage
INFO memory
# Get database statistics
INFO keyspace
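Note that SCAN returns one page of keys per call, so the command above only shows the first batch. For a full count, you can let redis-cli drive the iteration (illustrative; adjust host, port, and database to your deployment):

redis-cli -n 2 --scan --pattern 'checkpoint:*' | wc -l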
Metrics to Track¶
- Checkpoint Count: Total number of checkpoints
- Redis Memory: Memory used by checkpoint database
- Cleanup Frequency: How often cleanup runs
- Deletion Rate: Number of checkpoints deleted per run
Troubleshooting¶
Job Fails to Execute¶
Check Redis Connectivity:
from ai_ops.checkpointer import get_redis_connection

try:
    redis_client = get_redis_connection()
    redis_client.ping()
    print("Redis connection OK")
except Exception as e:
    print(f"Redis connection failed: {e}")
Verify Environment Variables:
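For example, confirm that the checkpoint database variable documented above is set (other connection variables depend on your deployment):

echo $LANGGRAPH_REDIS_DB   # should point at the checkpoint database (default 2)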
Check Redis Permissions:
- Ensure Redis password is correct
- Verify network connectivity
- Check firewall rules
No Checkpoints Deleted¶
Possible Causes:
- All checkpoints are within the retention period
- Checkpoint keys use a different pattern
- Wrong Redis database selected
Verify Checkpoint Keys:
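For example, list a sample of keys in the checkpoint database (default DB 2) to confirm the expected pattern:

redis-cli -n 2 --scan --pattern 'checkpoint:*' | head -n 10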
Job Takes Too Long¶
For Large Datasets:
- Increase the job timeout
- Run during off-peak hours
- Consider reducing the retention period
- Optimize Redis performance
Memory Not Freed¶
After cleanup, Redis memory may not immediately decrease:
- Check Deleted Keys: Look for evicted_keys or the deleted count
- Redis Memory Reclaim: Redis may not immediately release memory to the OS
  - Memory is reused for new keys
  - Run MEMORY PURGE (Redis 4.0+)
- Verify Cleanup Results: Check the job log for the deleted count
Best Practices¶
Scheduling¶
- Regular Execution: Schedule to run at least weekly
- Off-Peak Hours: Run during low-traffic periods
- Monitor First Runs: Check initial executions carefully
- Adjust Frequency: Based on checkpoint creation rate
Retention Policy¶
- Balance History vs Space: Longer retention = more history, more space
- Consider Use Patterns: How long do users need history?
- Compliance Requirements: Legal/regulatory retention needs
- Storage Capacity: Redis memory limitations
Monitoring¶
- Set Up Alerts: Alert on job failures
- Track Metrics: Monitor key count and memory
- Regular Reviews: Periodically review cleanup effectiveness
- Log Analysis: Review logs for patterns
Disaster Recovery¶
- Redis Backup: Regular Redis backups include checkpoints
- Retention Coordination: Align with backup schedule
- Test Restoration: Verify checkpoint data in backups
- Document Procedure: Clear recovery process
Advanced Configuration¶
Custom Retention Period¶
Modify the retention period by editing the job:
# ai_ops/jobs/checkpoint_cleanup.py
def run(self):
    # Custom retention: 60 days instead of 30
    result = cleanup_old_checkpoints(retention_days=60)
    # ... rest of the code
Conditional Cleanup¶
Implement conditional cleanup based on memory usage:
def run(self):
    redis_client = get_redis_connection()
    memory_info = redis_client.info('memory')
    used_memory_mb = memory_info['used_memory'] / (1024 * 1024)

    if used_memory_mb > 1000:  # Over 1GB
        # Aggressive cleanup
        result = cleanup_old_checkpoints(retention_days=7)
    else:
        # Normal cleanup
        result = cleanup_old_checkpoints(retention_days=30)

    return result
Selective Cleanup¶
Clean up specific thread patterns:
def cleanup_user_checkpoints(user_id: str):
    """Clean up checkpoints for a specific user."""
    redis_client = get_redis_connection()
    pattern = f"checkpoint:user-{user_id}-*"

    deleted = 0
    for key in redis_client.scan_iter(match=pattern):
        redis_client.delete(key)
        deleted += 1

    return deleted
Related Documentation¶
- Checkpointer Configuration - Redis checkpoint setup (see ai_ops/checkpointer.py)
- Celery Tasks - Background task definitions (see ai_ops/celery_tasks.py)
- Agents - How agents use checkpoints
- External Interactions - Redis configuration