test_async_wait_max_bucket_size() — langchain Function Reference
Architecture documentation for the test_async_wait_max_bucket_size() function in test_in_memory_rate_limiter.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD 562476b3_4f2f_695b_9fde_19460fe61cb2["test_async_wait_max_bucket_size()"] a2ad4c1d_ffde_beed_c251_817f2cc8edf6["test_in_memory_rate_limiter.py"] 562476b3_4f2f_695b_9fde_19460fe61cb2 -->|defined in| a2ad4c1d_ffde_beed_c251_817f2cc8edf6 style 562476b3_4f2f_695b_9fde_19460fe61cb2 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/tests/unit_tests/rate_limiters/test_in_memory_rate_limiter.py lines 94–110
async def test_async_wait_max_bucket_size() -> None:
with freeze_time("2023-01-01 00:00:00") as frozen_time:
rate_limiter = InMemoryRateLimiter(
requests_per_second=2, check_every_n_seconds=0.1, max_bucket_size=500
)
rate_limiter.last = time.time()
frozen_time.tick(100) # Increment by 100 seconds
assert await rate_limiter.aacquire(blocking=False)
# After 100 seconds we manage to refill the bucket with 200 tokens
# After consuming 1 token, we should have 199 tokens left
assert rate_limiter.available_tokens == 199.0
frozen_time.tick(10000)
assert await rate_limiter.aacquire(blocking=False)
assert rate_limiter.available_tokens == 499.0
# Assert that sync wait can proceed without blocking
# since we have enough tokens
await rate_limiter.aacquire(blocking=True)
Domain
Subdomains
Source
Frequently Asked Questions
What does test_async_wait_max_bucket_size() do?
test_async_wait_max_bucket_size() is a function in the langchain codebase, defined in libs/core/tests/unit_tests/rate_limiters/test_in_memory_rate_limiter.py.
Where is test_async_wait_max_bucket_size() defined?
test_async_wait_max_bucket_size() is defined in libs/core/tests/unit_tests/rate_limiters/test_in_memory_rate_limiter.py at line 94.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free