stream() — langchain Function Reference

Architecture documentation for the stream() function in fallbacks.py from the langchain codebase.

Dependency Diagram

graph TD
  stream["stream()"] -->|defined in| RunnableWithFallbacks["RunnableWithFallbacks"]

Source Code

libs/core/langchain_core/runnables/fallbacks.py lines 466–527

    def stream(
        self,
        input: Input,
        config: RunnableConfig | None = None,
        **kwargs: Any | None,
    ) -> Iterator[Output]:
        if self.exception_key is not None and not isinstance(input, dict):
            msg = (
                "If 'exception_key' is specified then input must be a dictionary. "
                f"However found a type of {type(input)} for input"
            )
            raise ValueError(msg)
        # setup callbacks
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        # start the root run
        run_manager = callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        first_error = None
        last_error = None
        for runnable in self.runnables:
            try:
                if self.exception_key and last_error is not None:
                    input[self.exception_key] = last_error  # type: ignore[index]
                child_config = patch_config(config, callbacks=run_manager.get_child())
                with set_config_context(child_config) as context:
                    stream = context.run(
                        runnable.stream,
                        input,
                        **kwargs,
                    )
                    chunk: Output = context.run(next, stream)
            except self.exceptions_to_handle as e:
                first_error = e if first_error is None else first_error
                last_error = e
            except BaseException as e:
                run_manager.on_chain_error(e)
                raise
            else:
                first_error = None
                break
        if first_error:
            run_manager.on_chain_error(first_error)
            raise first_error

        yield chunk
        output: Output | None = chunk
        try:
            for chunk in stream:
                yield chunk
                try:
                    output = output + chunk  # type: ignore[operator]
                except TypeError:
                    output = None
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise
        run_manager.on_chain_end(output)
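
The selection logic in the listing above can be hard to follow among the callback and config plumbing. The block below is a minimal standalone sketch of that one idea, assuming plain Python generators instead of Runnables: a candidate is accepted only once it has produced its first chunk, and a failure after that point is not retried. The names `stream_with_fallbacks`, `primary`, `backup`, and `breaks_midway` are hypothetical and are not part of langchain; callbacks, config handling, and `exception_key` are deliberately left out.

```python
from typing import Callable, Iterable, Iterator, Sequence, Tuple, Type


def stream_with_fallbacks(
    candidates: Sequence[Callable[[str], Iterable[str]]],
    value: str,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
) -> Iterator[str]:
    """Yield chunks from the first candidate that produces a first chunk.

    Simplified illustration only; assumes `candidates` is non-empty.
    """
    first_error = None
    for make_stream in candidates:
        try:
            stream = iter(make_stream(value))
            # Pulling the first chunk is the commitment point: any handled
            # error raised up to here moves on to the next candidate.
            first_chunk = next(stream)
        except exceptions_to_handle as e:
            first_error = e if first_error is None else first_error
        else:
            first_error = None
            break
    if first_error is not None:
        # Every candidate failed, so surface the earliest error.
        raise first_error
    yield first_chunk
    # Errors raised from here on propagate to the caller without a retry.
    yield from stream


def primary(text: str) -> Iterator[str]:
    raise ValueError("primary failed before its first chunk")
    yield text  # unreachable; makes this a generator function


def backup(text: str) -> Iterator[str]:
    for word in text.split():
        yield word.upper()


def breaks_midway(text: str) -> Iterator[str]:
    yield "partial"
    raise ValueError("failed after the first chunk")


print(list(stream_with_fallbacks([primary, backup], "hello world")))
# → ['HELLO', 'WORLD']
```

Passing `[breaks_midway, backup]` instead yields `"partial"` and then raises the `ValueError`: once a first chunk has been emitted, the loop over candidates has already exited, which mirrors the `break` in the `else` branch of the listing.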

Frequently Asked Questions

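What does stream() do?
stream() is the streaming method of RunnableWithFallbacks, defined in libs/core/langchain_core/runnables/fallbacks.py. It tries each entry in self.runnables in order and commits to the first one that yields its first chunk without raising one of exceptions_to_handle. If every runnable fails that way, it reports and re-raises the first error. Once a runnable is selected, its remaining chunks are yielded as they arrive; an error raised mid-stream is passed to on_chain_error and re-raised, with no further fallback attempted. When exception_key is set, the input must be a dictionary, and the previous error is stored under that key before the next runnable is tried.

Where is stream() defined?
stream() is defined in libs/core/langchain_core/runnables/fallbacks.py at line 466, as a method of RunnableWithFallbacks.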
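
The final loop of the source listing also accumulates the chunks into a single `output` value that is passed to `on_chain_end`. A minimal sketch of that accumulation, assuming a hypothetical helper named `aggregate_chunks` (not a langchain function) and a non-empty input, might look like this:

```python
from typing import Any, Iterable, Optional


def aggregate_chunks(chunks: Iterable[Any]) -> Optional[Any]:
    """Fold chunks together with `+`, giving up (None) if they do not add."""
    iterator = iter(chunks)
    output: Optional[Any] = next(iterator)  # the first chunk seeds the total
    for chunk in iterator:
        try:
            output = output + chunk
        except TypeError:
            # Chunk types that do not support `+` cannot be combined.
            # After this, `None + chunk` keeps raising TypeError, so the
            # result stays None for the rest of the stream.
            output = None
    return output


print(aggregate_chunks(["Hel", "lo"]))
# → Hello
```

With chunks that cannot be added, such as two plain dicts, the helper returns `None`, which corresponds to the listing reporting `None` as the final output rather than failing the run.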
