_atransform() — langchain Function Reference
Architecture documentation for the _atransform() function in base.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD
    178252a4_c169_63ca_3a82_a5f7a1dcd26b["_atransform()"]
    c6a370e4_a02a_c56a_38eb_7ca734dcbfea["RunnableLambda"]
    178252a4_c169_63ca_3a82_a5f7a1dcd26b -->|defined in| c6a370e4_a02a_c56a_38eb_7ca734dcbfea
    6a8e40f9_b621_2878_43d7_e38d2dca02c5["_atransform()"]
    178252a4_c169_63ca_3a82_a5f7a1dcd26b -->|calls| 6a8e40f9_b621_2878_43d7_e38d2dca02c5
    style 178252a4_c169_63ca_3a82_a5f7a1dcd26b fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/core/langchain_core/runnables/base.py lines 5144–5241
async def _atransform(
    self,
    chunks: AsyncIterator[Input],
    run_manager: AsyncCallbackManagerForChainRun,
    config: RunnableConfig,
    **kwargs: Any,
) -> AsyncIterator[Output]:
    final: Input
    got_first_val = False
    async for ichunk in chunks:
        # By definitions, RunnableLambdas consume all input before emitting output.
        # If the input is not addable, then we'll assume that we can
        # only operate on the last chunk.
        # So we'll iterate until we get to the last chunk!
        if not got_first_val:
            final = ichunk
            got_first_val = True
        else:
            try:
                final = final + ichunk  # type: ignore[operator]
            except TypeError:
                final = ichunk

    if hasattr(self, "afunc"):
        afunc = self.afunc
    else:
        if inspect.isgeneratorfunction(self.func):
            msg = (
                "Cannot stream from a generator function asynchronously."
                "Use .stream() instead."
            )
            raise TypeError(msg)

        def func(
            input_: Input,
            run_manager: AsyncCallbackManagerForChainRun,
            config: RunnableConfig,
            **kwargs: Any,
        ) -> Output:
            return call_func_with_variable_args(
                self.func, input_, config, run_manager.get_sync(), **kwargs
            )

        @wraps(func)
        async def f(*args: Any, **kwargs: Any) -> Any:
            return await run_in_executor(config, func, *args, **kwargs)

        afunc = f

    if is_async_generator(afunc):
        output: Output | None = None
        async for chunk in cast(
            "AsyncIterator[Output]",
            acall_func_with_variable_args(
                cast("Callable", afunc),
                final,
                config,
                run_manager,
                **kwargs,
            ),
        ):
            yield chunk
            if output is None:
                output = chunk
            else:
                try:
                    output = output + chunk  # type: ignore[operator]
                except TypeError:
                    output = chunk
    else:
        output = await acall_func_with_variable_args(
            cast("Callable", afunc),
            final,
            config,
            run_manager,
            **kwargs,
        )

    # If the output is a Runnable, use its astream output
    if isinstance(output, Runnable):
        recursion_limit = config["recursion_limit"]
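The input-folding loop at the top of the listing can be tried in isolation. The following is a minimal sketch, not LangChain's code: fold_chunks, demo_chunks and fold are hypothetical names introduced here, and only the standard library is used. It shows chunks being combined with + where the type supports it, and the last chunk winning when + raises TypeError.

```python
import asyncio
from typing import Any, AsyncIterator, List


async def fold_chunks(chunks: AsyncIterator[Any]) -> Any:
    """Fold an async stream into one value, mirroring the loop in the listing.

    Hypothetical helper for illustration; not part of langchain_core.
    Unlike the listing, an empty stream returns None here (an assumption
    made to keep the sketch simple).
    """
    got_first_val = False
    final: Any = None
    async for chunk in chunks:
        if not got_first_val:
            final = chunk
            got_first_val = True
        else:
            try:
                final = final + chunk  # works for str, list, int, ...
            except TypeError:
                final = chunk  # not addable: keep only the latest chunk
    return final


async def demo_chunks(items: List[Any]) -> AsyncIterator[Any]:
    """Turn a plain list into an async stream of chunks (hypothetical)."""
    for item in items:
        yield item


def fold(items: List[Any]) -> Any:
    """Synchronous convenience wrapper around fold_chunks."""
    return asyncio.run(fold_chunks(demo_chunks(items)))


print(fold(["Hel", "lo"]))          # Hello
print(fold([{"a": 1}, {"b": 2}]))   # {'b': 2}
```

Strings concatenate, so the wrapped function would see the whole text. Plain dicts do not support +, so only the final dict would be passed on.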
Frequently Asked Questions
What does _atransform() do?
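_atransform() is an async method of RunnableLambda in the langchain codebase, defined in libs/core/langchain_core/runnables/base.py. It consumes all input chunks first, combining them with + where possible and otherwise keeping only the last chunk, then calls the wrapped function on the combined input. If that function is an async generator, its chunks are yielded as they arrive.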
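The source listing also wraps a synchronous function so it can be awaited, and handles async generators differently from ordinary callables. The sketch below imitates that dispatch with the standard library only. It is an approximation under stated assumptions: to_async, stream_result, shout, letters and collect are hypothetical names, asyncio's default thread pool stands in for LangChain's run_in_executor, and the generator-function check and the Runnable-output branch from the listing are left out.

```python
import asyncio
import inspect
from functools import partial, wraps
from typing import Any, AsyncIterator, Callable, List


def to_async(func: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a blocking function so it can be awaited (hypothetical helper)."""

    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        loop = asyncio.get_running_loop()
        # None selects the event loop's default thread pool executor.
        return await loop.run_in_executor(None, partial(func, *args, **kwargs))

    return wrapper


async def stream_result(
    func: Callable[..., Any], value: Any
) -> AsyncIterator[Any]:
    """Yield every chunk of an async generator, or one chunk otherwise."""
    if inspect.isasyncgenfunction(func):
        async for chunk in func(value):
            yield chunk
    elif inspect.iscoroutinefunction(func):
        yield await func(value)
    else:
        yield await to_async(func)(value)


def shout(text: str) -> str:
    """Plain synchronous function; runs in a worker thread."""
    return text.upper()


async def letters(text: str) -> AsyncIterator[str]:
    """Async generator; emits one character per chunk."""
    for ch in text:
        yield ch


async def collect(func: Callable[..., Any], value: Any) -> List[Any]:
    """Gather all chunks into a list for inspection."""
    return [chunk async for chunk in stream_result(func, value)]


print(asyncio.run(collect(shout, "hi")))    # ['HI']
print(asyncio.run(collect(letters, "hi")))  # ['h', 'i']
```

The synchronous function produces a single chunk once it finishes, while the async generator streams one chunk per yield.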
Where is _atransform() defined?
_atransform() is defined in libs/core/langchain_core/runnables/base.py at line 5144.
What does _atransform() call?
_atransform() calls one function: _atransform().