test_concurrency.py — langchain Source File
Architecture documentation for test_concurrency.py, a Python test file in the langchain codebase. 6 imports, 0 dependents.
Dependency Diagram
graph LR
    cdca1c1e_cae3_568d_eb04_9cb589446459["test_concurrency.py"]
    a327e534_84f6_5308_58ca_5727d5eda0cf["asyncio"]
    cdca1c1e_cae3_568d_eb04_9cb589446459 --> a327e534_84f6_5308_58ca_5727d5eda0cf
    0c1d9a1b_c553_0388_dbc1_58af49567aa2["time"]
    cdca1c1e_cae3_568d_eb04_9cb589446459 --> 0c1d9a1b_c553_0388_dbc1_58af49567aa2
    242d0b7d_a8ef_b66d_169b_c791b32a9cc9["threading"]
    cdca1c1e_cae3_568d_eb04_9cb589446459 --> 242d0b7d_a8ef_b66d_169b_c791b32a9cc9
    8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3["typing"]
    cdca1c1e_cae3_568d_eb04_9cb589446459 --> 8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3
    120e2591_3e15_b895_72b6_cb26195e40a6["pytest"]
    cdca1c1e_cae3_568d_eb04_9cb589446459 --> 120e2591_3e15_b895_72b6_cb26195e40a6
    2ceb1686_0f8c_8ae0_36d1_7c0b702fda1c["langchain_core.runnables"]
    cdca1c1e_cae3_568d_eb04_9cb589446459 --> 2ceb1686_0f8c_8ae0_36d1_7c0b702fda1c
    style cdca1c1e_cae3_568d_eb04_9cb589446459 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
"""Test concurrency behavior of batch and async batch operations."""
import asyncio
import time
from threading import Lock
from typing import Any

import pytest

from langchain_core.runnables import RunnableConfig, RunnableLambda


@pytest.mark.asyncio
async def test_abatch_concurrency() -> None:
    """Test that abatch respects max_concurrency."""
    running_tasks = 0
    max_running_tasks = 0
    lock = asyncio.Lock()

    async def tracked_function(x: Any) -> str:
        nonlocal running_tasks, max_running_tasks
        async with lock:
            running_tasks += 1
            max_running_tasks = max(max_running_tasks, running_tasks)

        await asyncio.sleep(0.1)  # Simulate work

        async with lock:
            running_tasks -= 1

        return f"Completed {x}"

    runnable = RunnableLambda(tracked_function)
    num_tasks = 10
    max_concurrency = 3

    config = RunnableConfig(max_concurrency=max_concurrency)
    results = await runnable.abatch(list(range(num_tasks)), config=config)

    assert len(results) == num_tasks
    assert max_running_tasks <= max_concurrency


@pytest.mark.asyncio
async def test_abatch_as_completed_concurrency() -> None:
    """Test that abatch_as_completed respects max_concurrency."""
    running_tasks = 0
    max_running_tasks = 0
    lock = asyncio.Lock()

    async def tracked_function(x: Any) -> str:
        nonlocal running_tasks, max_running_tasks
        async with lock:
            running_tasks += 1
            max_running_tasks = max(max_running_tasks, running_tasks)

        await asyncio.sleep(0.1)  # Simulate work

        async with lock:
            running_tasks -= 1
// ... (83 more lines)
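The tests above count how many calls are in flight at once and compare the peak against max_concurrency. The sketch below reproduces that measurement with the standard library only, so it can be run without langchain installed. It assumes a semaphore-based limiter; bounded_gather and measure_peak are hypothetical names introduced for illustration, and this is not a claim about how langchain_core implements abatch internally.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


async def bounded_gather(
    func: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    max_concurrency: int,
) -> List[Any]:
    """Run func over inputs with at most max_concurrency calls in flight.

    Hypothetical helper: a semaphore is one common way to cap concurrency.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: Any) -> Any:
        # Each call must hold a semaphore slot while it runs.
        async with semaphore:
            return await func(item)

    # gather returns results in input order, regardless of completion order.
    return await asyncio.gather(*(run_one(item) for item in inputs))


async def measure_peak(num_tasks: int, max_concurrency: int) -> Tuple[List[str], int]:
    """Return the results and the highest number of simultaneous calls seen."""
    running = 0
    peak = 0
    lock = asyncio.Lock()

    async def tracked(x: Any) -> str:
        nonlocal running, peak
        async with lock:
            running += 1
            peak = max(peak, running)
        await asyncio.sleep(0.01)  # Simulate work
        async with lock:
            running -= 1
        return f"Completed {x}"

    results = await bounded_gather(tracked, list(range(num_tasks)), max_concurrency)
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(measure_peak(num_tasks=10, max_concurrency=3))
    print(len(results), peak)
```

As in the original tests, the meaningful check is that the observed peak never exceeds the configured limit; the exact peak is a property of scheduling rather than something the test needs to pin down.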
Dependencies
- asyncio
- langchain_core.runnables
- pytest
- threading
- time
- typing
Frequently Asked Questions
What does test_concurrency.py do?
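test_concurrency.py is a Python test module in the langchain codebase. Per its module docstring, it tests the concurrency behavior of batch and async batch operations, checking that they respect max_concurrency. It is classified under the CoreAbstractions domain, Serialization subdomain.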
What functions are defined in test_concurrency.py?
test_concurrency.py defines 4 functions: test_abatch_as_completed_concurrency, test_abatch_concurrency, test_batch_as_completed_concurrency, test_batch_concurrency.
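The bodies of the two synchronous tests are not included in the source excerpt on this page. As a rough illustration of how the same peak-concurrency measurement can be done with threads, the sketch below uses a threading.Lock to guard the counters and a standard-library ThreadPoolExecutor as a stand-in for the batch call. The function name measure_thread_peak and the use of ThreadPoolExecutor are assumptions made for this example; it does not reproduce the elided test code.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from typing import List, Tuple


def measure_thread_peak(num_tasks: int, max_workers: int) -> Tuple[List[str], int]:
    """Run num_tasks calls on a bounded thread pool and record peak overlap.

    Hypothetical helper: the pool size plays the role of max_concurrency.
    """
    running = 0
    peak = 0
    lock = Lock()

    def tracked(x: int) -> str:
        nonlocal running, peak
        # The lock protects the shared counters from concurrent updates.
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)  # Simulate work
        with lock:
            running -= 1
        return f"Completed {x}"

    # The pool never runs more than max_workers calls at the same time.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(tracked, range(num_tasks)))
    return results, peak


if __name__ == "__main__":
    results, peak = measure_thread_peak(num_tasks=10, max_workers=3)
    print(len(results), peak)
```

A threading.Lock is needed here, rather than an asyncio.Lock, because the counter updates happen on separate OS threads instead of inside a single event loop.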
What does test_concurrency.py depend on?
test_concurrency.py imports 6 modules: asyncio, langchain_core.runnables, pytest, threading, time, typing.
Where is test_concurrency.py in the architecture?
test_concurrency.py is located at libs/core/tests/unit_tests/runnables/test_concurrency.py (domain: CoreAbstractions, subdomain: Serialization, directory: libs/core/tests/unit_tests/runnables).