Home / File/ test_runnable_events_v1.py — langchain Source File

test_runnable_events_v1.py — langchain Source File

Architecture documentation for test_runnable_events_v1.py, a python file in the langchain codebase. 21 imports, 0 dependents.

File python CoreAbstractions RunnableInterface 21 imports 23 functions 2 classes

Entity Profile

Dependency Diagram

graph LR
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02["test_runnable_events_v1.py"]
  a327e534_84f6_5308_58ca_5727d5eda0cf["asyncio"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> a327e534_84f6_5308_58ca_5727d5eda0cf
  d76a28c2_c3ab_00a8_5208_77807a49449d["sys"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> d76a28c2_c3ab_00a8_5208_77807a49449d
  cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7["collections.abc"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7
  436f77bc_653d_0edb_555c_c2679d5a59ac["itertools"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> 436f77bc_653d_0edb_555c_c2679d5a59ac
  8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3["typing"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> 8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3
  120e2591_3e15_b895_72b6_cb26195e40a6["pytest"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> 120e2591_3e15_b895_72b6_cb26195e40a6
  6e58aaea_f08e_c099_3cc7_f9567bfb1ae7["pydantic"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> 6e58aaea_f08e_c099_3cc7_f9567bfb1ae7
  91721f45_4909_e489_8c1f_084f8bd87145["typing_extensions"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> 91721f45_4909_e489_8c1f_084f8bd87145
  f3bc7443_c889_119d_0744_aacc3620d8d2["langchain_core.callbacks"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> f3bc7443_c889_119d_0744_aacc3620d8d2
  38c69580_3a92_51b1_7512_b0ccca9a0ef6["langchain_core.chat_history"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> 38c69580_3a92_51b1_7512_b0ccca9a0ef6
  c554676d_b731_47b2_a98f_c1c2d537c0aa["langchain_core.documents"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> c554676d_b731_47b2_a98f_c1c2d537c0aa
  ba43b74d_3099_7e1c_aac3_cf594720469e["langchain_core.language_models"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> ba43b74d_3099_7e1c_aac3_cf594720469e
  d758344f_537f_649e_f467_b9d7442e86df["langchain_core.messages"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> d758344f_537f_649e_f467_b9d7442e86df
  5b417886_56dd_6afa_13ab_a3cfc1dbcccd["langchain_core.prompt_values"]
  57403c39_0b3d_acaa_2fb1_22c0b48dbc02 --> 5b417886_56dd_6afa_13ab_a3cfc1dbcccd
  style 57403c39_0b3d_acaa_2fb1_22c0b48dbc02 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

"""Module that contains tests for runnable.astream_events API."""

import asyncio
import sys
from collections.abc import AsyncIterator, Mapping, Sequence
from itertools import cycle
from typing import Any, cast

import pytest
from pydantic import BaseModel
from typing_extensions import override

from langchain_core.callbacks import CallbackManagerForRetrieverRun, Callbacks
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.documents import Document
from langchain_core.language_models import FakeStreamingListLLM, GenericFakeChatModel
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
    HumanMessage,
    SystemMessage,
)
from langchain_core.prompt_values import ChatPromptValue
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables import (
    ConfigurableField,
    RunnableConfig,
    RunnableLambda,
)
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.runnables.schema import StreamEvent
from langchain_core.tools import tool
from tests.unit_tests.stubs import _any_id_ai_message, _any_id_ai_message_chunk


def _with_nulled_run_id(events: Sequence[StreamEvent]) -> list[StreamEvent]:
    """Removes the run IDs from events."""
    for event in events:
        assert "parent_ids" in event, "Parent IDs should be present in the event."
        assert event["parent_ids"] == [], "Parent IDs should be empty."

    return cast("list[StreamEvent]", [{**event, "run_id": ""} for event in events])


async def _collect_events(events: AsyncIterator[StreamEvent]) -> list[StreamEvent]:
    """Collect the events and remove the run ids."""
    materialized_events = [event async for event in events]
    events_ = _with_nulled_run_id(materialized_events)
    for event in events_:
        event["tags"] = sorted(event["tags"])
    return events_


def _assert_events_equal_allow_superset_metadata(
    events: Sequence[Mapping[str, Any]], expected: Sequence[Mapping[str, Any]]
) -> None:
    """Assert that the events are equal."""
    assert len(events) == len(expected)
// ... (2100 more lines)

Subdomains

Dependencies

  • asyncio
  • collections.abc
  • itertools
  • langchain_core.callbacks
  • langchain_core.chat_history
  • langchain_core.documents
  • langchain_core.language_models
  • langchain_core.messages
  • langchain_core.prompt_values
  • langchain_core.prompts
  • langchain_core.retrievers
  • langchain_core.runnables
  • langchain_core.runnables.history
  • langchain_core.runnables.schema
  • langchain_core.tools
  • pydantic
  • pytest
  • sys
  • tests.unit_tests.stubs
  • typing
  • typing_extensions

Frequently Asked Questions

What does test_runnable_events_v1.py do?
test_runnable_events_v1.py is a source file in the langchain codebase, written in python. It belongs to the CoreAbstractions domain, RunnableInterface subdomain.
What functions are defined in test_runnable_events_v1.py?
test_runnable_events_v1.py defines 23 function(s): _assert_events_equal_allow_superset_metadata, _collect_events, _with_nulled_run_id, test_astream_events_from_model, test_async_in_async_stream_lambdas, test_chain_ordering, test_event_stream_on_chain_with_tool, test_event_stream_with_lambdas_from_lambda, test_event_stream_with_retriever, test_event_stream_with_retriever_and_formatter, and 13 more.
What does test_runnable_events_v1.py depend on?
test_runnable_events_v1.py imports 21 module(s): asyncio, collections.abc, itertools, langchain_core.callbacks, langchain_core.chat_history, langchain_core.documents, langchain_core.language_models, langchain_core.messages, and 13 more.
Where is test_runnable_events_v1.py in the architecture?
test_runnable_events_v1.py is located at libs/core/tests/unit_tests/runnables/test_runnable_events_v1.py (domain: CoreAbstractions, subdomain: RunnableInterface, directory: libs/core/tests/unit_tests/runnables).

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free