class AiOpsConfig(NautobotAppConfig):
"""App configuration for the ai_ops app."""
name = "ai_ops"
verbose_name = "AI Ops"
version = __version__
author = "Kevin Campos"
description = "AI Ops integration for Nautobot."
base_url = "ai-ops"
required_settings = []
default_settings = {}
constance_config = {
"chat_session_ttl_minutes": ConstanceConfigItem(
default=10,
help_text="Time-to-live (TTL) for chat sessions in minutes. Chat sessions automatically expire after this period of inactivity or message age. Applies to both frontend (localStorage) and backend (MemorySaver) cleanup. Valid range: 1-1440 minutes (1 minute to 24 hours).",
field_type=int,
),
"checkpoint_retention_days": ConstanceConfigItem(
default=7,
help_text="Retention period in days for conversation checkpoints. Used by cleanup jobs when migrated to Redis Stack or PostgreSQL persistent storage. Not enforced for current MemorySaver implementation which uses chat_session_ttl_minutes instead. Valid range: 1-365 days.",
field_type=int,
),
"agent_request_timeout_seconds": ConstanceConfigItem(
default=120,
help_text="Maximum time in seconds for agent request processing. If the agent takes longer than this to respond, the request will be cancelled and a timeout error returned. Valid range: 10-600 seconds (10 seconds to 10 minutes).",
field_type=int,
),
"agent_recursion_limit": ConstanceConfigItem(
default=25,
help_text="Maximum recursion depth for agent graph traversal. Limits the number of steps the agent can take in a single request to prevent infinite loops. Valid range: 5-100.",
field_type=int,
),
}
docs_view_name = "plugins:ai_ops:docs"
searchable_models = ["llmmodel", "mcpserver"]
def ready(self):
"""Connect signal handlers when the app is ready."""
import logging
from .helpers.async_shutdown import register_shutdown_handlers
from .helpers.logging_config import setup_ai_ops_logging
logger = logging.getLogger(__name__)
# Setup structured JSON logging for ai_ops.* loggers
setup_ai_ops_logging()
# Register graceful shutdown handlers for async resources (MCP clients, checkpointers)
# Handles both development (auto-reloader) and production (SIGTERM/SIGINT) scenarios
register_shutdown_handlers()
# NOTE: All default data and scheduled job creation is handled by data migrations
# (0006_populate_default_data, 0008_default_scheduled_jobs) so no signals are needed.
# Note: Periodic tasks are handled via Nautobot Jobs (ai_agents.jobs).
# These jobs can be scheduled through the Nautobot UI for automatic execution.
# Warm the MCP client cache on startup using a background thread with
# its own event loop. AppConfig.ready() is called synchronously by
# Django — there is never a *running* loop here, so loop.create_task()
# always falls into the RuntimeError branch and the warmup silently
# never runs. A daemon thread runs the coroutine to completion without
# blocking the main thread or the Django startup sequence.
#
# NOTE: Redis/Postgres async connections (checkpointer, store) are NOT
# warmed up here. Those connections are bound to the event loop they
# were created in. A background thread's loop is always closed before
# any Django request loop starts, so the stored connections would be
# immediately detected as stale and recreated on the first request
# anyway — making the warmup work pointless. Those connections are
# initialised lazily on the first request instead.
#
# Guards applied before starting the thread:
# 1. NAUTOBOT_AI_OPS_SKIP_WARMUP=1 — explicit opt-out (useful in CI/test)
# 2. sys.argv[1] in _SKIP_WARMUP_COMMANDS — management commands that
# should not trigger network/DB calls (migrate, test, shell, …)
# 3. _WARMUP_STARTED event — process-level dedup so multiple ready()
# calls (auto-reloader child, duplicate AppConfig) only fire once.
_management_cmd = len(sys.argv) > 1 and sys.argv[1] in _SKIP_WARMUP_COMMANDS
_env_skip = os.environ.get("NAUTOBOT_AI_OPS_SKIP_WARMUP", "").lower() in {"1", "true", "yes"}
if _env_skip:
logger.debug("Skipping MCP warmup: NAUTOBOT_AI_OPS_SKIP_WARMUP is set")
elif _management_cmd:
logger.debug("Skipping MCP warmup: running under management command '%s'", sys.argv[1])
elif _WARMUP_STARTED.is_set():
logger.debug("Skipping MCP warmup: already started in this process")
else:
# Atomically claim the warmup slot before spawning the thread.
_WARMUP_STARTED.set()
try:
import asyncio
from ai_ops.agents.multi_mcp_agent import warm_mcp_cache
def _run_startup_warmup() -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(warm_mcp_cache())
finally:
loop.close()
warmup_thread = threading.Thread(
target=_run_startup_warmup,
name="ai-ops-startup-warmup",
daemon=True, # won't block interpreter shutdown
)
warmup_thread.start()
logger.info("Started startup warmup thread (MCP cache)")
except Exception as e:
# Release the guard so a later process restart can retry.
_WARMUP_STARTED.clear()
logger.warning(f"Failed to start startup warmup: {e}")
super().ready()