Crowe Concurrent Execution API
Concurrent Agents API Reference
This API enables running multiple Crowe agents concurrently using optimized execution strategies. It leverages asyncio (enhanced by uvloop
) for high-performance I/O and ThreadPoolExecutor for CPU-bound workloads.
Contents
Core Functions
Advanced Functions
Resource Monitoring
Usage Examples
Performance & Error Handling
Core Functions
run_agents_concurrently()
run_agents_concurrently()
Runs multiple agents in parallel with automatic batching and performance optimization.
Uses uvloop
for async event loops and ThreadPoolExecutor
for CPU-heavy tasks.
agents
List[Agent]
✅
—
Agents to execute
task
str
✅
—
Task string to run
batch_size
int
❌
CPU count
Agents per batch
max_workers
int
❌
CPU count × 2
Thread pool size
Returns:
List[Any]
– Outputs from all agents in execution order.
run_agents_sequentially()
run_agents_sequentially()
Executes agents one by one (baseline or debugging).
agents
List[Agent]
✅
—
Agents to run
task
str
✅
—
Task string
Returns:
List[Any]
– Outputs in sequential execution order.
Advanced Functions
run_agents_with_different_tasks()
run_agents_with_different_tasks()
Executes multiple (agent, task)
pairs concurrently.
agent_task_pairs
List[Tuple[Agent, str]]
✅
—
(Agent, Task) tuples
batch_size
int
❌
CPU count
Batch size
max_workers
int
❌
CPU count × 2
Thread pool size
run_agents_with_timeout()
run_agents_with_timeout()
Adds per-agent execution time limits.
agents
List[Agent]
✅
—
Agents to run
task
str
✅
—
Task string
timeout
float
✅
—
Timeout in seconds
batch_size
int
❌
CPU count
Batch size
max_workers
int
❌
CPU count × 2
Thread pool size
Resource Monitoring
ResourceMetrics
(dataclass)
ResourceMetrics
(dataclass)cpu_percent
float
Current CPU usage
memory_percent
float
Current RAM usage
active_threads
int
Number of active threads
run_agents_with_resource_monitoring()
run_agents_with_resource_monitoring()
Executes agents while tracking CPU/memory usage and dynamically adjusting batch sizes.
agents
List[Agent]
✅
—
Agents to run
task
str
✅
—
Task string
cpu_threshold
float
❌
90.0
Max CPU %
memory_threshold
float
❌
90.0
Max RAM %
check_interval
float
❌
1.0
Monitor interval (s)
Usage Examples
from crowe.structs.agent import Agent
from crowe.structs.multi_agent_exec import (
run_agents_concurrently,
run_agents_with_timeout,
run_agents_with_different_tasks
)
# Initialize agents
agents = [
Agent(
agent_name=f"Analysis-Agent-{i}",
system_prompt="You are a financial analysis expert",
model_name="gpt-4o-mini",
max_loops=1
)
for i in range(5)
]
# 1️⃣ Run concurrently
task = "Analyze the impact of rising interest rates on tech stocks"
outputs = run_agents_concurrently(agents, task)
# 2️⃣ Run with timeout
outputs_with_timeout = run_agents_with_timeout(
agents=agents,
task=task,
timeout=30.0,
batch_size=2
)
# 3️⃣ Run with different tasks
task_pairs = [
(agents[0], "Analyze tech stocks"),
(agents[1], "Analyze energy stocks"),
(agents[2], "Analyze retail stocks")
]
different_outputs = run_agents_with_different_tasks(task_pairs)
Performance Tips
Batch Size: Keep within CPU core count for best efficiency.
Resource Monitoring: Prevents overload by dynamically reducing concurrency.
uvloop: Speeds up async execution vs standard asyncio.
Error Handling
Automatic event loop management (
asyncio.get_event_loop()
safe calls)Timeout enforcement prevents stalls
Resource monitoring ensures safe scaling
Last updated