Home / File/ test_concurrency.py — langchain Source File

test_concurrency.py — langchain Source File

Architecture documentation for test_concurrency.py, a python file in the langchain codebase. 6 imports, 0 dependents.

File python CoreAbstractions Serialization 6 imports 4 functions

Entity Profile

Dependency Diagram

graph LR
  cdca1c1e_cae3_568d_eb04_9cb589446459["test_concurrency.py"]
  a327e534_84f6_5308_58ca_5727d5eda0cf["asyncio"]
  cdca1c1e_cae3_568d_eb04_9cb589446459 --> a327e534_84f6_5308_58ca_5727d5eda0cf
  0c1d9a1b_c553_0388_dbc1_58af49567aa2["time"]
  cdca1c1e_cae3_568d_eb04_9cb589446459 --> 0c1d9a1b_c553_0388_dbc1_58af49567aa2
  242d0b7d_a8ef_b66d_169b_c791b32a9cc9["threading"]
  cdca1c1e_cae3_568d_eb04_9cb589446459 --> 242d0b7d_a8ef_b66d_169b_c791b32a9cc9
  8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3["typing"]
  cdca1c1e_cae3_568d_eb04_9cb589446459 --> 8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3
  120e2591_3e15_b895_72b6_cb26195e40a6["pytest"]
  cdca1c1e_cae3_568d_eb04_9cb589446459 --> 120e2591_3e15_b895_72b6_cb26195e40a6
  2ceb1686_0f8c_8ae0_36d1_7c0b702fda1c["langchain_core.runnables"]
  cdca1c1e_cae3_568d_eb04_9cb589446459 --> 2ceb1686_0f8c_8ae0_36d1_7c0b702fda1c
  style cdca1c1e_cae3_568d_eb04_9cb589446459 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

"""Test concurrency behavior of batch and async batch operations."""

import asyncio
import time
from threading import Lock
from typing import Any

import pytest

from langchain_core.runnables import RunnableConfig, RunnableLambda


@pytest.mark.asyncio
async def test_abatch_concurrency() -> None:
    """Test that abatch respects max_concurrency."""
    running_tasks = 0
    max_running_tasks = 0
    lock = asyncio.Lock()

    async def tracked_function(x: Any) -> str:
        nonlocal running_tasks, max_running_tasks
        async with lock:
            running_tasks += 1
            max_running_tasks = max(max_running_tasks, running_tasks)

        await asyncio.sleep(0.1)  # Simulate work

        async with lock:
            running_tasks -= 1

        return f"Completed {x}"

    runnable = RunnableLambda(tracked_function)
    num_tasks = 10
    max_concurrency = 3

    config = RunnableConfig(max_concurrency=max_concurrency)
    results = await runnable.abatch(list(range(num_tasks)), config=config)

    assert len(results) == num_tasks
    assert max_running_tasks <= max_concurrency


@pytest.mark.asyncio
async def test_abatch_as_completed_concurrency() -> None:
    """Test that abatch_as_completed respects max_concurrency."""
    running_tasks = 0
    max_running_tasks = 0
    lock = asyncio.Lock()

    async def tracked_function(x: Any) -> str:
        nonlocal running_tasks, max_running_tasks
        async with lock:
            running_tasks += 1
            max_running_tasks = max(max_running_tasks, running_tasks)

        await asyncio.sleep(0.1)  # Simulate work

        async with lock:
            running_tasks -= 1
// ... (83 more lines)

Subdomains

Dependencies

  • asyncio
  • langchain_core.runnables
  • pytest
  • threading
  • time
  • typing

Frequently Asked Questions

What does test_concurrency.py do?
test_concurrency.py is a source file in the langchain codebase, written in python. It belongs to the CoreAbstractions domain, Serialization subdomain.
What functions are defined in test_concurrency.py?
test_concurrency.py defines 4 function(s): test_abatch_as_completed_concurrency, test_abatch_concurrency, test_batch_as_completed_concurrency, test_batch_concurrency.
What does test_concurrency.py depend on?
test_concurrency.py imports 6 module(s): asyncio, langchain_core.runnables, pytest, threading, time, typing.
Where is test_concurrency.py in the architecture?
test_concurrency.py is located at libs/core/tests/unit_tests/runnables/test_concurrency.py (domain: CoreAbstractions, subdomain: Serialization, directory: libs/core/tests/unit_tests/runnables).

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free