Skip to content

Flows Architecture

This section details the architecture for the "Event-Driven Agentic Flows" (Flows) feature, enabling automated workflows triggered by various events and executed by AI agents.

1. Overview and Goals

The "Flows" feature allows users to define automated workflows that are initiated by events from integrated systems (e.g., GitHub, GitLab, Jira) or other sources (e.g., incidents, OpenTelemetry data). Each Flow leverages a configurable AI model and a dynamic prompt to perform tasks, potentially utilizing specified MCP (Meta-Cognitive Prompting) servers and tools.

Key Goals:

  • Event-Driven Automation: Trigger workflows based on specific events (e.g., commit to main, new issue, PR merged, incident triggered).
  • Dynamic Prompting: Allow prompts to be constructed dynamically, incorporating project-specific context (e.g., documentation summaries, code component maps).
  • Configurable AI Models: Enable users to select and configure different AI models for different Flows, managing API keys securely.
  • Controlled Tool Usage: Provide a mechanism to specify which MCP servers and tools an AI agent can use during a Flow's execution.
  • Extensibility: Design for easy addition of new event sources, AI models, and agent capabilities.
  • User Experience: Allow users to define Flows from presets, customize existing ones, or create them from scratch. The UI should be intuitive and guide the user through the process of creating and configuring a flow.

2. Key Components & Their Roles

