_atransform() — langchain Function Reference
Architecture documentation for the _atransform() function in base.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD df93336b_2161_a3ab_4adf_2f7000c37a15["_atransform()"] 17599172_8889_afc6_2237_4429f3439071["RunnableParallel"] df93336b_2161_a3ab_4adf_2f7000c37a15 -->|defined in| 17599172_8889_afc6_2237_4429f3439071 6a8e40f9_b621_2878_43d7_e38d2dca02c5["_atransform()"] 6a8e40f9_b621_2878_43d7_e38d2dca02c5 -->|calls| df93336b_2161_a3ab_4adf_2f7000c37a15 6a8e40f9_b621_2878_43d7_e38d2dca02c5["_atransform()"] df93336b_2161_a3ab_4adf_2f7000c37a15 -->|calls| 6a8e40f9_b621_2878_43d7_e38d2dca02c5 style df93336b_2161_a3ab_4adf_2f7000c37a15 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/langchain_core/runnables/base.py lines 4015–4064
async def _atransform(
self,
inputs: AsyncIterator[Input],
run_manager: AsyncCallbackManagerForChainRun,
config: RunnableConfig,
) -> AsyncIterator[AddableDict]:
# Shallow copy steps to ignore mutations while in progress
steps = dict(self.steps__)
# Each step gets a copy of the input iterator,
# which is consumed in parallel in a separate thread.
input_copies = list(atee(inputs, len(steps), lock=asyncio.Lock()))
# Create the transform() generator for each step
named_generators = [
(
name,
step.atransform(
input_copies.pop(),
patch_config(
config, callbacks=run_manager.get_child(f"map:key:{name}")
),
),
)
for name, step in steps.items()
]
# Wrap in a coroutine to satisfy linter
async def get_next_chunk(generator: AsyncIterator) -> Output | None:
return await anext(generator)
# Start the first iteration of each generator
tasks = {
asyncio.create_task(get_next_chunk(generator)): (step_name, generator)
for step_name, generator in named_generators
}
# Yield chunks from each as they become available,
# and start the next iteration of the generator that yielded it.
# When all generators are exhausted, stop.
while tasks:
completed_tasks, _ = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in completed_tasks:
(step_name, generator) = tasks.pop(task)
try:
chunk = AddableDict({step_name: task.result()})
yield chunk
new_task = asyncio.create_task(get_next_chunk(generator))
tasks[new_task] = (step_name, generator)
except StopAsyncIteration:
pass
Domain
Subdomains
Defined In
Calls
Called By
Source
Frequently Asked Questions
What does _atransform() do?
_atransform() is a function in the langchain codebase, defined in libs/core/langchain_core/runnables/base.py.
Where is _atransform() defined?
_atransform() is defined in libs/core/langchain_core/runnables/base.py at line 4015.
What does _atransform() call?
_atransform() calls 1 function(s): _atransform.
What calls _atransform()?
_atransform() is called by 1 function(s): _atransform.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free