Overview

Processes are OrbitAI's orchestration patterns: they determine how tasks are assigned to agents and in what order they execute. The process type shapes how work flows through your agent system, from simple sequential chains to hierarchical coordination and dependency-driven parallel execution.

Sequential

Tasks execute one after another in order

Hierarchical

Manager coordinates and delegates tasks

Flow-Based

Dependency-driven execution with parallelization

Context Flow

Information flows between tasks

Dynamic Assignment

Intelligent agent selection per task

Error Handling

Configurable failure strategies

Key Characteristics

Processes provide predictable execution patterns with well-defined task ordering and agent assignment strategies.
Each process type manages how context and results flow between tasks, enabling tasks to build upon previous outputs.
Processes determine how multiple agents work together, from simple turn-taking to sophisticated delegation patterns.
Different process types offer different scalability characteristics, from simple sequential execution to complex parallel workflows.

Process Types

OrbitAI supports multiple process types, each suited for different workflow patterns:

Sequential Process

The simplest and most common process type where tasks execute one after another.
Sequential Process executes tasks in a strict linear order, with each task completing before the next begins.
let orbit = try await Orbit.create(
    name: "Content Creation Pipeline",
    agents: [researcher, writer, editor],
    tasks: [researchTask, writingTask, editingTask],
    process: .sequential,  // Tasks execute in order
    verbose: true
)
Execution Flow:
Task 1 (Research) → Complete
    ↓
Task 2 (Writing) → Complete
    ↓
Task 3 (Editing) → Complete
    ↓
Workflow Complete
Sequential is the default process type and works well for most workflows where tasks have clear dependencies.

Hierarchical Process

Advanced process type where a manager agent coordinates and delegates tasks to worker agents.
Hierarchical Process introduces a manager-worker pattern where a designated manager agent:
  • Analyzes the overall workflow
  • Decides task assignment strategies
  • Delegates tasks to appropriate worker agents
  • Coordinates execution and monitors progress
let orbit = try await Orbit.create(
    name: "Complex Project",
    agents: [workerAgent1, workerAgent2, workerAgent3],
    tasks: complexTasks,
    process: .hierarchical,
    manager: managerAgent,  // Manager coordinates
    verbose: true
)
Execution Flow:
Manager Agent (Orchestrator)
        ↓
Analyzes workflow and tasks
        ↓
┌───────┼───────┐
↓       ↓       ↓
Worker  Worker  Worker
Agent 1 Agent 2 Agent 3
↓       ↓       ↓
Task 1  Task 2  Task 3
↓       ↓       ↓
└───────┼───────┘
        ↓
Manager validates results
        ↓
Workflow Complete
Hierarchical process requires a manager agent with allowDelegation: true configured.
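To make the manager's "select the best agent" step concrete, here is a minimal, self-contained sketch of one possible selection rule: prefer the worker that covers the most required skills, and break ties by lighter load. The `Worker` type and `delegate` function are hypothetical and are not part of OrbitAI; in a real hierarchical process the decision is made by the manager agent, not by a fixed rule like this.

```swift
// Hypothetical worker model for illustration; not OrbitAI's Agent type.
struct Worker {
    let name: String
    let skills: Set<String>
    var assigned: Int = 0   // number of tasks delegated so far
}

/// Picks the worker covering the most required skills, breaking ties by the
/// lighter current load. Returns nil when no worker has a matching skill.
func delegate(requiredSkills: Set<String>, to workers: inout [Worker]) -> String? {
    var bestIndex: Int? = nil
    var bestScore = 0
    for (index, worker) in workers.enumerated() {
        let score = worker.skills.intersection(requiredSkills).count
        guard score > 0 else { continue }
        if let current = bestIndex {
            let better = score > bestScore
                || (score == bestScore && worker.assigned < workers[current].assigned)
            if better {
                bestIndex = index
                bestScore = score
            }
        } else {
            bestIndex = index
            bestScore = score
        }
    }
    guard let chosen = bestIndex else { return nil }
    workers[chosen].assigned += 1
    return workers[chosen].name
}

var team = [
    Worker(name: "backend", skills: ["swift", "api"]),
    Worker(name: "data", skills: ["sql", "api"]),
]
let first = delegate(requiredSkills: ["api"], to: &team)     // both idle, first match wins
let second = delegate(requiredSkills: ["api"], to: &team)    // tie on skills, lighter load wins
let third = delegate(requiredSkills: ["design"], to: &team)  // nobody qualifies
```

The second call goes to the other worker because the first one already holds a task, which is the load-balancing effect the hierarchical process is meant to provide.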

Flow-Based Process

Advanced dependency-driven execution with support for parallel task execution.
Flow-Based Process uses a TaskFlow structure to define complex workflows with:
  • Explicit task dependencies
  • Parallel execution where possible
  • Topological sorting for optimal ordering
  • Conditional execution
  • Failure handling strategies
let taskFlow = TaskFlow(
    name: "Data Processing Pipeline",
    tasks: [
        dataIngestion,    // No dependencies
        validation1,      // Depends on ingestion
        validation2,      // Depends on ingestion (parallel with validation1)
        transform,        // Depends on both validations
        analysis,         // Depends on transform
        reporting         // Depends on analysis
    ],
    stopOnFailure: true
)

let result = try await taskEngine.executeTaskFlow(
    taskFlow: taskFlow,
    agents: agents,
    context: context
)
Execution Flow:
Data Ingestion
    ↓
┌───┴───┐
│       │
Valid1  Valid2  (Parallel)
│       │
└───┬───┘
    ↓
Transform
    ↓
Analysis
    ↓
Reporting
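The ordering in the diagram can be derived mechanically from the dependency graph. The sketch below groups tasks into "waves" using a simple form of topological sorting; tasks in the same wave have no dependencies on each other and could run in parallel. It operates on plain task names and is only an illustration of the idea, not OrbitAI's scheduler; `executionWaves` is a hypothetical helper.

```swift
/// Groups task names into waves: every task in a wave has all of its
/// dependencies in earlier waves, so tasks within one wave could run in parallel.
/// Returns nil if the graph contains a cycle or references an unknown task.
func executionWaves(_ dependencies: [String: [String]]) -> [[String]]? {
    var remaining = dependencies
    var done = Set<String>()
    var waves: [[String]] = []
    while !remaining.isEmpty {
        // A task is ready once everything it depends on has completed.
        let ready = remaining
            .filter { entry in entry.value.allSatisfy { done.contains($0) } }
            .keys
            .sorted()
        if ready.isEmpty { return nil }   // nothing can progress: cycle or missing task
        waves.append(ready)
        for name in ready {
            remaining[name] = nil
            done.insert(name)
        }
    }
    return waves
}

// Same shape as the data processing pipeline above.
let pipeline: [String: [String]] = [
    "dataIngestion": [],
    "validation1": ["dataIngestion"],
    "validation2": ["dataIngestion"],
    "transform": ["validation1", "validation2"],
    "analysis": ["transform"],
    "reporting": ["analysis"],
]
let waves = executionWaves(pipeline)
```

Here the two validation tasks land in the same wave, which is where the parallel branch in the diagram comes from.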

Task Dependencies and Processes

How Processes Affect Task Execution


Sequential Process Behavior

Task Dependencies:
  • Tasks execute in array order
  • Each task waits for previous to complete
  • Context automatically includes all previous outputs
Agent Assignment:
// Task with explicit agent
let task1 = ORTask(
    description: "Specialized task",
    expectedOutput: "Result",
    agent: specialistAgent.id  // Uses this agent
)

// Task without explicit agent
let task2 = ORTask(
    description: "General task",
    expectedOutput: "Result"
    // Engine selects best available agent
)
Context Flow:
// Sequential process automatically provides:
// - task_0_output (first task's output)
// - task_1_output (second task's output)
// - task_N_output (Nth task's output)

let task3 = ORTask(
    description: """
    Combine previous results:
    First: {task_0_output}
    Second: {task_1_output}
    """,
    expectedOutput: "Combined result"
)
Failure Handling:
  • If any task fails, entire workflow stops
  • No subsequent tasks execute
  • Error propagates to orbit result
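The behavior described above (array order, accumulated `task_N_output` context, stop at first failure) can be modeled in a few lines. This is a simplified sketch using plain closures rather than OrbitAI tasks and agents; `runSequentially` and `StepFailure` are hypothetical names.

```swift
struct StepFailure: Error { let index: Int }

/// Runs steps in array order. Each step receives the outputs of all earlier
/// steps under the keys task_0_output, task_1_output, and so on. A step that
/// returns nil is treated as failed, which stops the run immediately.
func runSequentially(_ steps: [([String: String]) -> String?]) throws -> [String: String] {
    var context: [String: String] = [:]
    for (index, step) in steps.enumerated() {
        guard let output = step(context) else {
            throw StepFailure(index: index)
        }
        context["task_\(index)_output"] = output
    }
    return context
}

// Success: the second step reads the first step's output from the context.
let outputs = try runSequentially([
    { _ in "research notes" },
    { ctx in "draft based on " + (ctx["task_0_output"] ?? "") },
])

// Failure: the run stops at the failing step and later steps never execute.
var failedAt: Int? = nil
do {
    _ = try runSequentially([{ _ in "first" }, { _ in nil }, { _ in "never runs" }])
} catch {
    failedAt = (error as? StepFailure)?.index
}
```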

Context Interpolation

All process types support context variable interpolation:
Reference previous task outputs in descriptions:
let task1 = ORTask(
    description: "Research AI trends",
    expectedOutput: "Research report"
)

let task2 = ORTask(
    description: """
    Write article based on research.
    Research data: {task_0_output}
    """,
    expectedOutput: "Article draft",
    context: [task1.id]
)
Variable Format:
  • {task_0_output} - First task’s output
  • {task_1_output} - Second task’s output
  • {task_N_output} - Nth task’s output
Task indices are 0-based, matching array indexing in Swift.
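As a rough illustration of how placeholder substitution of this kind can work, the sketch below replaces `{key}` markers in a single pass and leaves unknown placeholders untouched. The `interpolate` function is hypothetical and is not OrbitAI's implementation; it only shows the general technique.

```swift
/// Replaces each {key} placeholder with its value from `values` in one pass.
/// Placeholders without a matching key are left in the text unchanged.
func interpolate(_ template: String, with values: [String: String]) -> String {
    var result = ""
    var rest = template[...]
    while let open = rest.firstIndex(of: "{"),
          let close = rest[open...].firstIndex(of: "}") {
        result.append(contentsOf: rest[..<open])
        let key = String(rest[rest.index(after: open)..<close])
        // Fall back to the original "{key}" text when no value is available.
        result.append(values[key] ?? String(rest[open...close]))
        rest = rest[rest.index(after: close)...]
    }
    return result + String(rest)
}

let rendered = interpolate(
    "Research data: {task_0_output} / {task_5_output}",
    with: ["task_0_output": "AI trends"]
)
```

In this sketch the unresolved `{task_5_output}` stays in the text verbatim, which is a useful signal that an index is wrong or a context declaration is missing.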
Use orbit-level inputs in any task:
let task = ORTask(
    description: """
    Analyze data for {industry} sector.
    Focus on {metric} metrics.
    Time period: {period}
    """,
    expectedOutput: "Analysis report"
)

// Provide inputs at execution time
let inputs = OrbitInput(Metadata([
    "industry": .string("healthcare"),
    "metric": .string("efficiency"),
    "period": .string("Q4 2024")
]))

let result = try await orbit.start(inputs: inputs)
Tasks automatically receive agent context:
// Agent context is available in system message
let agent = Agent(
    role: "Financial Analyst",
    purpose: "Analyze financial data",
    context: """
    Expert in:
    - Financial modeling
    - Risk assessment
    - Market analysis
    """
)

// Tasks executed by this agent have access to this context
// through the system message

Best Practices

Process Selection

Use Sequential For

Simple Linear Workflows
  • Clear dependencies
  • Each step builds on previous
  • Easy to understand and debug
process: .sequential

Use Hierarchical For

Complex Coordination
  • Many agents with different skills
  • Dynamic task assignment needed
  • Quality validation required
  • Load balancing important
process: .hierarchical,
manager: coordinatorAgent

Use Flow-Based For

Parallel Opportunities
  • Independent tasks can run simultaneously
  • Complex dependency graphs
  • Performance critical workflows
  • Build systems and pipelines
TaskFlow(tasks: tasks)

Consider Hybrid

Nested Workflows
  • Sequential top-level
  • Hierarchical within phases
  • Flow-based for parallelizable sections
Break complex workflows into smaller orbits and chain them.

Context Management

1. Explicit Context References

Always declare context dependencies:
let task = ORTask(
    description: "Process data from: {task_0_output}",
    expectedOutput: "Processed data",
    context: [previousTask.id]  // Explicit declaration
)
Without explicit context declaration, the task may not have access to previous outputs in hierarchical and flow-based processes.
2. Minimize Context Size

Keep context concise for better performance:
// Good: Specific reference
description: "Summarize key points from: {task_0_output}"

// Better: Extract only needed data
let extractTask = ORTask(
    description: "Extract top 5 insights from research",
    expectedOutput: "5 key insights"
)

let summaryTask = ORTask(
    description: "Create summary from: {task_0_output}",
    expectedOutput: "Concise summary",
    context: [extractTask.id]
)
3. Structure Outputs

Use structured outputs for easier downstream processing:
struct ResearchFindings: Codable, Sendable {
    let keyPoints: [String]
    let sources: [String]
    let confidence: Double
}

let researchTask = ORTask.withStructuredOutput(
    description: "Research AI trends",
    expectedType: ResearchFindings.self,
    agent: researcher.id
)

// Downstream tasks can parse structured data easily

Performance Optimization

Maximize parallel execution opportunities:
// Bad: Sequential when tasks are independent
let sequential = [
    fetchAPI1Task,      // Could run in parallel
    fetchAPI2Task,      // Could run in parallel
    fetchAPI3Task,      // Could run in parallel
    mergeTask
]

// Good: Use flow-based for parallelization
let flow = TaskFlow(tasks: [
    fetchAPI1Task,      // Parallel
    fetchAPI2Task,      // Parallel
    fetchAPI3Task,      // Parallel
    mergeTask          // Waits for all fetches
])

// Configure concurrent execution
let engine = TaskExecutionEngine(
    agentExecutor: executor,
    maxConcurrentTasks: 3  // Run fetches in parallel
)
Reuse agents efficiently:
// Create agent pool for common tasks
let workerAgents = (1...5).map { i in
    Agent(
        role: "Data Processor \(i)",
        purpose: "Process data efficiently",
        context: "Generic data processing agent",
        tools: ["data_processor"]
    )
}

// Hierarchical process will distribute work
let orbit = try await Orbit.create(
    agents: workerAgents,
    tasks: manyTasks,
    process: .hierarchical,
    manager: coordinator
)
Set appropriate concurrency limits:
// Balance parallelism with resource limits
let engine = TaskExecutionEngine(
    agentExecutor: executor,
    maxConcurrentTasks: 5,  // Don't overwhelm system
    enableVerboseLogging: false  // Reduce overhead
)

// Consider:
// - API rate limits (50-100 requests/min)
// - Memory constraints
// - CPU availability
// - LLM provider limits
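To show what a concurrency cap does in practice, here is a self-contained sketch built on Swift's structured concurrency. It keeps at most `limit` child tasks in flight and starts the next one only when a running one finishes. `mapConcurrently` is a hypothetical helper written for this example; it is an assumption about how a limit like `maxConcurrentTasks` might behave, not the engine's actual code.

```swift
/// Runs `work` over `inputs` with at most `limit` child tasks in flight,
/// returning results in input order.
func mapConcurrently<Input: Sendable, Output: Sendable>(
    _ inputs: [Input],
    limit: Int,
    work: @escaping @Sendable (Input) async -> Output
) async -> [Output] {
    precondition(limit > 0, "limit must be positive")
    return await withTaskGroup(of: (Int, Output).self) { group in
        var results = [Output?](repeating: nil, count: inputs.count)
        var next = 0
        // Seed the group with up to `limit` tasks.
        while next < min(limit, inputs.count) {
            let seedIndex = next
            let input = inputs[seedIndex]
            group.addTask { (seedIndex, await work(input)) }
            next += 1
        }
        // Each time one task finishes, start the next pending input.
        while let (finishedIndex, output) = await group.next() {
            results[finishedIndex] = output
            if next < inputs.count {
                let nextIndex = next
                let input = inputs[nextIndex]
                group.addTask { (nextIndex, await work(input)) }
                next += 1
            }
        }
        return results.compactMap { $0 }
    }
}

let squares = await mapConcurrently([1, 2, 3, 4, 5], limit: 2) { n in n * n }
```

Raising `limit` only helps while the provider's rate limits and local resources can absorb the extra in-flight work.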
Configure appropriate failure handling:
// Critical path: stop on failure
let criticalFlow = TaskFlow(
    tasks: criticalTasks,
    stopOnFailure: true
)

// Best effort: continue on failure
let bestEffortFlow = TaskFlow(
    tasks: optionalTasks,
    stopOnFailure: false  // Collect all results
)

// Test suites: run all tests
let testFlow = TaskFlow(
    tasks: testTasks,
    stopOnFailure: false  // Run all tests
)
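The difference between the two settings can be seen in a small model. The sketch below runs named steps in order and records a `Result` per step; with `stopOnFailure` set, it ends at the first failure. The `run` function and `TaskFailure` type are hypothetical, and the sketch ignores dependencies and parallelism, so treat it as an illustration of the flag's intent rather than of `TaskFlow` itself.

```swift
struct TaskFailure: Error { let name: String }

/// Runs named steps in order, recording a Result per step.
/// With stopOnFailure == true the run ends at the first failure;
/// otherwise every step runs and failures are collected alongside successes.
func run(
    _ steps: [(name: String, body: () throws -> String)],
    stopOnFailure: Bool
) -> [(name: String, result: Result<String, Error>)] {
    var results: [(name: String, result: Result<String, Error>)] = []
    for step in steps {
        let result: Result<String, Error> = Result { try step.body() }
        results.append((name: step.name, result: result))
        if case .failure = result, stopOnFailure { break }
    }
    return results
}

let steps: [(name: String, body: () throws -> String)] = [
    (name: "lint", body: { "ok" }),
    (name: "unit", body: { throw TaskFailure(name: "unit") }),
    (name: "integration", body: { "ok" }),
]
let critical = run(steps, stopOnFailure: true)     // stops after "unit"
let bestEffort = run(steps, stopOnFailure: false)  // runs all three steps
```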

Error Handling

do {
    let result = try await orbit.start()

    // Process successful results
    for (index, output) in result.taskOutputs.enumerated() {
        print("Task \(index): \(output.rawOutput)")
    }
} catch let error as OrbitAIError {
    // Handle specific errors
    switch error {
    case .taskExecutionFailed(let message):
        print("Task failed: \(message)")
        // Check which task failed
        let failedTasks = orbit.tasks.filter { $0.status == .failed }
        for task in failedTasks {
            print("Failed task: \(task.description)")
        }

    case .llmRateLimitExceeded:
        print("Rate limit hit - implement backoff")

    case .agentNotFound:
        print("Agent assignment issue")

    default:
        print("Other error: \(error)")
    }
}
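The "implement backoff" note in the rate-limit case can be sketched as a generic retry helper with exponentially growing delays. `DemoError` and `withBackoff` are hypothetical names used only here; in real code you would match on the rate-limit error your provider or OrbitAI actually throws.

```swift
// Hypothetical error type standing in for a provider's rate-limit error.
enum DemoError: Error { case rateLimited, fatal }

/// Retries `operation` when it throws .rateLimited, waiting base, 2x base,
/// 4x base, ... between attempts. Any other error, or a rate-limit error on
/// the final attempt, is rethrown to the caller.
func withBackoff<T>(
    maxAttempts: Int,
    baseDelayNanoseconds: UInt64,
    operation: () async throws -> T
) async throws -> T {
    var attempt = 0
    while true {
        do {
            return try await operation()
        } catch DemoError.rateLimited where attempt + 1 < maxAttempts {
            try await Task.sleep(nanoseconds: baseDelayNanoseconds << UInt64(attempt))
            attempt += 1
        }
    }
}

// Demo: the operation is rate limited twice, then succeeds on the third call.
func demo() async throws -> (result: String, calls: Int) {
    var calls = 0
    let result = try await withBackoff(maxAttempts: 4, baseDelayNanoseconds: 1_000_000) { () async throws -> String in
        calls += 1
        if calls < 3 { throw DemoError.rateLimited }
        return "ok"
    }
    return (result, calls)
}
let outcome = try await demo()
```

Production code would usually add jitter and an upper bound on the delay; both are omitted here to keep the sketch short.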

Troubleshooting

Symptoms: Tasks run in unexpected sequence
Causes:
  • Using hierarchical process (manager decides order)
  • Missing dependency declarations in flow-based
  • Context references without proper dependencies
Solutions:
// For sequential: tasks run in array order
let tasks = [task1, task2, task3]  // Runs 1→2→3

// For hierarchical: manager decides order
// Solution: Use sequential if order matters

// For flow-based: declare dependencies
let task2 = ORTask(
    description: "Task 2",
    dependencies: [task1.id]  // Explicit dependency
)
Symptoms: {task_N_output} appears literally in output
Causes:
  • Missing context declaration
  • Wrong task index
  • Task not executed yet (dependency issue)
Solutions:
// Always declare context
let task = ORTask(
    description: "Use data: {task_0_output}",
    expectedOutput: "Result",
    context: [previousTask.id]  // Required!
)

// Verify task index (0-based)
// task_0_output = first task (index 0)
// task_1_output = second task (index 1)

// Ensure dependency order
let dependent = ORTask(
    description: "Use {task_1_output}",
    context: [secondTask.id],
    dependencies: [secondTask.id]  // Flow-based
)
Symptoms: Manager doesn’t assign tasks to workers
Causes:
  • Manager missing allowDelegation: true
  • Manager lacks delegation tools
  • Poor manager context/prompting
Solutions:
// Enable delegation
let manager = Agent(
    role: "Project Manager",
    purpose: "Delegate and coordinate tasks",
    context: """
    You are a project manager coordinating a team.

    Your responsibilities:
    1. Analyze each task's requirements
    2. Select the best agent for each task
    3. Assign tasks to appropriate team members
    4. Monitor progress and quality

    Available team members and their expertise:
    [List agents and their capabilities]
    """,
    allowDelegation: true,  // Required!
    tools: [
        "task_analyzer",
        "agent_evaluator",
        "task_delegator"
    ]
)

// Provide good worker context
let worker = Agent(
    role: "Backend Developer",
    purpose: "Develop backend services",
    context: "Expert in Swift, APIs, databases",
    tools: ["code_generator", "api_designer"]
)
Symptoms: “Circular dependency detected” error
Causes:
  • Task A depends on Task B depends on Task A
  • Indirect circular dependencies through multiple tasks
Solutions:
// Identify the cycle
// Bad:
taskA.dependencies = [taskB.id]
taskB.dependencies = [taskA.id]  // Circular!

// Fix: Remove circular dependency
taskA.dependencies = []  // A first
taskB.dependencies = [taskA.id]  // B depends on A

// For complex graphs, visualize dependencies:
func printDependencyGraph(tasks: [ORTask]) {
    for task in tasks {
        print("\(task.id): depends on \(task.dependencies ?? [])")
    }
}
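Printing the graph helps with small workflows; for larger ones it can be easier to locate the cycle programmatically. The sketch below runs a depth-first search over a plain dependency map and returns one offending path. It works on task names rather than `ORTask` values, and `findCycle` is a hypothetical helper, not an OrbitAI API.

```swift
/// Depth-first search over a dependency map (task name to the names it depends on).
/// Returns one cycle as a path such as ["A", "B", "A"], or nil if none exists.
func findCycle(in dependencies: [String: [String]]) -> [String]? {
    var finished = Set<String>()   // fully explored and known to be cycle-free
    var path: [String] = []        // tasks on the current search path

    func visit(_ task: String) -> [String]? {
        // Meeting a task that is already on the current path closes a cycle.
        if let start = path.firstIndex(of: task) {
            return Array(path[start...]) + [task]
        }
        if finished.contains(task) { return nil }
        path.append(task)
        for dependency in dependencies[task] ?? [] {
            if let cycle = visit(dependency) { return cycle }
        }
        path.removeLast()
        finished.insert(task)
        return nil
    }

    for task in dependencies.keys.sorted() {
        if let cycle = visit(task) { return cycle }
    }
    return nil
}

let cyclic = ["A": ["B"], "B": ["C"], "C": ["A"], "D": []]
let acyclic = ["A": [], "B": ["A"], "C": ["A", "B"]]
let cycle = findCycle(in: cyclic)
let noCycle = findCycle(in: acyclic)
```

The returned path includes indirect cycles that pass through several tasks, which are the hardest ones to spot by reading the declarations.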
Symptoms: Flow-based not faster than sequential
Causes:
  • Too many dependencies (tasks can’t parallelize)
  • Low maxConcurrentTasks setting
  • API rate limits
  • Tasks too small (overhead > benefit)
Solutions:
// Increase concurrency
let engine = TaskExecutionEngine(
    agentExecutor: executor,
    maxConcurrentTasks: 10  // Higher limit
)

// Reduce dependencies
// Bad: Each task depends on previous
task2.dependencies = [task1.id]
task3.dependencies = [task2.id]
task4.dependencies = [task3.id]

// Good: Only necessary dependencies
task2.dependencies = [task1.id]
task3.dependencies = [task1.id]  // Parallel with task2
task4.dependencies = [task2.id, task3.id]  // After both

// Batch small tasks
// Instead of 100 tiny tasks, create 10 batches of 10
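Batching itself is simple to do before tasks are created. The helper below splits a list of work items into fixed-size groups so that each group can become one task; `batches(of:size:)` is a hypothetical utility, and how a batch is turned into a task description is left to your workflow.

```swift
/// Splits `items` into consecutive batches of at most `size` elements.
func batches<T>(of items: [T], size: Int) -> [[T]] {
    precondition(size > 0, "size must be positive")
    return stride(from: 0, to: items.count, by: size).map { start in
        Array(items[start..<min(start + size, items.count)])
    }
}

let groups = batches(of: Array(1...100), size: 10)   // 10 batches of 10 items
let uneven = batches(of: [1, 2, 3, 4, 5], size: 2)   // last batch is shorter
```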
Symptoms: Token limit errors with many tasks
Causes:
  • Accumulating context from all previous tasks
  • Large task outputs
  • Many tasks in workflow
Solutions:
// Extract only needed information
let summaryTask = ORTask(
    description: "Extract key points from research",
    expectedOutput: "Bullet list of 5 key findings"
)

// Use structured outputs
struct Summary: Codable, Sendable {
    let keyPoints: [String]  // Compact format
}

// Reference specific outputs, not all
let task = ORTask(
    description: "Use summary: {task_2_output}",
    context: [summaryTask.id]  // Only one task
    // Not: context: [task0, task1, task2, task3, ...]
)

// Enable context window management
let agent = Agent(
    role: "Agent",
    purpose: "Process data",
    context: "...",
    respectContextWindow: true  // Auto-prune
)
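If you manage context yourself, one simple pruning policy is to keep only the most recent outputs that fit a fixed budget. The sketch below uses character count as a crude stand-in for tokens; `pruneContext` is hypothetical, and nothing here describes how `respectContextWindow` prunes internally.

```swift
/// Keeps the most recent outputs whose combined character count fits `budget`,
/// preserving their original order. Character count is only a rough proxy for tokens.
func pruneContext(_ outputs: [String], budget: Int) -> [String] {
    var kept: [String] = []
    var used = 0
    // Walk from newest to oldest and stop at the first output that does not fit.
    for output in outputs.reversed() {
        guard used + output.count <= budget else { break }
        kept.append(output)
        used += output.count
    }
    return Array(kept.reversed())
}

let pruned = pruneContext(["aaaa", "bbb", "cc"], budget: 5)
```

With a budget of 5 characters, the oldest output is dropped and the two most recent ones are kept in their original order.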

Process Comparison

Decision Matrix

Criteria          Sequential         Hierarchical           Flow-Based
Complexity        Simple             Complex                Medium-High
Setup Effort      Low                High                   Medium
Parallelization   None               Manager-driven         Explicit
Agent Assignment  Auto or explicit   Manager decides        Auto or explicit
Debugging         Easy               Moderate               Moderate
Best For          Linear workflows   Dynamic coordination   Pipelines
Performance       Predictable        Variable               High potential
Learning Curve    Low                High                   Medium

When to Use Each

Sequential

Choose when:
  • Tasks have clear linear flow
  • Each step builds on previous
  • Simplicity is important
  • Debugging ease matters
  • Team is learning OrbitAI
Examples:
  • Content creation
  • Report generation
  • Data processing chains

Hierarchical

Choose when:
  • Many specialized agents
  • Dynamic task assignment needed
  • Quality validation required
  • Load balancing important
  • Complex coordination
  • Agent expertise varies greatly
Examples:
  • Software development projects
  • Customer support routing
  • Research coordination

Flow-Based

Choose when:
  • Independent tasks exist
  • Performance is critical
  • Complex dependencies
  • Parallel execution possible
  • Building pipelines
  • Need fine control
Examples:
  • ETL pipelines
  • Build systems
  • Test suites
  • Data aggregation

Next Steps

For additional support, consult the GitHub Discussions or check the Issue Tracker.