Home / Function/ test_abatch_as_completed_concurrency() — langchain Function Reference

test_abatch_as_completed_concurrency() — langchain Function Reference

Architecture documentation for the test_abatch_as_completed_concurrency() function in test_concurrency.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  354deeff_067f_9721_676d_8e6d093610bc["test_abatch_as_completed_concurrency()"]
  4a52821c_3a01_e8ab_b3f9_fe358ca00824["test_concurrency.py"]
  354deeff_067f_9721_676d_8e6d093610bc -->|defined in| 4a52821c_3a01_e8ab_b3f9_fe358ca00824
  style 354deeff_067f_9721_676d_8e6d093610bc fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/core/tests/unit_tests/runnables/test_concurrency.py lines 45–76

async def test_abatch_as_completed_concurrency() -> None:
    """Test that abatch_as_completed respects max_concurrency."""
    running_tasks = 0
    max_running_tasks = 0
    lock = asyncio.Lock()

    async def tracked_function(x: Any) -> str:
        nonlocal running_tasks, max_running_tasks
        async with lock:
            running_tasks += 1
            max_running_tasks = max(max_running_tasks, running_tasks)

        await asyncio.sleep(0.1)  # Simulate work

        async with lock:
            running_tasks -= 1

        return f"Completed {x}"

    runnable = RunnableLambda(tracked_function)
    num_tasks = 10
    max_concurrency = 3

    config = RunnableConfig(max_concurrency=max_concurrency)
    results = []
    async for _idx, result in runnable.abatch_as_completed(
        list(range(num_tasks)), config=config
    ):
        results.append(result)

    assert len(results) == num_tasks
    assert max_running_tasks <= max_concurrency

Domain

Subdomains

Frequently Asked Questions

What does test_abatch_as_completed_concurrency() do?
test_abatch_as_completed_concurrency() is a function in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_concurrency.py.
Where is test_abatch_as_completed_concurrency() defined?
test_abatch_as_completed_concurrency() is defined in libs/core/tests/unit_tests/runnables/test_concurrency.py at line 45.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free