graph TD
    subgraph "External Event Sources"
        direction LR
        Trackers["Issue Trackers (GitHub, GitLab, Jira)"]
        OtherSources["Other Sources (Incidents, OTel, etc.)"]
    end

    subgraph "Preloop Core"
        direction TB
        WebhookEndpoint["Webhook Endpoint (/api/v1/private/webhooks/...)"]
        TaskQueue["Internal Task Queue (NATS)"]
        APIExt["Preloop API (Flow/AIModel CRUD, Logs)"]
        preloop.modelsDB["preloop.models (PostgreSQL - Flows, AIModels, Executions)"]
    end

    subgraph "Flows Subsystem"
        direction TB
        FlowTriggerService["Flow Trigger Service (part of Worker)"]
        FlowExecOrchestrator["Flow Execution Orchestrator"]
        AgentInfra["Agent Execution Infrastructure (Docker/K8s/CLI)"]
        subgraph "Agent Session (Container or Process)"
            Agent["Agent (OpenHands, Claude Code, Aider, etc.)"]
            AIModelClient["AIModel Client (via AIModel)"]
            MCPClient["MCP Client (for allowed tools)"]
        end
    end

    subgraph "External Services"
        AIModelAPIs["AI Model APIs (OpenAI, Anthropic, etc.)"]
        MCPServers["MCP Servers"]
    end

    Trackers -- Webhook Event --> WebhookEndpoint
    WebhookEndpoint -- Publishes Task --> TaskQueue
    TaskQueue -- Delivers Task --> FlowTriggerService
    FlowTriggerService -- Reads Flow Defs --> PrelooModelsDB
    FlowTriggerService -- Initiates Execution --> FlowExecOrchestrator
    FlowExecOrchestrator -- Reads Flow/AIModel --> PrelooModelsDB
    FlowExecOrchestrator -- Resolves Prompt Data --> PrelooModelsDB
    FlowExecOrchestrator -- Manages --> AgentInfra
    AgentInfra -- Runs --> Agent
    Agent -- Uses --> AIModelClient
    AIModelClient -- Calls --> AIModelAPIs
    Agent -- Uses --> MCPClient
    MCPClient -- Calls --> MCPServers
    FlowExecOrchestrator -- Writes Logs --> PrelooModelsDB
    APIExt -- Manages --> PrelooModelsDB
    APIExt -- Serves Logs --> UserClient["User Client (UI/CLI)"]

  • Flow Definition (Flows):
    • Stored in preloop-models.
    • Details the triggering event, prompt template, selected AIModel ID, agent type (e.g., "codex", "gemini", "openhands", "aider"), agent configuration (e.g., specific agent parameters), and a list of allowed MCP servers and specific tools.
    • Presets are implemented as special, non-editable (or cloneable) records in this table.
  • AI Model (AIModel):
    • Stored in preloop-models.
    • Reusable definitions for AI models, including their identifiers (e.g., openai/gpt-4), API endpoints, gateway metadata, and upstream credentials when needed.
    • Credentials can be resolved through Preloop's secret reference layer instead of being embedded directly into flow definitions.
    • Models are linked to an Account.
  • Event Ingestion:
    • Primarily through the existing webhook endpoint (e.g., /api/v1/private/webhooks/{tracker_type}/{org_identifier}) for tracker events.
    • This endpoint validates incoming webhooks and then publishes a process_webhook_event task to the Internal Task Queue (NATS).
  • Internal Task Queue (NATS):
    • NATS is used as a simple, reliable task queue. It decouples the API from the background processing of events and flows.
    • The EventBus service is used to enqueue tasks.
    • Workers consume tasks from the preloop_sync.tasks subject using a workqueue retention policy, which ensures that acknowledged messages are immediately removed from the stream.
  • Flow Trigger Service:
    • This logic is part of the NATS worker. When a process_webhook_event task is received, the worker acts as the trigger service.
    • It matches the incoming event data against the trigger_event_source and trigger_event_type defined in active Flows.
    • Upon a match, it initiates a Flow execution by invoking the Flow Execution Orchestrator.
  • Flow Execution Orchestrator:
    • Responsible for managing the lifecycle of a single Flow execution.
    • Retrieves the Flow definition and its associated AIModel from the database.
    • Dynamic Prompt Resolution: Parses the prompt_template and resolves any placeholders (e.g., {{project_docs_summary}}, {{relevant_code_files}}) by querying preloop-models or other Preloop services for the necessary context data.
    • Prepares the complete execution context for the agent, including the fully resolved prompt, AI model details, managed gateway settings when enabled, and the specific list of allowed MCP servers/tools.
    • Initiates and manages an agent session via the Agent Execution Infrastructure based on the configured agent_type.
    • Monitors the execution and records results, logs, gateway events, and runtime-session attribution.
  • Agent Execution Infrastructure:

    • Manages the runtime environment for agentic workflows.
    • Production Mode (Container-based): All agents run in isolated containers (Docker or Kubernetes) for security, isolation, and scalability. Each Flow execution spawns a new container regardless of agent type.
    • Development Mode (Optional): For local development only, CLI-based agents can be run as processes to simplify testing and iteration.
    • Provides an abstraction layer for different agent types:

      • Codex CLI
      • Gemini CLI
      • OpenHands: Container with the OpenHands library and runtime dependencies
      • Aider: Container with aider CLI and dependencies pre-installed
    • The Flow Execution Orchestrator interacts with this infrastructure to start, monitor, and terminate agent sessions.

    • Agent (running in a container):
    • The core agentic execution environment running in an isolated container (or process in dev mode).
    • Supported agent types:
      • Codex CLI
      • Gemini CLI
      • OpenHands: Library-based agent (default implementation)
      • Aider: CLI-based agent using aider command
      • Other agents: Extensible to support additional agent frameworks
    • Receives the resolved prompt, AI model configuration, runtime token, and allowed MCP toolset from the Flow Execution Orchestrator.
    • Manages the interaction with the configured AI model.
    • Direct MCP Calls: The agent can directly call the allowed MCP tools on the specified MCP servers, with necessary network access and authentication provided by the execution infrastructure.
    • Flow Execution Log (FlowExecutions):
    • A database table in preloop-models to record the history and outcome of each Flow run.
    • Includes details like the triggering event, start/end times, status (pending, running, succeeded, failed), the resolved input prompt, a summary of actions taken by the agent, logs of MCP tool usage, and a reference to more detailed logs from the agent session (e.g., container logs, session ID, or process output).
    • Runtime Session (RuntimeSession):
    • A shared session identity used to attribute model traffic and operator actions across flow executions and managed external agents.
    • Links gateway usage, recent activity, and session lifecycle state to a common runtime principal.
    • Preloop API Extensions:
    • The current API surface includes:
      • CRUD (Create, Read, Update, Delete) operations on Flows and AIModels.
      • Listing and retrieving FlowExecution history and logs.
      • Flow- and account-scoped gateway usage summaries.
      • Execution-scoped gateway event inspection.
      • Runtime-session and managed-agent browsing endpoints that connect execution details to broader runtime history.

