Flows Architecture
This section details the architecture for the "Event-Driven Agentic Flows" (Flows) feature, enabling automated workflows triggered by various events and executed by AI agents.
1. Overview and Goals
The "Flows" feature allows users to define automated workflows that are initiated by events from integrated systems (e.g., GitHub, GitLab, Jira) or other sources (e.g., incidents, OpenTelemetry data). Each Flow uses a configurable AI model and a dynamic prompt to perform tasks, optionally calling specified MCP (Model Context Protocol) servers and tools.
Key Goals:
- Event-Driven Automation: Trigger workflows based on specific events (e.g., commit to main, new issue, PR merged, incident triggered).
- Dynamic Prompting: Allow prompts to be constructed dynamically, incorporating project-specific context (e.g., documentation summaries, code component maps).
- Configurable AI Models: Enable users to select and configure different AI models for different Flows, managing API keys securely.
- Controlled Tool Usage: Provide a mechanism to specify which MCP servers and tools an AI agent can use during a Flow's execution.
- Extensibility: Design for easy addition of new event sources, AI models, and agent capabilities.
- User Experience: Allow users to define Flows from presets, customize existing ones, or create them from scratch. The UI should be intuitive and guide the user through the process of creating and configuring a flow.
2. Key Components & Their Roles
graph TD
subgraph "External Event Sources"
direction LR
Trackers["Issue Trackers (GitHub, GitLab, Jira)"]
OtherSources["Other Sources (Incidents, OTel, etc.)"]
end
subgraph "Preloop Core"
direction TB
WebhookEndpoint["Webhook Endpoint (/api/v1/private/webhooks/...)"]
TaskQueue["Internal Task Queue (NATS)"]
APIExt["Preloop API (Flow/AIModel CRUD, Logs)"]
PreloopModelsDB["preloop-models (PostgreSQL - Flows, AIModels, Executions)"]
end
subgraph "Flows Subsystem"
direction TB
FlowTriggerService["Flow Trigger Service (part of Worker)"]
FlowExecOrchestrator["Flow Execution Orchestrator"]
AgentInfra["Agent Execution Infrastructure (Docker/K8s/CLI)"]
subgraph "Agent Session (Container or Process)"
Agent["Agent (OpenHands, Claude Code, Aider, etc.)"]
AIModelClient["AIModel Client (via AIModel)"]
MCPClient["MCP Client (for allowed tools)"]
end
end
subgraph "External Services"
AIModelAPIs["AI Model APIs (OpenAI, Anthropic, etc.)"]
MCPServers["MCP Servers"]
end
Trackers -- Webhook Event --> WebhookEndpoint
WebhookEndpoint -- Publishes Task --> TaskQueue
TaskQueue -- Delivers Task --> FlowTriggerService
FlowTriggerService -- Reads Flow Defs --> PreloopModelsDB
FlowTriggerService -- Initiates Execution --> FlowExecOrchestrator
FlowExecOrchestrator -- Reads Flow/AIModel --> PreloopModelsDB
FlowExecOrchestrator -- Resolves Prompt Data --> PreloopModelsDB
FlowExecOrchestrator -- Manages --> AgentInfra
AgentInfra -- Runs --> Agent
Agent -- Uses --> AIModelClient
AIModelClient -- Calls --> AIModelAPIs
Agent -- Uses --> MCPClient
MCPClient -- Calls --> MCPServers
FlowExecOrchestrator -- Writes Logs --> PreloopModelsDB
APIExt -- Manages --> PreloopModelsDB
APIExt -- Serves Logs --> UserClient["User Client (UI/CLI)"]
- Flow Definition (`Flows`):
  - Stored in `preloop-models`.
  - Details the triggering event, prompt template, selected `AIModel` ID, agent type (e.g., "codex", "gemini", "openhands", "aider"), agent configuration (e.g., specific agent parameters), and a list of allowed MCP servers and specific tools.
  - Presets are implemented as special, non-editable (or cloneable) records in this table.
- AI Model (`AIModel`):
  - Stored in `preloop-models`.
  - Reusable definitions for AI models, including their identifiers (e.g., `openai/gpt-4`), API endpoints, gateway metadata, and upstream credentials when needed.
  - Credentials can be resolved through Preloop's secret reference layer instead of being embedded directly into flow definitions.
  - Models are linked to an `Account`.
- Event Ingestion:
  - Primarily through the existing webhook endpoint (e.g., `/api/v1/private/webhooks/{tracker_type}/{org_identifier}`) for tracker events.
  - This endpoint validates incoming webhooks and then publishes a `process_webhook_event` task to the Internal Task Queue (NATS).
- Internal Task Queue (NATS):
  - NATS is used as a simple, reliable task queue. It decouples the API from the background processing of events and flows.
  - The `EventBus` service is used to enqueue tasks.
  - Workers consume tasks from the `preloop_sync.tasks` subject using a `workqueue` retention policy, which ensures that acknowledged messages are immediately removed from the stream.
- Flow Trigger Service:
  - This logic is part of the NATS worker. When a `process_webhook_event` task is received, the worker acts as the trigger service.
  - It matches the incoming event data against the `trigger_event_source` and `trigger_event_type` defined in active `Flows`.
  - Upon a match, it initiates a Flow execution by invoking the Flow Execution Orchestrator.
- Flow Execution Orchestrator:
  - Responsible for managing the lifecycle of a single Flow execution.
  - Retrieves the `Flow` definition and its associated `AIModel` from the database.
  - Dynamic Prompt Resolution: Parses the `prompt_template` and resolves any placeholders (e.g., `{{project_docs_summary}}`, `{{relevant_code_files}}`) by querying `preloop-models` or other Preloop services for the necessary context data.
  - Prepares the complete execution context for the agent, including the fully resolved prompt, AI model details, managed gateway settings when enabled, and the specific list of allowed MCP servers/tools.
  - Initiates and manages an agent session via the Agent Execution Infrastructure based on the configured `agent_type`.
  - Monitors the execution and records results, logs, gateway events, and runtime-session attribution.
- Agent Execution Infrastructure:
  - Manages the runtime environment for agentic workflows.
  - Production Mode (Container-based): All agents run in isolated containers (Docker or Kubernetes) for security, isolation, and scalability. Each Flow execution spawns a new container regardless of agent type.
  - Development Mode (Optional): For local development only, CLI-based agents can be run as processes to simplify testing and iteration.
  - Provides an abstraction layer for different agent types:
    - Codex CLI
    - Gemini CLI
    - OpenHands: Container with the OpenHands library and runtime dependencies
    - Aider: Container with the `aider` CLI and dependencies pre-installed
  - The Flow Execution Orchestrator interacts with this infrastructure to start, monitor, and terminate agent sessions.
- Agent (running in a container):
  - The core agentic execution environment running in an isolated container (or process in dev mode).
  - Supported agent types:
    - Codex CLI
    - Gemini CLI
    - OpenHands: Library-based agent (default implementation)
    - Aider: CLI-based agent using the `aider` command
    - Other agents: Extensible to support additional agent frameworks
  - Receives the resolved prompt, AI model configuration, runtime token, and allowed MCP toolset from the Flow Execution Orchestrator.
  - Manages the interaction with the configured AI model.
  - Direct MCP Calls: The agent can directly call the allowed MCP tools on the specified MCP servers, with necessary network access and authentication provided by the execution infrastructure.
- Flow Execution Log (`FlowExecutions`):
  - A database table in `preloop-models` to record the history and outcome of each Flow run.
  - Includes details like the triggering event, start/end times, status (pending, running, succeeded, failed), the resolved input prompt, a summary of actions taken by the agent, logs of MCP tool usage, and a reference to more detailed logs from the agent session (e.g., container logs, session ID, or process output).
- Runtime Session (`RuntimeSession`):
  - A shared session identity used to attribute model traffic and operator actions across flow executions and managed external agents.
  - Links gateway usage, recent activity, and session lifecycle state to a common runtime principal.
- Preloop API Extensions:
  - The current API surface includes:
    - CRUD (Create, Read, Update, Delete) operations on `Flows` and `AIModels`.
    - Listing and retrieving `FlowExecution` history and logs.
    - Flow- and account-scoped gateway usage summaries.
    - Execution-scoped gateway event inspection.
    - Runtime-session and managed-agent browsing endpoints that connect execution details to broader runtime history.
3. Database Schema Considerations
The following Pydantic schemas and corresponding SQLAlchemy models have been defined within preloop-models:
- `Flow`
- `AIModel`
- `FlowExecution`
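To make the shape of these three entities concrete, here is a minimal sketch. It uses stdlib dataclasses rather than Pydantic or SQLAlchemy so that it runs without dependencies, and it is not the actual `preloop-models` schema. The fields `trigger_event_source`, `trigger_event_type`, `prompt_template`, `ai_model_id`, `agent_type`, `agent_config`, `allowed_mcp_servers`, and `allowed_mcp_tools` are named in this document; every other field name (`credential_ref`, `is_preset`, `agent_session_ref`, and so on) is an assumption made for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional


class ExecutionStatus(str, Enum):
    """Statuses listed for the Flow execution log."""
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


@dataclass
class AIModel:
    id: str
    account_id: str                        # models are linked to an Account
    model_identifier: str                  # e.g. "openai/gpt-4"
    api_endpoint: Optional[str] = None
    credential_ref: Optional[str] = None   # hypothetical secret reference
    gateway_enabled: bool = False          # hypothetical gateway flag


@dataclass
class Flow:
    id: str
    name: str
    trigger_event_source: str
    trigger_event_type: str
    prompt_template: str
    ai_model_id: str
    agent_type: str = "openhands"
    agent_config: Dict[str, Any] = field(default_factory=dict)
    allowed_mcp_servers: List[str] = field(default_factory=list)
    allowed_mcp_tools: List[str] = field(default_factory=list)
    is_preset: bool = False                # hypothetical preset marker
    is_enabled: bool = True                # hypothetical "active" flag


@dataclass
class FlowExecution:
    id: str
    flow_id: str
    trigger_event: Dict[str, Any]
    status: ExecutionStatus = ExecutionStatus.PENDING
    resolved_prompt: Optional[str] = None
    started_at: Optional[datetime] = None
    finished_at: Optional[datetime] = None
    actions_summary: Optional[str] = None
    agent_session_ref: Optional[str] = None  # e.g. container or session ID


flow = Flow(
    id="f1",
    name="issue-triage",
    trigger_event_source="github",
    trigger_event_type="issue_opened",
    prompt_template="Triage this issue: {{issue_body}}",
    ai_model_id="m1",
)
execution = FlowExecution(id="e1", flow_id=flow.id, trigger_event={"number": 1})
```

Using `default_factory` for the list and dict fields keeps each `Flow` instance from sharing one mutable default.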
4. Data Flow Diagrams
(Mermaid diagrams will be used here as shown in the "Key Components" section and expanded as needed for specific flows.)
a. Event Ingestion & Flow Triggering:
sequenceDiagram
participant ExtSrc as External Event Source (e.g., GitHub)
participant WebhookEP as Preloop Webhook Endpoint
participant TaskQueue as Internal Task Queue (NATS)
participant Worker as Preloop Sync Worker (Flow Trigger)
participant FlowsDB as Flows Database (preloop-models)
participant FlowExecOrch as Flow Execution Orchestrator
ExtSrc->>+WebhookEP: Sends Webhook (e.g., push event)
WebhookEP->>+TaskQueue: Publishes `process_webhook_event` Task
TaskQueue-->>-Worker: Delivers Task
Worker->>+FlowsDB: Queries for matching Flow definitions
FlowsDB-->>-Worker: Returns matching Flow(s)
alt If match found
Worker->>+FlowExecOrch: Initiates Flow Execution (with Flow ID, Event Data)
FlowExecOrch-->>-Worker: Acknowledges (e.g., Execution ID)
end
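The matching step in this sequence can be sketched as a simple filter over active Flow definitions. This is an illustrative sketch, not the worker's actual code: the `FlowTrigger` class, the `is_enabled` flag, and the `source`/`type` keys on the event dictionary are assumptions, while `trigger_event_source` and `trigger_event_type` are the field names used in this document.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class FlowTrigger:
    flow_id: str
    trigger_event_source: str   # e.g. "github"
    trigger_event_type: str     # e.g. "push"
    is_enabled: bool = True     # hypothetical "active" flag


def match_flows(event: dict, flows: Iterable[FlowTrigger]) -> List[str]:
    """Return the IDs of active flows whose trigger matches the event."""
    source = event.get("source")
    event_type = event.get("type")
    return [
        f.flow_id
        for f in flows
        if f.is_enabled
        and f.trigger_event_source == source
        and f.trigger_event_type == event_type
    ]


flows = [
    FlowTrigger("doc-check", "github", "push"),
    FlowTrigger("triage", "github", "issue_opened"),
    FlowTrigger("disabled-flow", "github", "push", is_enabled=False),
]
matched = match_flows({"source": "github", "type": "push"}, flows)
print(matched)  # ['doc-check']
```

Because the function holds no state between calls, this style of matching fits a worker that is scaled horizontally.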
b. Flow Execution (Simplified):
sequenceDiagram
participant FlowExecOrch as Flow Execution Orchestrator
participant ModelsDB as preloop-models (Flows, AIModels, Context Data)
participant AgentInfra as Agent Execution Infrastructure
participant Agent as Agent (Container or Process)
participant Gateway as Preloop Model Gateway
participant AIModel_API as Upstream AI Model API
participant MCP_Srv as Allowed MCP Server(s)
FlowExecOrch->>+ModelsDB: Get Flow & AIModel details
ModelsDB-->>-FlowExecOrch: Return details
FlowExecOrch->>+ModelsDB: Get data for dynamic prompt placeholders
ModelsDB-->>-FlowExecOrch: Return context data
Note over FlowExecOrch: Resolves prompt, prepares agent context (runtime token, model routing, allowed MCPs)
FlowExecOrch->>+AgentInfra: Request agent session (with agent_type and context)
AgentInfra->>+Agent: Starts agent (container, process, or other)
Agent->>+Gateway: Sends model request using managed credentials
Gateway->>Gateway: Applies budget and allowed-model checks
Gateway->>+AIModel_API: Forwards approved request upstream
AIModel_API-->>-Gateway: AI model responses
Gateway-->>-Agent: Response + usage attribution
alt If AI model decides to use an MCP tool
Agent->>+MCP_Srv: Calls allowed MCP tool directly
MCP_Srv-->>-Agent: Tool result
end
Agent-->>-AgentInfra: Execution logs/status
AgentInfra-->>-FlowExecOrch: Execution logs/status/results
FlowExecOrch->>+ModelsDB: Store execution results in FlowExecutions table
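The "Resolves prompt" step in this diagram can be illustrated with a small placeholder resolver. This is a sketch under assumptions: the function name `resolve_prompt`, the regular expression, and the choice to raise on unresolved placeholders are illustrative and not taken from the Preloop codebase. Only the `{{name}}` placeholder syntax comes from this document.

```python
import re
from typing import Dict

# Matches {{name}} with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def resolve_prompt(template: str, context: Dict[str, str]) -> str:
    """Replace each {{name}} with context[name]; raise if any name is missing."""
    names = {m.group(1) for m in PLACEHOLDER.finditer(template)}
    missing = sorted(names - set(context))
    if missing:
        raise KeyError(f"unresolved placeholders: {missing}")
    # A function replacement avoids backslash-escape handling in the values.
    return PLACEHOLDER.sub(lambda m: context[m.group(1)], template)


template = (
    "Summarize the changes.\n"
    "Docs: {{project_docs_summary}}\n"
    "Files: {{ relevant_code_files }}"
)
context = {
    "project_docs_summary": "API guide",
    "relevant_code_files": "app/main.py",
}
prompt = resolve_prompt(template, context)
print(prompt)
```

The substitution is a single pass, so placeholder syntax that happens to appear inside a context value is inserted as literal text and not expanded again.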
5. Real-Time UI Updates & Interactivity
To provide users with live feedback and enable future interactivity with Flow executions, a structured, message-based real-time architecture will be implemented.
graph TD
subgraph "Flows Subsystem (Worker)"
FlowExecOrchestrator["Flow Execution Orchestrator"]
end
subgraph "NATS Messaging"
direction LR
NatsUpdates["Updates Stream (flow-updates.{exec_id})"]
NatsInputs["Inputs Stream (flow-inputs.{exec_id})"]
end
subgraph "API Server"
WebSocketServer["WebSocket Server"]
end
subgraph "Browser"
FlowExecUI["Flow Executions UI"]
end
%% Data Flow
FlowExecOrchestrator -- Publishes structured JSON --> NatsUpdates
WebSocketServer -- Subscribes to --> NatsUpdates
NatsUpdates -- Streams messages --> WebSocketServer
WebSocketServer -- Pushes updates to --> FlowExecUI
%% Future Interactivity Flow
FlowExecUI -- Sends user input --> WebSocketServer
WebSocketServer -- Publishes input to --> NatsInputs
NatsInputs -- Delivers input to --> FlowExecOrchestrator
FlowExecOrchestrator -- Subscribes to --> NatsInputs
Components & Protocol:
- Structured Messaging: Communication uses a standardized JSON envelope, allowing for different message types. This is critical for future extensibility.
- NATS Streams:
  - `flow-updates.{execution_id}`: A server-to-client stream for broadcasting updates from the `FlowExecutionOrchestrator`. This will be implemented now.
  - `flow-inputs.{execution_id}`: A client-to-server stream for sending user input back to the `FlowExecutionOrchestrator`. This is reserved for future interactive features.
- WebSocket Server: The server will handle routing messages between the browser and the appropriate NATS streams.
- Initial Message Types: For the first implementation, the following message types will be supported in the `flow-updates` stream:
  - `status_update`: For lifecycle changes (e.g., `RUNNING`, `SUCCEEDED`).
  - `log`: For streaming text output from the agent.
  - `tool_call`: For structured information about tools being used.
- Future Extensibility: This design allows for the seamless addition of new message types to support interactivity (`user_input_request`, `user_input_response`) or advanced capabilities (`ui_control_command`) without requiring architectural changes.
  - CRUD operations for these new entities will be added to `preloop-models`.
  - Will be queried by the Flow Execution Orchestrator to resolve dynamic prompt content.
- `Preloop Sync` / Webhook Infrastructure:
  - The existing webhook ingestion mechanism within the main `Preloop API` will publish a `process_webhook_event` task to the NATS task queue.
  - The `Preloop Sync` worker, upon receiving this task, will be responsible for triggering the appropriate Agentic Flows.
- `Preloop API`:
  - Will be extended with new RESTful API endpoints for:
    - Managing `Flows` (CRUD, enable/disable, list presets).
    - Managing `AIModels` (CRUD, share).
    - Retrieving `FlowExecution` history, status, and logs (including links or content from OpenHands logs).
- `Preloop-MCP` (and other MCP Servers):
  - MCP servers are consumed in this context: the OpenHands agents, as configured per Flow, will directly call tools on these MCP servers.
  - The `Flow` definition will specify which MCP servers and which specific tools on those servers the agent is permitted to use.
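The structured JSON envelope for the `flow-updates.{execution_id}` stream, described under "Components & Protocol" above, might look like the following sketch. The envelope keys (`type`, `execution_id`, `timestamp`, `payload`) and the helper names are assumptions. Only the three message types and the subject naming pattern come from this document.

```python
import json
from datetime import datetime, timezone

# Message types supported in the first implementation of flow-updates.
ALLOWED_TYPES = {"status_update", "log", "tool_call"}


def subject_for(execution_id: str) -> str:
    """Build the per-execution updates subject."""
    return f"flow-updates.{execution_id}"


def make_envelope(execution_id: str, msg_type: str, payload: dict) -> str:
    """Serialize one update message; reject types that are not yet supported."""
    if msg_type not in ALLOWED_TYPES:
        raise ValueError(f"unsupported message type: {msg_type}")
    return json.dumps(
        {
            "type": msg_type,
            "execution_id": execution_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "payload": payload,
        }
    )


raw = make_envelope("abc123", "status_update", {"status": "RUNNING"})
message = json.loads(raw)
print(subject_for("abc123"), message["type"], message["payload"])
```

Keeping the type in a top-level field lets a consumer dispatch on `type` and ignore message kinds it does not yet understand, which is what makes later additions such as `user_input_request` non-breaking.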
6. Agent Integration - Key Aspects
- Agent Type Selection: The `Flow.agent_type` field specifies which agent implementation to use (e.g., 'codex', 'gemini', 'openhands', 'aider'). The Agent Execution Infrastructure uses this to determine how to start and manage the agent session.
- Agent Configuration: The `Flow.agent_config` field stores JSON with agent-specific configuration:
  - OpenHands: `{"agent_type": "CodeActAgent", "max_iterations": 10}`
  - Claude Code: `{"model": "claude-3-5-sonnet", "max_tokens": 4096}`
  - Aider: `{"model": "claude-3-5-sonnet", "auto_test": true, "edit_format": "diff"}`
- Dynamic Prompts: The Flow Execution Orchestrator resolves placeholders in `Flow.prompt_template` before passing the final prompt to any agent.
- AI Model Selection & Routing:
  - `Flow.ai_model_id` links to an `AIModel` record.
  - AI models can be configured with upstream credentials through Preloop's secret abstraction and optionally marked for gateway routing.
  - When gateway routing is enabled, the Flow Execution Orchestrator can provide a managed base URL, model alias, and short-lived token instead of exposing provider credentials directly to the agent.
- MCP Tool Discovery & Invocation:
  - The `Flow.allowed_mcp_servers` and `Flow.allowed_mcp_tools` fields define the explicit allowlist.
  - The agent environment is configured to make network calls to these allowed MCP servers.
  - If MCPs require authentication, the Flow Execution Orchestrator injects necessary tokens/keys for only the allowed MCPs into the agent session.
  - Agent-specific mechanisms are used to expose MCP tools (e.g., OpenHands' tool/action definitions, MCP configuration files for CLI agents).
- Shared Runtime Observability:
  - Gateway traffic can be captured as execution-scoped events and account-scoped usage rows.
  - Runtime sessions tie one flow execution's agent activity to broader session-centric browsing and audit surfaces.
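The model routing and MCP credential injection described in this section can be sketched as one context-building function. Everything here is hypothetical: the function name, the dictionary keys, and the placeholder gateway URL are assumptions chosen to illustrate two ideas from the text, namely that a gateway-routed agent receives a short-lived token instead of the provider key, and that MCP credentials are injected only for allowlisted servers.

```python
from typing import Optional

# Placeholder gateway address for illustration; not a real Preloop URL.
GATEWAY_BASE_URL = "https://gateway.example.invalid/v1"


def build_agent_context(
    resolved_prompt: str,
    model: dict,
    allowed_mcp_servers: list,
    mcp_credentials: dict,
    runtime_token: Optional[str] = None,
) -> dict:
    """Assemble what the agent session receives (hypothetical shape)."""
    if model.get("gateway_enabled"):
        # Gateway routing: hand out a short-lived token, never the provider key.
        if not runtime_token:
            raise ValueError("gateway routing requires a runtime token")
        routing = {
            "base_url": GATEWAY_BASE_URL,
            "model": model["alias"],
            "api_key": runtime_token,
        }
    else:
        routing = {
            "base_url": model["api_endpoint"],
            "model": model["identifier"],
            "api_key": model["provider_key"],
        }
    # Inject credentials only for MCP servers on the Flow's allowlist.
    scoped = {
        name: mcp_credentials[name]
        for name in allowed_mcp_servers
        if name in mcp_credentials
    }
    return {
        "prompt": resolved_prompt,
        "model_routing": routing,
        "mcp_servers": list(allowed_mcp_servers),
        "mcp_credentials": scoped,
    }


context = build_agent_context(
    resolved_prompt="Triage the new issue.",
    model={
        "gateway_enabled": True,
        "alias": "default-model",
        "identifier": "openai/gpt-4",
        "api_endpoint": "https://api.example.invalid",
        "provider_key": "provider-secret",
    },
    allowed_mcp_servers=["preloop-mcp"],
    mcp_credentials={"preloop-mcp": "mcp-token", "other-mcp": "must-not-leak"},
    runtime_token="short-lived-token",
)
```

In this example the returned context contains neither `provider-secret` nor the credential for `other-mcp`, since the model is gateway-routed and only `preloop-mcp` is on the allowlist.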
7. Security Considerations
- Model Credential Security:
  - Upstream model credentials can be stored behind secret references rather than embedded directly into flow definitions.
  - Gateway-enabled runtimes can receive short-lived runtime tokens instead of provider API keys.
  - Least privilege still applies: the orchestrator and gateway should only access the model credentials required for the selected flow.
- MCP Access Control:
  - The `allowed_mcp_servers` and `allowed_mcp_tools` fields in the `Flow` definition act as a strict allowlist.
  - The OpenHands agent environment must be configured (e.g., network policies if running in Kubernetes, or wrapper logic around tool calls) to prevent calls to unlisted MCPs or tools.
  - Consider if MCPs themselves have authentication/authorization; if so, the Flow execution might need to propagate user/Flow identity or use pre-configured service accounts.
- Subject-Scoped Governance:
  - Managed-agent and API-key scope can narrow allowed tools and allowed models beyond the parent account defaults.
  - Gateway checks should fail closed when a runtime tries to access a model outside its allowed scope.
- Input Validation & Sanitization:
  - Validate all inputs for Flow definitions, especially prompt templates and configurations.
  - Sanitize any data from events or dynamic context before it's incorporated into prompts to prevent injection attacks against the AI model or downstream tools.
- Resource Limits for Agents:
  - All agents (especially container-based ones) should have resource limits (CPU, memory, execution time) to prevent abuse or runaway processes.
- Logging and Auditing:
  - Comprehensive logging of Flow executions, including which MCP tools were called with what parameters (sensitive data redacted).
  - Model gateway events should capture normalized request/response previews with redaction-aware handling instead of raw secret-bearing payloads.
  - Audit trails for changes to `Flows` and `AIModels`.
- Permissions:
  - Role-based access control (RBAC) for managing `Flows` and `AIModels` via the Preloop API. Users should only be able to create/edit/view Flows and AIModels within their authorized scope (e.g., organization).
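The subject-scoped, fail-closed model check described under "Subject-Scoped Governance" can be illustrated as a set intersection. This is a sketch, not the gateway's implementation: the function names and the use of `None` to mean "no subject-level scope" are assumptions.

```python
from typing import Optional, Set


def effective_allowlist(
    account_allowed: Set[str], subject_allowed: Optional[Set[str]]
) -> Set[str]:
    """A subject scope can only narrow the account defaults, never widen them."""
    if subject_allowed is None:
        return set(account_allowed)
    return account_allowed & subject_allowed


def is_model_allowed(
    model: Optional[str],
    account_allowed: Set[str],
    subject_allowed: Optional[Set[str]] = None,
) -> bool:
    """Fail closed: a missing model name or an empty allowlist denies the request."""
    if not model:
        return False
    return model in effective_allowlist(account_allowed, subject_allowed)


account = {"openai/gpt-4", "anthropic/claude"}
print(is_model_allowed("openai/gpt-4", account))                        # True
print(is_model_allowed("openai/gpt-4", account, {"anthropic/claude"}))  # False
print(is_model_allowed("other/model", account, {"other/model"}))        # False
```

The third call shows why intersection is used: a subject scope that names a model outside the account defaults does not grant access to it.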
8. Scalability and Extensibility
- Task Queue (NATS): NATS is chosen for its high performance, lightweight nature, and scalability, allowing the system to handle a high volume of incoming tasks.
- Flow Trigger Service: Can be scaled horizontally if event processing becomes a bottleneck. It should be stateless.
- Flow Execution Orchestrator: Can also be scaled, though individual orchestration tasks might be stateful for the duration of a Flow.
- Agent Execution Infrastructure:
  - Leveraging containers (Docker or Kubernetes jobs) for all agent types is key to scalability and security. Each Flow execution runs in an isolated container.
  - The system can be configured to run these containers on a Docker host (via socket) or a Kubernetes cluster, allowing for dynamic scaling of agent execution capacity based on demand.
  - The orchestrator manages the lifecycle of these containerized jobs.
- Database: `preloop-models` (PostgreSQL) should be monitored for performance. Read replicas can be used for read-heavy operations like fetching Flow definitions or execution logs.
- Extensibility:
  - New Event Sources: Add new parsers/adapters to publish `process_webhook_event` tasks. The Flow Trigger Service can then match these new event types.
  - New AI Models: Add new `AIModel` records. The Flow Execution Orchestrator and agents need to be compatible with the new model's API.
  - New MCP Tools/Servers: Update the allowlist options. Agents need to be able to make calls to these new tools.
  - New Agent Types: Create new container images for additional agent frameworks. Update the Agent Execution Infrastructure to support the new agent type. The abstraction layer makes this straightforward.
9. Preset Use Case Examples
Example automated workflows that can be configured:
- Commit to `main` -> Doc/Test Check: Evaluate if documentation or tests require updates and open issues or PRs with suggested changes.
- PR Merged -> Release Notes Draft: Summarize changes and draft release notes.
- Downtime Incident -> Initial Investigation: Check deployed version, telemetry data, and suggest remediation actions.
- New User Feedback -> Summarize & Categorize: Parse feedback, categorize it, and create corresponding issues.
- Scheduled Code Quality Scan -> Analyze & Report: Run static analysis and create tasks for critical issues.
Enterprise Features: Preloop Enterprise Edition adds usage tracking, billing integration with Stripe, subscription management, and feature gating based on plan tiers. Contact sales@preloop.ai for more information.