tee_peer() — langchain Function Reference
Architecture documentation for the tee_peer() function in aiter.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD e236a10d_7364_c481_d23c_b7df275273ea["tee_peer()"] b33566b6_8971_bdf8_5db2_dd5a13740a1f["aiter.py"] e236a10d_7364_c481_d23c_b7df275273ea -->|defined in| b33566b6_8971_bdf8_5db2_dd5a13740a1f 46690208_3f31_a6f9_9388_b2ce7bbc25ef["__init__()"] 46690208_3f31_a6f9_9388_b2ce7bbc25ef -->|calls| e236a10d_7364_c481_d23c_b7df275273ea 5e9a71fc_b238_2245_9b68_399e5d73ca3c["aclose()"] e236a10d_7364_c481_d23c_b7df275273ea -->|calls| 5e9a71fc_b238_2245_9b68_399e5d73ca3c style e236a10d_7364_c481_d23c_b7df275273ea fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/langchain_core/utils/aiter.py lines 103–158
async def tee_peer(
iterator: AsyncIterator[T],
# the buffer specific to this peer
buffer: deque[T],
# the buffers of all peers, including our own
peers: list[deque[T]],
lock: AbstractAsyncContextManager[Any],
) -> AsyncGenerator[T, None]:
"""An individual iterator of a `tee`.
This function is a generator that yields items from the shared iterator
`iterator`. It buffers items until the least advanced iterator has yielded them as
well.
The buffer is shared with all other peers.
Args:
iterator: The shared iterator.
buffer: The buffer for this peer.
peers: The buffers of all peers.
lock: The lock to synchronise access to the shared buffers.
Yields:
The next item from the shared iterator.
"""
try:
while True:
if not buffer:
async with lock:
# Another peer produced an item while we were waiting for the lock.
# Proceed with the next loop iteration to yield the item.
if buffer:
continue
try:
item = await anext(iterator)
except StopAsyncIteration:
break
else:
# Append to all buffers, including our own. We'll fetch our
# item from the buffer again, instead of yielding it directly.
# This ensures the proper item ordering if any of our peers
# are fetching items concurrently. They may have buffered their
# item already.
for peer_buffer in peers:
peer_buffer.append(item)
yield buffer.popleft()
finally:
async with lock:
# this peer is done - remove its buffer
for idx, peer_buffer in enumerate(peers): # pragma: no branch
if peer_buffer is buffer:
peers.pop(idx)
break
# if we are the last peer, try and close the iterator
if not peers and hasattr(iterator, "aclose"):
await iterator.aclose()
Domain
Subdomains
Defined In
Calls
Called By
Source
Frequently Asked Questions
What does tee_peer() do?
tee_peer() is a function in the langchain codebase, defined in libs/core/langchain_core/utils/aiter.py.
Where is tee_peer() defined?
tee_peer() is defined in libs/core/langchain_core/utils/aiter.py at line 103.
What does tee_peer() call?
tee_peer() calls 1 function(s): aclose.
What calls tee_peer()?
tee_peer() is called by 1 function(s): __init__.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free