Home / Function/ _transform() — langchain Function Reference

_transform() — langchain Function Reference

Architecture documentation for the _transform() function in base.py from the langchain codebase.

Function python LangChainCore Runnables calls 1 called by 1

Entity Profile

Dependency Diagram

graph TD
  f54efa31_1266_16ec_54f7_0adda8cb6619["_transform()"]
  17599172_8889_afc6_2237_4429f3439071["RunnableParallel"]
  f54efa31_1266_16ec_54f7_0adda8cb6619 -->|defined in| 17599172_8889_afc6_2237_4429f3439071
  87d3d0f0_cf29_e59f_63c9_a28e3dff7138["_transform()"]
  87d3d0f0_cf29_e59f_63c9_a28e3dff7138 -->|calls| f54efa31_1266_16ec_54f7_0adda8cb6619
  87d3d0f0_cf29_e59f_63c9_a28e3dff7138["_transform()"]
  f54efa31_1266_16ec_54f7_0adda8cb6619 -->|calls| 87d3d0f0_cf29_e59f_63c9_a28e3dff7138
  style f54efa31_1266_16ec_54f7_0adda8cb6619 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/core/langchain_core/runnables/base.py lines 3948–3993

    def _transform(
        self,
        inputs: Iterator[Input],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
    ) -> Iterator[AddableDict]:
        # Shallow copy steps to ignore mutations while in progress
        steps = dict(self.steps__)
        # Each step gets a copy of the input iterator,
        # which is consumed in parallel in a separate thread.
        input_copies = list(safetee(inputs, len(steps), lock=threading.Lock()))
        with get_executor_for_config(config) as executor:
            # Create the transform() generator for each step
            named_generators = [
                (
                    name,
                    step.transform(
                        input_copies.pop(),
                        patch_config(
                            config, callbacks=run_manager.get_child(f"map:key:{name}")
                        ),
                    ),
                )
                for name, step in steps.items()
            ]
            # Start the first iteration of each generator
            futures = {
                executor.submit(next, generator): (step_name, generator)
                for step_name, generator in named_generators
            }
            # Yield chunks from each as they become available,
            # and start the next iteration of that generator that yielded it.
            # When all generators are exhausted, stop.
            while futures:
                completed_futures, _ = wait(futures, return_when=FIRST_COMPLETED)
                for future in completed_futures:
                    (step_name, generator) = futures.pop(future)
                    try:
                        chunk = AddableDict({step_name: future.result()})
                        yield chunk
                        futures[executor.submit(next, generator)] = (
                            step_name,
                            generator,
                        )
                    except StopIteration:
                        pass

Domain

Subdomains

Calls

Called By

Frequently Asked Questions

What does _transform() do?
_transform() is a function in the langchain codebase, defined in libs/core/langchain_core/runnables/base.py.
Where is _transform() defined?
_transform() is defined in libs/core/langchain_core/runnables/base.py at line 3948.
What does _transform() call?
_transform() calls 1 function(s): _transform.
What calls _transform()?
_transform() is called by 1 function(s): _transform.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free