_atransform() — langchain Function Reference

Architecture documentation for the _atransform() function in base.py from the langchain codebase.

Function | python | LangChainCore | Runnables | calls 1 | called by 1

Entity Profile

Dependency Diagram

graph TD
  df93336b_2161_a3ab_4adf_2f7000c37a15["_atransform()"]
  17599172_8889_afc6_2237_4429f3439071["RunnableParallel"]
  df93336b_2161_a3ab_4adf_2f7000c37a15 -->|defined in| 17599172_8889_afc6_2237_4429f3439071
  6a8e40f9_b621_2878_43d7_e38d2dca02c5["_atransform()"]
  6a8e40f9_b621_2878_43d7_e38d2dca02c5 -->|calls| df93336b_2161_a3ab_4adf_2f7000c37a15
  df93336b_2161_a3ab_4adf_2f7000c37a15 -->|calls| 6a8e40f9_b621_2878_43d7_e38d2dca02c5
  style df93336b_2161_a3ab_4adf_2f7000c37a15 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/langchain_core/runnables/base.py lines 4015–4064

    async def _atransform(
        self,
        inputs: AsyncIterator[Input],
        run_manager: AsyncCallbackManagerForChainRun,
        config: RunnableConfig,
    ) -> AsyncIterator[AddableDict]:
        # Shallow copy steps to ignore mutations while in progress
        steps = dict(self.steps__)
        # Each step gets its own copy of the input iterator; the copies are
        # consumed concurrently by the asyncio tasks created below.
        input_copies = list(atee(inputs, len(steps), lock=asyncio.Lock()))
        # Create the atransform() async generator for each step
        named_generators = [
            (
                name,
                step.atransform(
                    input_copies.pop(),
                    patch_config(
                        config, callbacks=run_manager.get_child(f"map:key:{name}")
                    ),
                ),
            )
            for name, step in steps.items()
        ]

        # Wrap in a coroutine to satisfy linter
        async def get_next_chunk(generator: AsyncIterator) -> Output | None:
            return await anext(generator)

        # Start the first iteration of each generator
        tasks = {
            asyncio.create_task(get_next_chunk(generator)): (step_name, generator)
            for step_name, generator in named_generators
        }
        # Yield chunks from each as they become available,
        # and start the next iteration of the generator that yielded it.
        # When all generators are exhausted, stop.
        while tasks:
            completed_tasks, _ = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in completed_tasks:
                (step_name, generator) = tasks.pop(task)
                try:
                    chunk = AddableDict({step_name: task.result()})
                    yield chunk
                    new_task = asyncio.create_task(get_next_chunk(generator))
                    tasks[new_task] = (step_name, generator)
                except StopAsyncIteration:
                    pass
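
The task loop above can be tried in isolation. The following is a minimal standard-library sketch of the same pattern, not the langchain implementation: `ticker()` is a hypothetical stand-in for `step.atransform()`, `merge_named()` is an illustrative name, and plain dicts replace `AddableDict`. One task per generator awaits that generator's next chunk; whenever a task finishes, its chunk is yielded and a fresh task is scheduled for the same generator.

```python
import asyncio
from collections.abc import AsyncIterator


async def ticker(label: str, count: int, delay: float) -> AsyncIterator[str]:
    """Hypothetical stand-in for step.atransform(): yields `count` chunks."""
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{label}{i}"


async def merge_named(
    generators: dict[str, AsyncIterator[str]],
) -> AsyncIterator[dict[str, str]]:
    """Yield {name: chunk} as soon as any of the generators produces a chunk."""

    async def get_next_chunk(generator: AsyncIterator[str]) -> str:
        # Equivalent to `await anext(generator)` on Python 3.10+.
        return await generator.__anext__()

    # Start the first iteration of every generator.
    tasks = {
        asyncio.create_task(get_next_chunk(gen)): (name, gen)
        for name, gen in generators.items()
    }
    while tasks:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            name, gen = tasks.pop(task)
            try:
                chunk = task.result()
            except StopAsyncIteration:
                continue  # this generator is exhausted; do not reschedule it
            yield {name: chunk}
            # Schedule the next chunk from the generator that just yielded.
            tasks[asyncio.create_task(get_next_chunk(gen))] = (name, gen)


async def collect() -> list[dict[str, str]]:
    gens = {"fast": ticker("f", 3, 0.01), "slow": ticker("s", 2, 0.025)}
    return [chunk async for chunk in merge_named(gens)]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

Chunks from one generator always arrive in that generator's own order, but the interleaving between generators depends on timing, so callers should not rely on a particular global order.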

Domain

LangChainCore

Subdomains

Runnables

Called By

_atransform()

Frequently Asked Questions

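What does _atransform() do?
_atransform() is an async method of RunnableParallel, defined in libs/core/langchain_core/runnables/base.py. It gives each step its own copy of the input stream via atee(), starts step.atransform() for every step, and yields one single-key AddableDict chunk ({step_name: value}) each time any step produces output, stopping once all steps are exhausted.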
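
Each chunk yielded in the source code above is a single-key AddableDict, so a consumer typically folds the chunks together to get one combined result. The sketch below illustrates that folding under a stated assumption: that chunks combine key by key with `+`. `MiniAddableDict` is a hypothetical stand-in written for this example, not the library's class, and the sample chunk values are made up.

```python
class MiniAddableDict(dict):
    """Hypothetical stand-in for AddableDict: `+` merges key by key."""

    def __add__(self, other: dict) -> "MiniAddableDict":
        merged = MiniAddableDict(self)
        for key, value in other.items():
            if key not in merged or merged[key] is None:
                # First chunk seen for this key: take it as-is.
                merged[key] = value
            elif value is not None:
                # Later chunk for the same key: append with `+`.
                merged[key] = merged[key] + value
        return merged


# Interleaved single-key chunks, as two parallel steps might emit them.
chunks = [
    MiniAddableDict({"joke": "Why did "}),
    MiniAddableDict({"poem": "Roses "}),
    MiniAddableDict({"joke": "the chicken..."}),
    MiniAddableDict({"poem": "are red"}),
]

final = chunks[0]
for chunk in chunks[1:]:
    final = final + chunk

print(final)
```

Because each key is accumulated independently, the interleaving of chunks across steps does not affect the combined value for any one step.
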
Where is _atransform() defined?
_atransform() is defined in libs/core/langchain_core/runnables/base.py, starting at line 4015.
What does _atransform() call?
The dependency diagram records one call: to another function that is also named _atransform() (a separate node in the graph).
What calls _atransform()?
The dependency diagram records one caller: the same separate _atransform() node.
