Home / Function/ tee_peer() — langchain Function Reference

tee_peer() — langchain Function Reference

Architecture documentation for the tee_peer() function in aiter.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  e236a10d_7364_c481_d23c_b7df275273ea["tee_peer()"]
  b33566b6_8971_bdf8_5db2_dd5a13740a1f["aiter.py"]
  e236a10d_7364_c481_d23c_b7df275273ea -->|defined in| b33566b6_8971_bdf8_5db2_dd5a13740a1f
  46690208_3f31_a6f9_9388_b2ce7bbc25ef["__init__()"]
  46690208_3f31_a6f9_9388_b2ce7bbc25ef -->|calls| e236a10d_7364_c481_d23c_b7df275273ea
  5e9a71fc_b238_2245_9b68_399e5d73ca3c["aclose()"]
  e236a10d_7364_c481_d23c_b7df275273ea -->|calls| 5e9a71fc_b238_2245_9b68_399e5d73ca3c
  style e236a10d_7364_c481_d23c_b7df275273ea fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/core/langchain_core/utils/aiter.py lines 103–158

async def tee_peer(
    iterator: AsyncIterator[T],
    # the buffer specific to this peer
    buffer: deque[T],
    # the buffers of all peers, including our own
    peers: list[deque[T]],
    lock: AbstractAsyncContextManager[Any],
) -> AsyncGenerator[T, None]:
    """An individual iterator of a `tee`.

    This function is a generator that yields items from the shared iterator
    `iterator`. It buffers items until the least advanced iterator has yielded them as
    well.

    The buffer is shared with all other peers.

    Args:
        iterator: The shared iterator.
        buffer: The buffer for this peer.
        peers: The buffers of all peers.
        lock: The lock to synchronise access to the shared buffers.

    Yields:
        The next item from the shared iterator.
    """
    try:
        while True:
            if not buffer:
                async with lock:
                    # Another peer produced an item while we were waiting for the lock.
                    # Proceed with the next loop iteration to yield the item.
                    if buffer:
                        continue
                    try:
                        item = await anext(iterator)
                    except StopAsyncIteration:
                        break
                    else:
                        # Append to all buffers, including our own. We'll fetch our
                        # item from the buffer again, instead of yielding it directly.
                        # This ensures the proper item ordering if any of our peers
                        # are fetching items concurrently. They may have buffered their
                        # item already.
                        for peer_buffer in peers:
                            peer_buffer.append(item)
            yield buffer.popleft()
    finally:
        async with lock:
            # this peer is done - remove its buffer
            for idx, peer_buffer in enumerate(peers):  # pragma: no branch
                if peer_buffer is buffer:
                    peers.pop(idx)
                    break
            # if we are the last peer, try and close the iterator
            if not peers and hasattr(iterator, "aclose"):
                await iterator.aclose()

Subdomains

Calls

Called By

Frequently Asked Questions

What does tee_peer() do?
tee_peer() is a function in the langchain codebase, defined in libs/core/langchain_core/utils/aiter.py.
Where is tee_peer() defined?
tee_peer() is defined in libs/core/langchain_core/utils/aiter.py at line 103.
What does tee_peer() call?
tee_peer() calls 1 function(s): aclose.
What calls tee_peer()?
tee_peer() is called by 1 function(s): __init__.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free