test_abatch_as_completed_concurrency() — langchain Function Reference
Architecture documentation for the test_abatch_as_completed_concurrency() function in test_concurrency.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD 354deeff_067f_9721_676d_8e6d093610bc["test_abatch_as_completed_concurrency()"] 4a52821c_3a01_e8ab_b3f9_fe358ca00824["test_concurrency.py"] 354deeff_067f_9721_676d_8e6d093610bc -->|defined in| 4a52821c_3a01_e8ab_b3f9_fe358ca00824 style 354deeff_067f_9721_676d_8e6d093610bc fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/tests/unit_tests/runnables/test_concurrency.py lines 45–76
async def test_abatch_as_completed_concurrency() -> None:
"""Test that abatch_as_completed respects max_concurrency."""
running_tasks = 0
max_running_tasks = 0
lock = asyncio.Lock()
async def tracked_function(x: Any) -> str:
nonlocal running_tasks, max_running_tasks
async with lock:
running_tasks += 1
max_running_tasks = max(max_running_tasks, running_tasks)
await asyncio.sleep(0.1) # Simulate work
async with lock:
running_tasks -= 1
return f"Completed {x}"
runnable = RunnableLambda(tracked_function)
num_tasks = 10
max_concurrency = 3
config = RunnableConfig(max_concurrency=max_concurrency)
results = []
async for _idx, result in runnable.abatch_as_completed(
list(range(num_tasks)), config=config
):
results.append(result)
assert len(results) == num_tasks
assert max_running_tasks <= max_concurrency
Domain
Subdomains
Source
Frequently Asked Questions
What does test_abatch_as_completed_concurrency() do?
test_abatch_as_completed_concurrency() is a function in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_concurrency.py.
Where is test_abatch_as_completed_concurrency() defined?
test_abatch_as_completed_concurrency() is defined in libs/core/tests/unit_tests/runnables/test_concurrency.py at line 45.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free