_atransform() — langchain Function Reference

Architecture documentation for the _atransform() function in base.py from the langchain codebase.

Dependency Diagram

graph TD
  178252a4_c169_63ca_3a82_a5f7a1dcd26b["_atransform()"]
  c6a370e4_a02a_c56a_38eb_7ca734dcbfea["RunnableLambda"]
  178252a4_c169_63ca_3a82_a5f7a1dcd26b -->|defined in| c6a370e4_a02a_c56a_38eb_7ca734dcbfea
  6a8e40f9_b621_2878_43d7_e38d2dca02c5["_atransform()"]
  178252a4_c169_63ca_3a82_a5f7a1dcd26b -->|calls| 6a8e40f9_b621_2878_43d7_e38d2dca02c5
  style 178252a4_c169_63ca_3a82_a5f7a1dcd26b fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/langchain_core/runnables/base.py lines 5144–5241

    async def _atransform(
        self,
        chunks: AsyncIterator[Input],
        run_manager: AsyncCallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> AsyncIterator[Output]:
        final: Input
        got_first_val = False
        async for ichunk in chunks:
            # By definitions, RunnableLambdas consume all input before emitting output.
            # If the input is not addable, then we'll assume that we can
            # only operate on the last chunk.
            # So we'll iterate until we get to the last chunk!
            if not got_first_val:
                final = ichunk
                got_first_val = True
            else:
                try:
                    final = final + ichunk  # type: ignore[operator]
                except TypeError:
                    final = ichunk

        if hasattr(self, "afunc"):
            afunc = self.afunc
        else:
            if inspect.isgeneratorfunction(self.func):
                msg = (
                    "Cannot stream from a generator function asynchronously."
                    "Use .stream() instead."
                )
                raise TypeError(msg)

            def func(
                input_: Input,
                run_manager: AsyncCallbackManagerForChainRun,
                config: RunnableConfig,
                **kwargs: Any,
            ) -> Output:
                return call_func_with_variable_args(
                    self.func, input_, config, run_manager.get_sync(), **kwargs
                )

            @wraps(func)
            async def f(*args: Any, **kwargs: Any) -> Any:
                return await run_in_executor(config, func, *args, **kwargs)

            afunc = f

        if is_async_generator(afunc):
            output: Output | None = None
            async for chunk in cast(
                "AsyncIterator[Output]",
                acall_func_with_variable_args(
                    cast("Callable", afunc),
                    final,
                    config,
                    run_manager,
                    **kwargs,
                ),
            ):
                yield chunk
                if output is None:
                    output = chunk
                else:
                    try:
                        output = output + chunk  # type: ignore[operator]
                    except TypeError:
                        output = chunk
        else:
            output = await acall_func_with_variable_args(
                cast("Callable", afunc),
                final,
                config,
                run_manager,
                **kwargs,
            )

        # If the output is a Runnable, use its astream output
        if isinstance(output, Runnable):
            recursion_limit = config["recursion_limit"]
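The first loop in the listing folds the incoming chunks into a single value before the wrapped function is called. A minimal standalone sketch of that pattern follows. The names `fold_chunks` and `agen` are hypothetical helpers for illustration, not part of langchain, and returning `None` for an empty stream is an assumption of this sketch rather than behavior shown in the listing.

```python
import asyncio
from typing import Any, AsyncIterator, Iterable


async def fold_chunks(chunks: AsyncIterator[Any]) -> Any:
    """Combine chunks with `+`; if they are not addable, keep only the latest one."""
    got_first_val = False
    final: Any = None  # assumption: an empty stream yields None in this sketch
    async for chunk in chunks:
        if not got_first_val:
            final = chunk
            got_first_val = True
        else:
            try:
                final = final + chunk
            except TypeError:
                # Chunks that do not support `+` cannot be merged,
                # so only the most recent chunk is kept.
                final = chunk
    return final


async def agen(items: Iterable[Any]) -> AsyncIterator[Any]:
    """Hypothetical helper: turn a plain iterable into an async iterator."""
    for item in items:
        yield item


if __name__ == "__main__":
    print(asyncio.run(fold_chunks(agen(["a", "b", "c"]))))       # abc
    print(asyncio.run(fold_chunks(agen([{"x": 1}, {"y": 2}]))))  # {'y': 2}
```

Strings concatenate, so all three chunks survive. Dictionaries do not support `+`, so the `TypeError` branch runs and only the last chunk is passed on.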

Frequently Asked Questions

What does _atransform() do?
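_atransform() is an async method of RunnableLambda, defined in libs/core/langchain_core/runnables/base.py. It consumes the whole input stream, combining chunks with + where they are addable and otherwise keeping only the last chunk, then calls the wrapped function on the combined value. If the wrapped function is an async generator, each chunk it yields is streamed onward. A plain synchronous function is run in an executor, and a synchronous generator function raises a TypeError.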
Where is _atransform() defined?
_atransform() is defined in libs/core/langchain_core/runnables/base.py, lines 5144–5241.
What does _atransform() call?
The dependency diagram records one call: to another function that is also named _atransform() and appears as a separate node.
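The dispatch step in the source listing (run a synchronous function in an executor, then branch on whether the callable is an async generator) can be sketched with the standard library alone. Everything here is illustrative: `to_async`, `run_and_stream`, `collect`, and `letters` are hypothetical names, and `loop.run_in_executor` stands in for langchain's own `run_in_executor` helper. The listing is cut off before it shows what happens to a non-generator result, so yielding it as a single chunk is an assumption of this sketch.

```python
import asyncio
import inspect
from functools import partial
from typing import Any, AsyncIterator, Callable


def to_async(func: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a sync callable so it runs in the default thread-pool executor."""
    if inspect.isgeneratorfunction(func):
        # Mirrors the check in the listing: sync generators are rejected.
        raise TypeError("Cannot stream from a generator function asynchronously.")

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(func, *args, **kwargs))

    return wrapper


async def run_and_stream(func: Callable[..., Any], value: Any) -> AsyncIterator[Any]:
    """Call func on value, streaming chunks if func is an async generator."""
    is_async = inspect.iscoroutinefunction(func) or inspect.isasyncgenfunction(func)
    afunc = func if is_async else to_async(func)
    if inspect.isasyncgenfunction(afunc):
        async for chunk in afunc(value):
            yield chunk
    else:
        # Assumption: a non-generator result is emitted as one chunk.
        yield await afunc(value)


async def collect(func: Callable[..., Any], value: Any) -> list:
    return [chunk async for chunk in run_and_stream(func, value)]


async def letters(text: str) -> AsyncIterator[str]:
    for ch in text:
        yield ch


if __name__ == "__main__":
    print(asyncio.run(collect(str.upper, "hi")))  # ['HI']
    print(asyncio.run(collect(letters, "hi")))    # ['h', 'i']
```

The synchronous `str.upper` is offloaded to a worker thread and produces one chunk, while the async generator `letters` is streamed chunk by chunk.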
