Home / Function/ _transform_stream_with_config() — langchain Function Reference

_transform_stream_with_config() — langchain Function Reference

Architecture documentation for the _transform_stream_with_config() function in base.py from the langchain codebase.

Function python LangChainCore Runnables calls 2 called by 4

Entity Profile

Dependency Diagram

graph TD
  3b190320_b896_da9c_12ee_4d17ddfcf59e["_transform_stream_with_config()"]
  4a62481c_02cb_a5de_1833_50669d5351a6["Runnable"]
  3b190320_b896_da9c_12ee_4d17ddfcf59e -->|defined in| 4a62481c_02cb_a5de_1833_50669d5351a6
  f619bab3_d5a5_2218_b91d_c55d57073715["transform()"]
  f619bab3_d5a5_2218_b91d_c55d57073715 -->|calls| 3b190320_b896_da9c_12ee_4d17ddfcf59e
  b19bb5c5_7d06_0da1_c670_6107a6c35950["transform()"]
  b19bb5c5_7d06_0da1_c670_6107a6c35950 -->|calls| 3b190320_b896_da9c_12ee_4d17ddfcf59e
  c0e0035e_3d41_1681_e722_9c0a4ca8826b["transform()"]
  c0e0035e_3d41_1681_e722_9c0a4ca8826b -->|calls| 3b190320_b896_da9c_12ee_4d17ddfcf59e
  b058d203_2f29_f0e0_63cd_7458a119791a["transform()"]
  b058d203_2f29_f0e0_63cd_7458a119791a -->|calls| 3b190320_b896_da9c_12ee_4d17ddfcf59e
  255c479b_b9fa_44d8_4de5_2562051e06b5["get_name()"]
  3b190320_b896_da9c_12ee_4d17ddfcf59e -->|calls| 255c479b_b9fa_44d8_4de5_2562051e06b5
  f35c0aee_685e_5a92_1708_787fad1851f1["astream_log()"]
  3b190320_b896_da9c_12ee_4d17ddfcf59e -->|calls| f35c0aee_685e_5a92_1708_787fad1851f1
  style 3b190320_b896_da9c_12ee_4d17ddfcf59e fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/core/langchain_core/runnables/base.py lines 2261–2357

    def _transform_stream_with_config(
        self,
        inputs: Iterator[Input],
        transformer: Callable[[Iterator[Input]], Iterator[Output]]
        | Callable[[Iterator[Input], CallbackManagerForChainRun], Iterator[Output]]
        | Callable[
            [Iterator[Input], CallbackManagerForChainRun, RunnableConfig],
            Iterator[Output],
        ],
        config: RunnableConfig | None,
        run_type: str | None = None,
        **kwargs: Any | None,
    ) -> Iterator[Output]:
        """Transform a stream with config.

        Helper method to transform an `Iterator` of `Input` values into an
        `Iterator` of `Output` values, with callbacks.

        Use this to implement `stream` or `transform` in `Runnable` subclasses.

        """
        # Extract defers_inputs from kwargs if present
        defers_inputs = kwargs.pop("defers_inputs", False)

        # tee the input so we can iterate over it twice
        input_for_tracing, input_for_transform = tee(inputs, 2)
        # Start the input iterator to ensure the input Runnable starts before this one
        final_input: Input | None = next(input_for_tracing, None)
        final_input_supported = True
        final_output: Output | None = None
        final_output_supported = True

        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_chain_start(
            None,
            {"input": ""},
            run_type=run_type,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
            defers_inputs=defers_inputs,
        )
        try:
            child_config = patch_config(config, callbacks=run_manager.get_child())
            if accepts_config(transformer):
                kwargs["config"] = child_config
            if accepts_run_manager(transformer):
                kwargs["run_manager"] = run_manager
            with set_config_context(child_config) as context:
                iterator = context.run(transformer, input_for_transform, **kwargs)  # type: ignore[arg-type]
                if stream_handler := next(
                    (
                        cast("_StreamingCallbackHandler", h)
                        for h in run_manager.handlers
                        # instance check OK here, it's a mixin
                        if isinstance(h, _StreamingCallbackHandler)
                    ),
                    None,
                ):
                    # populates streamed_output in astream_log() output if needed
                    iterator = stream_handler.tap_output_iter(
                        run_manager.run_id, iterator
                    )
                try:
                    while True:
                        chunk: Output = context.run(next, iterator)
                        yield chunk
                        if final_output_supported:
                            if final_output is None:
                                final_output = chunk
                            else:
                                try:
                                    final_output = final_output + chunk  # type: ignore[operator]
                                except TypeError:
                                    final_output = chunk
                                    final_output_supported = False
                        else:
                            final_output = chunk
                except (StopIteration, GeneratorExit):
                    pass
                for ichunk in input_for_tracing:

Domain

Subdomains

Frequently Asked Questions

What does _transform_stream_with_config() do?
_transform_stream_with_config() is a function in the langchain codebase, defined in libs/core/langchain_core/runnables/base.py.
Where is _transform_stream_with_config() defined?
_transform_stream_with_config() is defined in libs/core/langchain_core/runnables/base.py at line 2261.
What does _transform_stream_with_config() call?
_transform_stream_with_config() calls 2 function(s): astream_log, get_name.
What calls _transform_stream_with_config()?
_transform_stream_with_config() is called by 4 function(s): transform, transform, transform, transform.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free