test_batch_as_completed_concurrency() — langchain Function Reference
Architecture documentation for the test_batch_as_completed_concurrency() function in test_concurrency.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD f2bc5cc0_8a23_d75a_9955_d96952677d42["test_batch_as_completed_concurrency()"] 4a52821c_3a01_e8ab_b3f9_fe358ca00824["test_concurrency.py"] f2bc5cc0_8a23_d75a_9955_d96952677d42 -->|defined in| 4a52821c_3a01_e8ab_b3f9_fe358ca00824 style f2bc5cc0_8a23_d75a_9955_d96952677d42 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/tests/unit_tests/runnables/test_concurrency.py lines 110–142
def test_batch_as_completed_concurrency() -> None:
"""Test that batch_as_completed respects max_concurrency."""
running_tasks = 0
max_running_tasks = 0
lock = Lock()
def tracked_function(x: Any) -> str:
nonlocal running_tasks, max_running_tasks
with lock:
running_tasks += 1
max_running_tasks = max(max_running_tasks, running_tasks)
time.sleep(0.1) # Simulate work
with lock:
running_tasks -= 1
return f"Completed {x}"
runnable = RunnableLambda(tracked_function)
num_tasks = 10
max_concurrency = 3
config = RunnableConfig(max_concurrency=max_concurrency)
results = []
for _idx, result in runnable.batch_as_completed(
list(range(num_tasks)), config=config
):
results.append(result)
assert len(results) == num_tasks
assert max_running_tasks <= max_concurrency
Domain
Subdomains
Source
Frequently Asked Questions
What does test_batch_as_completed_concurrency() do?
test_batch_as_completed_concurrency() is a function in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_concurrency.py.
Where is test_batch_as_completed_concurrency() defined?
test_batch_as_completed_concurrency() is defined in libs/core/tests/unit_tests/runnables/test_concurrency.py at line 110.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free