Processes are execution orchestration patterns in OrbitAI that determine how tasks are assigned to agents and in what order they execute. The process type fundamentally shapes how work flows through your agent system, from simple sequential chains to complex hierarchical coordination.
// ETL with parallel validationlet pipeline = TaskFlow( tasks: [ extractTask, validateSchemaTask, // Parallel validateDataTask, // Parallel transformTask, // After validations loadTask // After transform ])
✅ Build Systems
// Parallel compilation and testinglet buildFlow = TaskFlow( tasks: [ compileModule1, // Parallel compileModule2, // Parallel compileModule3, // Parallel linkTask, // After all compile testTask // After link ])
✅ Multi-Source Aggregation
// Fetch from multiple sources in parallellet aggregation = TaskFlow( tasks: [ fetchAPI1, // Parallel fetchAPI2, // Parallel fetchAPI3, // Parallel mergeTask, // After all fetches analyzeTask // After merge ])
✅ Test Suites
// Run independent tests in parallellet testSuite = TaskFlow( tasks: [ unitTests, // Parallel integrationTests, // Parallel e2eTests, // Parallel reportTask // After all tests ], stopOnFailure: false // Run all tests even if some fail)
import OrbitAI// Define agentslet dataEngineer = Agent( role: "Data Engineer", purpose: "Handle data ingestion and transformation", context: "Expert in ETL processes", tools: ["data_loader", "data_transformer"])let validator = Agent( role: "Data Validator", purpose: "Validate data quality and schema", context: "Expert in data quality assurance", tools: ["schema_validator", "quality_checker"])let analyst = Agent( role: "Data Analyst", purpose: "Analyze processed data", context: "Expert in data analysis", tools: ["statistical_analyzer", "visualizer"])// Task 1: Ingest data (no dependencies)let ingestTask = ORTask( description: "Ingest data from source systems", expectedOutput: "Raw data loaded", agent: dataEngineer.id)// Tasks 2-3: Validate in parallel (depend on ingestion)let schemaValidation = ORTask( description: "Validate data schema compliance", expectedOutput: "Schema validation report", agent: validator.id, dependencies: [ingestTask.id])let qualityValidation = ORTask( description: "Validate data quality metrics", expectedOutput: "Quality validation report", agent: validator.id, dependencies: [ingestTask.id])// Task 4: Transform (depends on both validations)let transformTask = ORTask( description: """ Transform data for analysis. Schema check: {task_1_output} Quality check: {task_2_output} """, expectedOutput: "Transformed data", agent: dataEngineer.id, dependencies: [schemaValidation.id, qualityValidation.id], context: [schemaValidation.id, qualityValidation.id])// Task 5: Analyze (depends on transform)let analyzeTask = ORTask( description: """ Analyze transformed data. Data: {task_3_output} """, expectedOutput: "Analysis report", agent: analyst.id, dependencies: [transformTask.id], context: [transformTask.id])// Task 6: Generate report (depends on analysis)let reportTask = ORTask( description: """ Generate final report. Analysis: {task_4_output} """, expectedOutput: "Executive report", agent: analyst.id, dependencies: [analyzeTask.id], context: [analyzeTask.id])// Create task flowlet taskFlow = TaskFlow( name: "Data Processing Pipeline", tasks: [ ingestTask, schemaValidation, qualityValidation, transformTask, analyzeTask, reportTask ], stopOnFailure: true // Stop if any task fails)// Execute flowlet taskEngine = TaskExecutionEngine( agentExecutor: agentExecutor, maxConcurrentTasks: 3 // Allow parallel execution)let result = try await taskEngine.executeTaskFlow( taskFlow: taskFlow, agents: [dataEngineer, validator, analyst], context: TaskExecutionContext())print("Pipeline completed!")print("Parallel tasks executed: schemaValidation + qualityValidation")
In this example, schemaValidation and qualityValidation execute in parallel since they both only depend on ingestTask and don’t depend on each other.
// Explicit dependencieslet taskA = ORTask( description: "Task A", expectedOutput: "A", dependencies: [] // No dependencies)let taskB = ORTask( description: "Task B", expectedOutput: "B", dependencies: [taskA.id] // Depends on A)let taskC = ORTask( description: "Task C", expectedOutput: "C", dependencies: [taskA.id] // Also depends on A (parallel with B))let taskD = ORTask( description: "Task D", expectedOutput: "D", dependencies: [taskB.id, taskC.id] // Depends on both B and C)// Execution order:// 1. A executes// 2. B and C execute in parallel// 3. D executes after both B and C complete
Parallelization:
// Control max concurrent taskslet taskEngine = TaskExecutionEngine( agentExecutor: executor, maxConcurrentTasks: 5 // Up to 5 tasks in parallel)
All process types support context variable interpolation:
Task Output References
Reference previous task outputs in descriptions:
let task1 = ORTask( description: "Research AI trends", expectedOutput: "Research report")let task2 = ORTask( description: """ Write article based on research. Research data: {task_0_output} """, expectedOutput: "Article draft", context: [task1.id])
Variable Format:
{task_0_output} - First task’s output
{task_1_output} - Second task’s output
{task_N_output} - Nth task’s output
Task indices are 0-based, matching array indexing in Swift.
Orbit Inputs
Use orbit-level inputs in any task:
let task = ORTask( description: """ Analyze data for {industry} sector. Focus on {metric} metrics. Time period: {period} """, expectedOutput: "Analysis report")// Provide inputs at execution timelet inputs = OrbitInput(Metadata([ "industry": .string("healthcare"), "metric": .string("efficiency"), "period": .string("Q4 2024")]))let result = try await orbit.start(inputs: inputs)
Agent Context
Tasks automatically receive agent context:
// Agent context is available in system messagelet agent = Agent( role: "Financial Analyst", purpose: "Analyze financial data", context: """ Expert in: - Financial modeling - Risk assessment - Market analysis """)// Tasks executed by this agent have access to this context// through the system message
// Bad: Sequential when tasks are independentlet sequential = [ fetchAPI1Task, // Could run in parallel fetchAPI2Task, // Could run in parallel fetchAPI3Task, // Could run in parallel mergeTask]// Good: Use flow-based for parallelizationlet flow = TaskFlow(tasks: [ fetchAPI1Task, // Parallel fetchAPI2Task, // Parallel fetchAPI3Task, // Parallel mergeTask // Waits for all fetches])// Configure concurrent executionlet engine = TaskExecutionEngine( agentExecutor: executor, maxConcurrentTasks: 3 // Run fetches in parallel)
Agent Reuse
Reuse agents efficiently:
// Create agent pool for common taskslet workerAgents = (1...5).map { i in Agent( role: "Data Processor \(i)", purpose: "Process data efficiently", context: "Generic data processing agent", tools: ["data_processor"] )}// Hierarchical process will distribute worklet orbit = Orbit.create( agents: workerAgents, tasks: manyTasks, process: .hierarchical, manager: coordinator)
Concurrency Limits
Set appropriate concurrency limits:
// Balance parallelism with resource limitslet engine = TaskExecutionEngine( agentExecutor: executor, maxConcurrentTasks: 5, // Don't overwhelm system enableVerboseLogging: false // Reduce overhead)// Consider:// - API rate limits (50-100 requests/min)// - Memory constraints// - CPU availability// - LLM provider limits
Early Failure Detection
Configure appropriate failure handling:
// Critical path: stop on failurelet criticalFlow = TaskFlow( tasks: criticalTasks, stopOnFailure: true)// Best effort: continue on failurelet bestEffortFlow = TaskFlow( tasks: optionalTasks, stopOnFailure: false // Collect all results)// Test suites: run all testslet testFlow = TaskFlow( tasks: testTasks, stopOnFailure: false // Run all tests)
do { let result = try await orbit.start() // Process successful results for (index, output) in result.taskOutputs.enumerated() { print("Task \(index): \(output.rawOutput)") }} catch let error as OrbitAIError { // Handle specific errors switch error { case .taskExecutionFailed(let message): print("Task failed: \(message)") // Check which task failed let failedTasks = orbit.tasks.filter { $0.status == .failed } for task in failedTasks { print("Failed task: \(task.description)") } case .llmRateLimitExceeded: print("Rate limit hit - implement backoff") case .agentNotFound: print("Agent assignment issue") default: print("Other error: \(error)") }}
do { let result = try await orbit.start() // Manager may have retried tasks print("Manager's execution log:") for output in result.taskOutputs { print("Task: \(output.taskId)") print("Agent: \(output.agentId)") print("Attempts: \(output.usageMetrics.totalRequests)") }} catch { // Manager failed to complete workflow print("Manager could not complete workflow: \(error)") // Check manager's reasoning if let managerOutput = orbit.manager?.currentTask?.result?.output { print("Manager's last decision: \(managerOutput.rawOutput)") }}
do { let result = try await taskEngine.executeTaskFlow( taskFlow: taskFlow, agents: agents, context: context ) // Check which tasks succeeded let successful = result.filter { $0.isSuccess } let failed = result.filter { !$0.isSuccess } print("Successful: \(successful.count)") print("Failed: \(failed.count)") // Process partial results if stopOnFailure: false for taskResult in successful { if let output = taskResult.output { print("Completed: \(output.taskId)") } }} catch let error as OrbitAIError { if case .taskExecutionFailed(let message) = error { if message.contains("Circular dependency") { print("Fix task dependencies!") } }}
Symptoms: Manager doesn’t assign tasks to workersCauses:
Manager missing allowDelegation: true
Manager lacks delegation tools
Poor manager context/prompting
Solutions:
// Enable delegationlet manager = Agent( role: "Project Manager", purpose: "Delegate and coordinate tasks", context: """ You are a project manager coordinating a team. Your responsibilities: 1. Analyze each task's requirements 2. Select the best agent for each task 3. Assign tasks to appropriate team members 4. Monitor progress and quality Available team members and their expertise: [List agents and their capabilities] """, allowDelegation: true, // Required! tools: [ "task_analyzer", "agent_evaluator", "task_delegator" ])// Provide good worker contextlet worker = Agent( role: "Backend Developer", purpose: "Develop backend services", context: "Expert in Swift, APIs, databases", tools: ["code_generator", "api_designer"])