3. Database Schema Considerations

The following Pydantic schemas and corresponding SQLAlchemy models have been defined within preloop-models:

  • Flow
  • AIModel
  • FlowExecution

4. Data Flow Diagrams

(Mermaid diagrams will be used here as shown in the "Key Components" section and expanded as needed for specific flows.)

a. Event Ingestion & Flow Triggering:

sequenceDiagram
    participant ExtSrc as External Event Source (e.g., GitHub)
    participant WebhookEP as Preloop Webhook Endpoint
    participant TaskQueue as Internal Task Queue (NATS)
    participant Worker as Preloop Sync Worker (Flow Trigger)
    participant FlowsDB as Flows Database (preloop-models)
    participant FlowExecOrch as Flow Execution Orchestrator

    ExtSrc->>+WebhookEP: Sends Webhook (e.g., push event)
    WebhookEP->>+TaskQueue: Publishes `process_webhook_event` Task
    TaskQueue-->>-Worker: Delivers Task
    Worker->>+FlowsDB: Queries for matching Flow definitions
    FlowsDB-->>-Worker: Returns matching Flow(s)
    alt If match found
        Worker->>+FlowExecOrch: Initiates Flow Execution (with Flow ID, Event Data)
        FlowExecOrch-->>-Worker: Acknowledges (e.g., Execution ID)
    end

b. Flow Execution (Simplified):

sequenceDiagram
    participant FlowExecOrch as Flow Execution Orchestrator
    participant ModelsDB as preloop-models (Flows, AIModels, Context Data)
    participant AgentInfra as Agent Execution Infrastructure
    participant Agent as Agent (Container or Process)
    participant Gateway as Preloop Model Gateway
    participant AIModel_API as Upstream AI Model API
    participant MCP_Srv as Allowed MCP Server(s)

    FlowExecOrch->>+ModelsDB: Get Flow & AIModel details
    ModelsDB-->>-FlowExecOrch: Return details
    FlowExecOrch->>+ModelsDB: Get data for dynamic prompt placeholders
    ModelsDB-->>-FlowExecOrch: Return context data
    Note over FlowExecOrch: Resolves prompt, prepares agent context (runtime token, model routing, allowed MCPs)
    FlowExecOrch->>+AgentInfra: Request agent session (with agent_type and context)
    AgentInfra->>+Agent: Starts agent (container, process, or other)
    Agent->>+Gateway: Sends model request using managed credentials
    Gateway->>Gateway: Applies budget and allowed-model checks
    Gateway->>+AIModel_API: Forwards approved request upstream
    AIModel_API-->>-Gateway: AI model responses
    Gateway-->>-Agent: Response + usage attribution
    alt If AI model decides to use an MCP tool
        Agent->>+MCP_Srv: Calls allowed MCP tool directly
        MCP_Srv-->>-Agent: Tool result
    end
    Agent-->>-AgentInfra: Execution logs/status
    AgentInfra-->>-FlowExecOrch: Execution logs/status/results
    FlowExecOrch->>+ModelsDB: Store execution results in FlowExecutions table

5. Real-Time UI Updates & Interactivity

To provide users with live feedback and enable future interactivity with Flow executions, a structured, message-based real-time architecture will be implemented.

graph TD
    subgraph "Flows Subsystem (Worker)"
        FlowExecOrchestrator["Flow Execution Orchestrator"]
    end

    subgraph "NATS Messaging"
        direction LR
        NatsUpdates["Updates Stream (flow-updates.{exec_id})"]
        NatsInputs["Inputs Stream (flow-inputs.{exec_id})"]
    end

    subgraph "API Server"
        WebSocketServer["WebSocket Server"]
    end

    subgraph "Browser"
        FlowExecUI["Flow Executions UI"]
    end

    %% Data Flow
    FlowExecOrchestrator -- Publishes structured JSON --> NatsUpdates
    WebSocketServer -- Subscribes to --> NatsUpdates
    NatsUpdates -- Streams messages --> WebSocketServer
    WebSocketServer -- Pushes updates to --> FlowExecUI

    %% Future Interactivity Flow
    FlowExecUI -- Sends user input --> WebSocketServer
    WebSocketServer -- Publishes input to --> NatsInputs
    NatsInputs -- Delivers input to --> FlowExecOrchestrator
    FlowExecOrchestrator -- Subscribes to --> NatsInputs

