_atransform_stream_with_config() — langchain Function Reference

Architecture documentation for the _atransform_stream_with_config() function in base.py from the langchain codebase.

Function, python, LangChainCore, Runnables, calls 2, called by 4

Dependency Diagram

graph TD
  7d532bc6_d85b_8af5_f142_583cf8e684f6["_atransform_stream_with_config()"]
  4a62481c_02cb_a5de_1833_50669d5351a6["Runnable"]
  7d532bc6_d85b_8af5_f142_583cf8e684f6 -->|defined in| 4a62481c_02cb_a5de_1833_50669d5351a6
  6b0af8a6_40de_913f_d715_e591ebb530c1["atransform()"]
  6b0af8a6_40de_913f_d715_e591ebb530c1 -->|calls| 7d532bc6_d85b_8af5_f142_583cf8e684f6
  dd47e0f6_01d1_bac7_e0fb_611561853fa8["atransform()"]
  dd47e0f6_01d1_bac7_e0fb_611561853fa8 -->|calls| 7d532bc6_d85b_8af5_f142_583cf8e684f6
  9c31698a_2e43_9df9_b618_11fd9b0d1d60["atransform()"]
  9c31698a_2e43_9df9_b618_11fd9b0d1d60 -->|calls| 7d532bc6_d85b_8af5_f142_583cf8e684f6
  dac5e541_1204_9fcb_a2a6_e9b9b5845c0c["atransform()"]
  dac5e541_1204_9fcb_a2a6_e9b9b5845c0c -->|calls| 7d532bc6_d85b_8af5_f142_583cf8e684f6
  255c479b_b9fa_44d8_4de5_2562051e06b5["get_name()"]
  7d532bc6_d85b_8af5_f142_583cf8e684f6 -->|calls| 255c479b_b9fa_44d8_4de5_2562051e06b5
  f35c0aee_685e_5a92_1708_787fad1851f1["astream_log()"]
  7d532bc6_d85b_8af5_f142_583cf8e684f6 -->|calls| f35c0aee_685e_5a92_1708_787fad1851f1
  style 7d532bc6_d85b_8af5_f142_583cf8e684f6 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/langchain_core/runnables/base.py lines 2359–2464

    async def _atransform_stream_with_config(
        self,
        inputs: AsyncIterator[Input],
        transformer: Callable[[AsyncIterator[Input]], AsyncIterator[Output]]
        | Callable[
            [AsyncIterator[Input], AsyncCallbackManagerForChainRun],
            AsyncIterator[Output],
        ]
        | Callable[
            [AsyncIterator[Input], AsyncCallbackManagerForChainRun, RunnableConfig],
            AsyncIterator[Output],
        ],
        config: RunnableConfig | None,
        run_type: str | None = None,
        **kwargs: Any | None,
    ) -> AsyncIterator[Output]:
        """Transform a stream with config.

        Helper method to transform an Async `Iterator` of `Input` values into an
        Async `Iterator` of `Output` values, with callbacks.

        Use this to implement `astream` or `atransform` in `Runnable` subclasses.

        """
        # Extract defers_inputs from kwargs if present
        defers_inputs = kwargs.pop("defers_inputs", False)

        # tee the input so we can iterate over it twice
        input_for_tracing, input_for_transform = atee(inputs, 2)
        # Start the input iterator to ensure the input Runnable starts before this one
        final_input: Input | None = await anext(input_for_tracing, None)
        final_input_supported = True
        final_output: Output | None = None
        final_output_supported = True

        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        run_manager = await callback_manager.on_chain_start(
            None,
            {"input": ""},
            run_type=run_type,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
            defers_inputs=defers_inputs,
        )
        try:
            child_config = patch_config(config, callbacks=run_manager.get_child())
            if accepts_config(transformer):
                kwargs["config"] = child_config
            if accepts_run_manager(transformer):
                kwargs["run_manager"] = run_manager
            with set_config_context(child_config) as context:
                iterator_ = context.run(transformer, input_for_transform, **kwargs)  # type: ignore[arg-type]

                if stream_handler := next(
                    (
                        cast("_StreamingCallbackHandler", h)
                        for h in run_manager.handlers
                        # instance check OK here, it's a mixin
                        if isinstance(h, _StreamingCallbackHandler)
                    ),
                    None,
                ):
                    # populates streamed_output in astream_log() output if needed
                    iterator = stream_handler.tap_output_aiter(
                        run_manager.run_id, iterator_
                    )
                else:
                    iterator = iterator_
                try:
                    while True:
                        chunk = await coro_with_context(anext(iterator), context)
                        yield chunk
                        if final_output_supported:
                            if final_output is None:
                                final_output = chunk
                            else:
                                try:
                                    final_output = final_output + chunk
                                except TypeError:
                                    final_output = chunk
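
The listing stops partway through the output loop, so what follows the `TypeError` fallback is not shown here. The visible pattern (seed the aggregate with the first chunk, then try `+`, and keep only the latest chunk when the types cannot be added) can be sketched on its own. This is a standalone illustration using only the standard library; `stream_and_accumulate`, `from_items`, and `run` are hypothetical names, not part of langchain.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Iterable


async def stream_and_accumulate(
    chunks: AsyncIterator[Any], result: Dict[str, Any]
) -> AsyncIterator[Any]:
    """Yield every chunk unchanged while folding them into result["final"]."""
    final: Any = None
    async for chunk in chunks:
        yield chunk  # the consumer sees the chunk before it is folded in
        if final is None:
            final = chunk  # first chunk seeds the aggregate
        else:
            try:
                final = final + chunk  # works for str, list, and other addable types
            except TypeError:
                final = chunk  # not addable: keep only the latest chunk
        result["final"] = final


async def from_items(items: Iterable[Any]) -> AsyncIterator[Any]:
    """Hypothetical helper: turn a plain iterable into an async iterator."""
    for item in items:
        yield item


async def run(items: Iterable[Any]):
    """Drain the stream and return (chunks seen, aggregated value)."""
    result: Dict[str, Any] = {}
    seen = [c async for c in stream_and_accumulate(from_items(items), result)]
    return seen, result.get("final")


if __name__ == "__main__":
    print(asyncio.run(run(["Hel", "lo", "!"])))      # string chunks concatenate
    print(asyncio.run(run([{"a": 1}, {"b": 2}])))    # dict chunks cannot be added
```

Yielding before folding means a slow aggregation never delays the consumer, which matches the order of `yield chunk` and the `final_output` update in the listing.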

Frequently Asked Questions

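What does _atransform_stream_with_config() do?
_atransform_stream_with_config() is an async helper method on Runnable, defined in libs/core/langchain_core/runnables/base.py. Per its docstring, it transforms an async iterator of Input values into an async iterator of Output values, with callbacks, and is meant for implementing astream or atransform in Runnable subclasses.

Where is _atransform_stream_with_config() defined?
_atransform_stream_with_config() is defined in libs/core/langchain_core/runnables/base.py, starting at line 2359 (the definition spans lines 2359–2464).

What does _atransform_stream_with_config() call?
The dependency graph lists two callees: astream_log() and get_name(). In the source excerpt shown above, get_name() supplies the run name when the config has no run_name, while astream_log() is referenced only in a comment.

What calls _atransform_stream_with_config()?
Four functions call _atransform_stream_with_config(), all named atransform(). The dependency graph shows them as four distinct nodes.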
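
The signature in the source listing accepts three transformer shapes: input only, input plus a run manager, and input plus a run manager and a config. The listing picks the keyword arguments to pass with `accepts_config` and `accepts_run_manager`. A minimal sketch of that kind of dispatch follows, assuming the check is a lookup of parameter names. `accepts_param` and `call_transformer` are hypothetical stand-ins, not langchain's helpers, and the run manager and config are plain placeholder values.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable, Dict, List


def accepts_param(func: Callable[..., Any], name: str) -> bool:
    """Hypothetical helper: True if `func` declares a parameter called `name`."""
    try:
        return name in inspect.signature(func).parameters
    except (TypeError, ValueError):
        return False  # signature could not be inspected


def call_transformer(
    transformer: Callable[..., AsyncIterator[Any]],
    inputs: AsyncIterator[Any],
    run_manager: Any,
    config: Dict[str, Any],
) -> AsyncIterator[Any]:
    """Pass only the keyword arguments the transformer declares."""
    kwargs: Dict[str, Any] = {}
    if accepts_param(transformer, "config"):
        kwargs["config"] = config
    if accepts_param(transformer, "run_manager"):
        kwargs["run_manager"] = run_manager
    return transformer(inputs, **kwargs)


async def plain(inputs):
    async for x in inputs:
        yield x.upper()


async def with_manager(inputs, run_manager):
    async for x in inputs:
        yield f"{run_manager}:{x}"


async def with_both(inputs, run_manager, config):
    async for x in inputs:
        yield f"{run_manager}:{config['tag']}:{x}"


async def source() -> AsyncIterator[str]:
    for x in ("a", "b"):
        yield x


async def collect(transformer: Callable[..., AsyncIterator[Any]]) -> List[Any]:
    """Run one transformer over the demo source with placeholder arguments."""
    stream = call_transformer(transformer, source(), run_manager="rm", config={"tag": "t"})
    return [item async for item in stream]


if __name__ == "__main__":
    for fn in (plain, with_manager, with_both):
        print(fn.__name__, asyncio.run(collect(fn)))
```

Dispatching on declared parameter names lets simple transformers stay simple: a function that only needs the input stream never has to accept arguments it ignores.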
