StreamingRunnable Class — langchain Architecture

Architecture documentation for the StreamingRunnable class in test_runnable_events_v2.py from the langchain codebase.

Dependency Diagram

graph TD
  ec849ced_49cd_d2f5_60e0_db0d12879deb["StreamingRunnable"]
  33c02978_2077_5819_7048_bc2a81e80625["test_runnable_events_v2.py"]
  ec849ced_49cd_d2f5_60e0_db0d12879deb -->|defined in| 33c02978_2077_5819_7048_bc2a81e80625
  40332084_b761_5fbb_75ef_6d1b79951608["__init__()"]
  ec849ced_49cd_d2f5_60e0_db0d12879deb -->|method| 40332084_b761_5fbb_75ef_6d1b79951608
  44779688_f3fe_6e36_2d50_baa2a78e6a2d["invoke()"]
  ec849ced_49cd_d2f5_60e0_db0d12879deb -->|method| 44779688_f3fe_6e36_2d50_baa2a78e6a2d
  96cda70a_ff2d_eca4_0fa9_2ffa9cde3432["stream()"]
  ec849ced_49cd_d2f5_60e0_db0d12879deb -->|method| 96cda70a_ff2d_eca4_0fa9_2ffa9cde3432
  858b85ba_6d9e_d209_9eb5_2469bad0adca["astream()"]
  ec849ced_49cd_d2f5_60e0_db0d12879deb -->|method| 858b85ba_6d9e_d209_9eb5_2469bad0adca

Source Code

libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py lines 2121–2182

class StreamingRunnable(Runnable[Any, Addable]):
    """A custom runnable used for testing purposes."""

    iterable: Iterable[Addable]

    def __init__(self, iterable: Iterable[Addable]) -> None:
        """Initialize the runnable."""
        self.iterable = iterable

    @override
    def invoke(
        self, input: Any, config: RunnableConfig | None = None, **kwargs: Any
    ) -> Addable:
        """Invoke the runnable."""
        msg = "Server side error"
        raise ValueError(msg)

    @override
    def stream(
        self,
        input: Any,
        config: RunnableConfig | None = None,
        **kwargs: Any | None,
    ) -> Iterator[Addable]:
        raise NotImplementedError

    @override
    async def astream(
        self,
        input: Any,
        config: RunnableConfig | None = None,
        **kwargs: Any | None,
    ) -> AsyncIterator[Addable]:
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        run_manager = await callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name", self.get_name()),
            run_id=config.get("run_id"),
        )

        try:
            final_output = None
            for element in self.iterable:
                if isinstance(element, BaseException):
                    raise element  # noqa: TRY301
                yield element

                if final_output is None:
                    final_output = element
                else:
                    try:
                        final_output = final_output + element
                    except TypeError:
                        final_output = element

            # set final channel values as run output
            await run_manager.on_chain_end(final_output)
        except BaseException as e:
            await run_manager.on_chain_error(e)
            raise
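
The accumulation and error-reporting pattern in astream() can be tried without langchain installed. The following is a minimal sketch under stated assumptions, not the test suite's code: `stream_and_accumulate`, `collect`, and the `events` list are hypothetical names, and the list stands in for the run manager's on_chain_start, on_chain_end, and on_chain_error callbacks.

```python
import asyncio
from collections.abc import AsyncIterator, Iterable
from typing import Any


async def stream_and_accumulate(
    iterable: Iterable[Any], events: list[tuple[str, Any]]
) -> AsyncIterator[Any]:
    """Yield each element, then record the accumulated output or the error.

    `events` is a hypothetical stand-in for the callback run manager.
    """
    events.append(("start", None))
    try:
        final_output = None
        for element in iterable:
            # An exception instance in the iterable simulates a mid-stream failure.
            if isinstance(element, BaseException):
                raise element
            yield element
            if final_output is None:
                final_output = element
            else:
                try:
                    final_output = final_output + element
                except TypeError:
                    # Elements that cannot be added: keep only the latest one.
                    final_output = element
        events.append(("end", final_output))
    except BaseException as e:
        events.append(("error", e))
        raise


async def collect(
    iterable: Iterable[Any],
) -> tuple[list[Any], list[tuple[str, Any]]]:
    """Drain the stream and return the chunks plus the recorded events."""
    events: list[tuple[str, Any]] = []
    chunks: list[Any] = []
    try:
        async for chunk in stream_and_accumulate(iterable, events):
            chunks.append(chunk)
    except ValueError:
        pass  # the error is already recorded in `events`
    return chunks, events


ok_chunks, ok_events = asyncio.run(collect(["a", "b", "c"]))
err_chunks, err_events = asyncio.run(collect(["a", ValueError("boom")]))
```

In this sketch a clean run records the concatenated value as its final output, while a run that hits an exception element still delivers the chunks yielded before it and records the error in place of an end event.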

Frequently Asked Questions

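What is the StreamingRunnable class?
StreamingRunnable is a test helper class in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py. It subclasses Runnable[Any, Addable] and wraps an iterable: astream() yields the iterable's elements while reporting chain start, end, and error callbacks, invoke() always raises ValueError("Server side error"), and stream() raises NotImplementedError.

Where is StreamingRunnable defined?
StreamingRunnable is defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py, starting at line 2121.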