Components & Protocol:

  • Structured Messaging: Communication uses a standardized JSON envelope, allowing for different message types. This is critical for future extensibility.
    {
      "execution_id": "uuid-of-the-flow-execution",
      "timestamp": "iso-8601-timestamp",
      "type": "message_type",
      "payload": { ... }
    }
    
  • NATS Streams:
    • flow-updates.{execution_id}: A server-to-client stream for broadcasting updates from the FlowExecutionOrchestrator. This will be implemented now.
    • flow-inputs.{execution_id}: A client-to-server stream for sending user input back to the FlowExecutionOrchestrator. This is reserved for future interactive features.
  • WebSocket Server: The server will handle routing messages between the browser and the appropriate NATS streams.
  • Initial Message Types: For the first implementation, the following message types will be supported in the flow-updates stream:
    • status_update: For lifecycle changes (e.g., RUNNING, SUCCEEDED).
    • log: For streaming text output from the agent.
    • tool_call: For structured information about tools being used.
  • Future Extensibility: This design allows for the seamless addition of new message types to support interactivity (user_input_request, user_input_response) or advanced capabilities (ui_control_command) without requiring architectural changes.
    • CRUD operations for these new entities will be added to preloop-models.
    • Will be queried by the Flow Execution Orchestrator to resolve dynamic prompt content.
  • Preloop Sync / Webhook Infrastructure:
    • The existing webhook ingestion mechanism within the main Preloop API will publish a process_webhook_event task to the NATS task queue.
    • The Preloop Sync worker, upon receiving this task, will be responsible for triggering the appropriate Agentic Flows.
  • Preloop API:
    • Will be extended with new RESTful API endpoints for:
      • Managing Flows (CRUD, enable/disable, list presets).
      • Managing AIModels (CRUD, share).
      • Retrieving FlowExecution history, status, and logs (including links or content from OpenHands logs).
  • Preloop-MCP (and other MCP Servers):
    • MCP servers are consumers in this context. The OpenHands agents, as configured per Flow, will directly call tools on these MCP servers.
    • The Flow definition will specify which MCP servers and which specific tools on those servers the agent is permitted to use.

