Crowe Concurrent Execution API

Concurrent Agents API Reference

This API enables running multiple Crowe agents concurrently using optimized execution strategies. It leverages asyncio (enhanced by uvloop) for high-performance I/O and ThreadPoolExecutor for CPU-bound workloads.


Contents

  • Core Functions

  • Advanced Functions

  • Resource Monitoring

  • Usage Examples

  • Performance & Error Handling


Core Functions

run_agents_concurrently()

Runs multiple agents in parallel with automatic batching and performance optimization. Uses uvloop for async event loops and ThreadPoolExecutor for CPU-heavy tasks.

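| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| agents | List[Agent] | Yes | - | Agents to execute |
| task | str | Yes | - | Task string to run |
| batch_size | int | No | CPU count | Agents per batch |
| max_workers | int | No | CPU count × 2 | Thread pool size |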

Returns: List[Any] – Outputs from all agents in execution order.
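
The batching behaviour described above can be sketched roughly as follows. This is not Crowe's implementation: `EchoAgent` and `run_batched` are hypothetical names, and the sketch assumes each agent exposes a `run(task)` method, which this reference does not confirm. The defaults mirror the table (CPU count, and CPU count × 2).

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List, Optional


class EchoAgent:
    """Hypothetical stand-in for an agent; not part of Crowe."""

    def __init__(self, name: str) -> None:
        self.name = name

    def run(self, task: str) -> str:
        return f"{self.name}: {task}"


def run_batched(
    agents: List[Any],
    task: str,
    batch_size: Optional[int] = None,
    max_workers: Optional[int] = None,
) -> List[Any]:
    """Run agents batch by batch on a thread pool, preserving input order."""
    cpu_count = os.cpu_count() or 1
    batch_size = batch_size or cpu_count        # documented default: CPU count
    max_workers = max_workers or cpu_count * 2  # documented default: CPU count x 2
    results: List[Any] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for start in range(0, len(agents), batch_size):
            batch = agents[start:start + batch_size]
            # pool.map yields results in submission order, not completion order.
            results.extend(pool.map(lambda agent: agent.run(task), batch))
    return results


agents = [EchoAgent(f"agent-{i}") for i in range(5)]
outputs = run_batched(agents, "ping", batch_size=2)
```

In this sketch a batch must finish before the next one starts, so one slow agent delays the rest of the run; whether Crowe behaves the same way is not stated here.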


run_agents_sequentially()

Executes agents one at a time. Useful as a performance baseline or for debugging.

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| agents | List[Agent] | Yes | - | Agents to run |
| task | str | Yes | - | Task string |

Returns: List[Any] – Outputs in sequential execution order.


Advanced Functions

run_agents_with_different_tasks()

Executes multiple (agent, task) pairs concurrently.

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| agent_task_pairs | List[Tuple[Agent, str]] | Yes | - | (Agent, Task) tuples |
| batch_size | int | No | CPU count | Batch size |
| max_workers | int | No | CPU count × 2 | Thread pool size |
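
As an illustration of pairing each agent with its own task, the sketch below submits every (agent, task) pair to a thread pool and collects results in input order. `UpperAgent`, `ReverseAgent`, and `run_pairs` are hypothetical, and the `run(task)` method on agents is an assumption rather than something this reference documents.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List, Tuple


class UpperAgent:
    """Hypothetical stand-in agent that upper-cases its task."""

    def run(self, task: str) -> str:
        return task.upper()


class ReverseAgent:
    """Hypothetical stand-in agent that reverses its task."""

    def run(self, task: str) -> str:
        return task[::-1]


def run_pairs(pairs: List[Tuple[Any, str]], max_workers: int = 4) -> List[Any]:
    """Run each agent on its own task; results follow the order of `pairs`."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(agent.run, task) for agent, task in pairs]
        # Reading the futures in submission order keeps outputs aligned with inputs.
        return [future.result() for future in futures]


pair_outputs = run_pairs([(UpperAgent(), "tech"), (ReverseAgent(), "energy")])
```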


run_agents_with_timeout()

Runs agents concurrently and enforces a time limit on each agent's execution.

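| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| agents | List[Agent] | Yes | - | Agents to run |
| task | str | Yes | - | Task string |
| timeout | float | Yes | - | Timeout in seconds |
| batch_size | int | No | CPU count | Batch size |
| max_workers | int | No | CPU count × 2 | Thread pool size |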
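
One way a per-agent time limit could work is sketched below with `concurrent.futures`. The names `SleepyAgent` and `run_with_timeout` are hypothetical, and returning `None` for an agent that times out is an assumption; this reference does not say what Crowe returns in that case.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Any, List, Optional


class SleepyAgent:
    """Hypothetical stand-in agent that sleeps before answering."""

    def __init__(self, delay: float) -> None:
        self.delay = delay

    def run(self, task: str) -> str:
        time.sleep(self.delay)
        return f"done after {self.delay}s"


def run_with_timeout(agents: List[Any], task: str, timeout: float) -> List[Optional[Any]]:
    """Give every agent `timeout` seconds; timed-out slots hold None (assumption)."""
    # One worker per agent so all agents start at roughly the same moment.
    pool = ThreadPoolExecutor(max_workers=max(1, len(agents)))
    started = time.monotonic()
    futures = [pool.submit(agent.run, task) for agent in agents]
    results: List[Optional[Any]] = []
    for future in futures:
        remaining = max(0.0, timeout - (time.monotonic() - started))
        try:
            results.append(future.result(timeout=remaining))
        except FutureTimeout:
            results.append(None)
    # Do not block on stragglers: Python threads cannot be killed, so a
    # timed-out agent keeps running in the background until it finishes.
    pool.shutdown(wait=False)
    return results


timed_outputs = run_with_timeout([SleepyAgent(0.01), SleepyAgent(0.5)], "ping", timeout=0.2)
```

The timeout here bounds how long the caller waits, not how long the agent's thread runs, which is a general limitation of thread-based timeouts.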


Resource Monitoring

ResourceMetrics (dataclass)

| Property | Type | Description |
|---|---|---|
| cpu_percent | float | Current CPU usage |
| memory_percent | float | Current RAM usage |
| active_threads | int | Number of active threads |
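
A dataclass with these three fields might be populated as in the sketch below. `sample_metrics` is a hypothetical helper, and using psutil as the data source is an assumption; this reference does not say how Crowe collects the values. When psutil is not installed, the sketch falls back to placeholder zeros.

```python
import threading
from dataclasses import dataclass

try:
    import psutil  # third-party; optional in this sketch
except ImportError:
    psutil = None


@dataclass
class ResourceMetrics:
    cpu_percent: float
    memory_percent: float
    active_threads: int


def sample_metrics() -> ResourceMetrics:
    """Take one snapshot of CPU, memory, and thread usage (hypothetical helper)."""
    if psutil is not None:
        cpu = float(psutil.cpu_percent(interval=None))
        memory = float(psutil.virtual_memory().percent)
    else:
        # Placeholder values, not real measurements, when psutil is unavailable.
        cpu, memory = 0.0, 0.0
    return ResourceMetrics(cpu, memory, threading.active_count())


snapshot = sample_metrics()
```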


run_agents_with_resource_monitoring()

Executes agents while tracking CPU/memory usage and dynamically adjusting batch sizes.

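| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| agents | List[Agent] | Yes | - | Agents to run |
| task | str | Yes | - | Task string |
| cpu_threshold | float | No | 90.0 | Maximum CPU usage (%) |
| memory_threshold | float | No | 90.0 | Maximum RAM usage (%) |
| check_interval | float | No | 1.0 | Monitoring interval (seconds) |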
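
The threshold parameters suggest a policy along the lines of the sketch below, which shrinks the batch when either limit is exceeded. The halving rule and the `adjust_batch_size` helper are assumptions made for illustration; this reference does not describe the actual adjustment strategy.

```python
from dataclasses import dataclass


@dataclass
class ResourceMetrics:
    cpu_percent: float
    memory_percent: float
    active_threads: int


def adjust_batch_size(
    current: int,
    metrics: ResourceMetrics,
    cpu_threshold: float = 90.0,
    memory_threshold: float = 90.0,
) -> int:
    """Halve the batch size when a threshold is exceeded (assumed policy)."""
    overloaded = (
        metrics.cpu_percent > cpu_threshold
        or metrics.memory_percent > memory_threshold
    )
    if overloaded:
        return max(1, current // 2)  # never drop below one agent per batch
    return current


busy = ResourceMetrics(cpu_percent=95.0, memory_percent=40.0, active_threads=8)
calm = ResourceMetrics(cpu_percent=50.0, memory_percent=50.0, active_threads=8)
reduced = adjust_batch_size(8, busy)
unchanged = adjust_batch_size(8, calm)
```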


Usage Examples

from crowe.structs.agent import Agent
from crowe.structs.multi_agent_exec import (
    run_agents_concurrently,
    run_agents_with_timeout,
    run_agents_with_different_tasks
)

# Initialize agents
agents = [
    Agent(
        agent_name=f"Analysis-Agent-{i}",
        system_prompt="You are a financial analysis expert",
        model_name="gpt-4o-mini",
        max_loops=1
    )
    for i in range(5)
]

# 1️⃣ Run concurrently
task = "Analyze the impact of rising interest rates on tech stocks"
outputs = run_agents_concurrently(agents, task)

# 2️⃣ Run with timeout
outputs_with_timeout = run_agents_with_timeout(
    agents=agents,
    task=task,
    timeout=30.0,
    batch_size=2
)

# 3️⃣ Run with different tasks
task_pairs = [
    (agents[0], "Analyze tech stocks"),
    (agents[1], "Analyze energy stocks"),
    (agents[2], "Analyze retail stocks")
]
different_outputs = run_agents_with_different_tasks(task_pairs)

Performance Tips

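  • Batch Size: Keep batch_size at or below the number of CPU cores (the default) for best efficiency.

  • Resource Monitoring: Use run_agents_with_resource_monitoring() to prevent overload; it adjusts batch sizes dynamically when CPU or memory usage crosses the configured thresholds.

  • uvloop: Provides a faster event loop than the default asyncio implementation.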
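
The combination of uvloop and a thread pool mentioned in these tips can be sketched as follows. This is an illustration, not Crowe's code: `EchoAgent`, `gather_agents`, and `run_on_fast_loop` are hypothetical names, and the sketch falls back to the standard asyncio loop when uvloop is not installed.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List

try:
    import uvloop  # third-party; optional in this sketch
    new_loop = uvloop.new_event_loop
except ImportError:
    new_loop = asyncio.new_event_loop


class EchoAgent:
    """Hypothetical stand-in for an agent with a blocking run() method."""

    def __init__(self, name: str) -> None:
        self.name = name

    def run(self, task: str) -> str:
        return f"{self.name}: {task}"


async def gather_agents(agents: List[Any], task: str, pool: ThreadPoolExecutor) -> List[Any]:
    loop = asyncio.get_running_loop()
    # Each blocking run() call is handed to the thread pool; gather keeps input order.
    calls = [loop.run_in_executor(pool, agent.run, task) for agent in agents]
    return await asyncio.gather(*calls)


def run_on_fast_loop(agents: List[Any], task: str, max_workers: int = 4) -> List[Any]:
    loop = new_loop()
    try:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            return loop.run_until_complete(gather_agents(agents, task, pool))
    finally:
        loop.close()


loop_outputs = run_on_fast_loop([EchoAgent("a"), EchoAgent("b")], "hi")
```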


Error Handling

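  • Event loop management is automatic: the library makes its asyncio.get_event_loop() calls safely, so callers do not manage a loop themselves.

  • Timeout enforcement in run_agents_with_timeout() prevents a slow agent from stalling the run.

  • Resource monitoring keeps execution within the configured CPU and memory thresholds as the number of agents grows.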
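
Safe event loop handling could look something like the sketch below, which runs a coroutine whether or not the caller is already inside a running loop. `run_coroutine_safely` is a hypothetical helper written for illustration; it is not taken from Crowe, and the library may handle this case differently.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Coroutine


def run_coroutine_safely(coro_factory: Callable[[], Coroutine[Any, Any, Any]]) -> Any:
    """Run a coroutine to completion from sync code or from inside a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run can own one.
        return asyncio.run(coro_factory())
    # A loop is already running here; asyncio.run would raise, so use a
    # fresh loop in a worker thread. This blocks the caller until it is done.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro_factory()).result()


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


async def caller_inside_loop() -> int:
    return run_coroutine_safely(lambda: add(2, 3))


direct = run_coroutine_safely(lambda: add(1, 2))
nested = asyncio.run(caller_inside_loop())
```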