6. Agent Integration - Key Aspects

  • Agent Type Selection: The Flow.agent_type field specifies which agent implementation to use (e.g., 'codex', 'gemini', 'openhands', 'aider'). The Agent Execution Infrastructure uses this to determine how to start and manage the agent session.
  • Agent Configuration: The Flow.agent_config field stores JSON with agent-specific configuration:
    • OpenHands: {"agent_type": "CodeActAgent", "max_iterations": 10}
    • Claude Code: {"model": "claude-3-5-sonnet", "max_tokens": 4096}
    • Aider: {"model": "claude-3-5-sonnet", "auto_test": true, "edit_format": "diff"}
  • Dynamic Prompts: The Flow Execution Orchestrator resolves placeholders in Flow.prompt_template before passing the final prompt to any agent.
  • AI Model Selection & Routing:
    • Flow.ai_model_id links to a AIModel record.
    • AI models can be configured with upstream credentials through Preloop's secret abstraction and optionally marked for gateway routing.
    • When gateway routing is enabled, the Flow Execution Orchestrator can provide a managed base URL, model alias, and short-lived token instead of exposing provider credentials directly to the agent.
  • MCP Tool Discovery & Invocation:
    • The Flow.allowed_mcp_servers and Flow.allowed_mcp_tools define the explicit allowlist.
    • The agent environment is configured to make network calls to these allowed MCP servers.
    • If MCPs require authentication, the Flow Execution Orchestrator injects necessary tokens/keys for only the allowed MCPs into the agent session.
    • Agent-specific mechanisms are used to expose MCP tools (e.g., OpenHands' tool/action definitions, MCP configuration files for CLI agents).
  • Shared Runtime Observability:
    • Gateway traffic can be captured as execution-scoped events and account-scoped usage rows.
    • Runtime sessions tie one flow execution's agent activity to broader session-centric browsing and audit surfaces.

7. Security Considerations

  • Model Credential Security:
    • Upstream model credentials can be stored behind secret references rather than embedded directly into flow definitions.
    • Gateway-enabled runtimes can receive short-lived runtime tokens instead of provider API keys.
    • Least privilege still applies: the orchestrator and gateway should only access the model credentials required for the selected flow.
  • MCP Access Control:
    • The allowed_mcp_servers and allowed_mcp_tools in the Flow definition act as a strict allowlist.
    • The OpenHands agent environment must be configured (e.g., network policies if running in Kubernetes, or wrapper logic around tool calls) to prevent calls to unlisted MCPs or tools.
    • Consider if MCPs themselves have authentication/authorization; if so, the Flow execution might need to propagate user/Flow identity or use pre-configured service accounts.
  • Subject-Scoped Governance:
    • Managed-agent and API-key scope can narrow allowed tools and allowed models beyond the parent account defaults.
    • Gateway checks should fail closed when a runtime tries to access a model outside its allowed scope.
  • Input Validation & Sanitization:
    • Validate all inputs for Flow definitions, especially prompt templates and configurations.
    • Sanitize any data from events or dynamic context before it's incorporated into prompts to prevent injection attacks against the AI model or downstream tools.
  • Resource Limits for Agents:
    • All agents (especially container-based ones) should have resource limits (CPU, memory, execution time) to prevent abuse or runaway processes.
  • Logging and Auditing:
    • Comprehensive logging of Flow executions, including which MCP tools were called with what parameters (sensitive data redacted).
    • Model gateway events should capture normalized request/response previews with redaction-aware handling instead of raw secret-bearing payloads.
    • Audit trails for changes to Flows and AIModels.
  • Permissions:
    • Role-based access control (RBAC) for managing Flows and AIModels via the Preloop API. Users should only be able to create/edit/view Flows and AIModels within their authorized scope (e.g., organization).

8. Scalability and Extensibility

  • Task Queue (NATS): NATS is chosen for its high performance, lightweight nature, and scalability, allowing the system to handle a high volume of incoming tasks.
  • Flow Trigger Service: Can be scaled horizontally if event processing becomes a bottleneck. It should be stateless.
  • Flow Execution Orchestrator: Can also be scaled, though individual orchestration tasks might be stateful for the duration of a Flow.
  • Agent Execution Infrastructure:
    • Leveraging containers (Docker or Kubernetes jobs) for all agent types is key to scalability and security. Each Flow execution runs in an isolated container.
    • The system can be configured to run these containers on a Docker host (via socket) or a Kubernetes cluster, allowing for dynamic scaling of agent execution capacity based on demand.
    • The orchestrator manages the lifecycle of these containerized jobs.
  • Database: preloop-models (PostgreSQL) should be monitored for performance. Read replicas can be used for read-heavy operations like fetching Flow definitions or execution logs.
  • Extensibility:
    • New Event Sources: Add new parsers/adapters to publish process_webhook_event tasks. The Flow Trigger Service can then match these new event types.
    • New AI Models: Add new AIModel records. The Flow Execution Orchestrator and agents need to be compatible with the new model's API.
    • New MCP Tools/Servers: Update the allowlist options. Agents need to be able to make calls to these new tools.
    • New Agent Types: Create new container images for additional agent frameworks. Update the Agent Execution Infrastructure to support the new agent type. The abstraction layer makes this straightforward.

9. Preset Use Case Examples

Example automated workflows that can be configured:

  • Commit to main -> Doc/Test Check: Evaluate if documentation or tests require updates and open issues or PRs with suggested changes.
  • New Issue Created -> Triage & Label: Analyze new issue content, suggest priority, labels, and potentially assign based on keywords.
  • PR Merged -> Release Notes Draft: Summarize changes and draft release notes.
  • Downtime Incident -> Initial Investigation: Check deployed version, telemetry data, and suggest remediation actions.
  • New User Feedback -> Summarize & Categorize: Parse feedback, categorize it, and create corresponding issues.
  • Scheduled Code Quality Scan -> Analyze & Report: Run static analysis and create tasks for critical issues.

Enterprise Features: Preloop Enterprise Edition adds usage tracking, billing integration with Stripe, subscription management, and feature gating based on plan tiers. Contact sales@preloop.ai for more information.