abatch() — langchain Function Reference
Architecture documentation for the abatch() method, defined on RunnableSequence in base.py in the langchain codebase.
Entity Profile
Dependency Diagram
graph TD
    b9fb513b_bf75_3677_fc04_41bba780611c["abatch()"]
    c4bf8d59_69f9_f8e5_efba_837c20df2360["RunnableSequence"]
    b9fb513b_bf75_3677_fc04_41bba780611c -->|defined in| c4bf8d59_69f9_f8e5_efba_837c20df2360
    3e736ea3_30ef_68c3_5f32_b8a49b37d501["abatch()"]
    b9fb513b_bf75_3677_fc04_41bba780611c -->|calls| 3e736ea3_30ef_68c3_5f32_b8a49b37d501
    e9999cfb_b051_7cee_27e4_2e2746488c79["batch()"]
    b9fb513b_bf75_3677_fc04_41bba780611c -->|calls| e9999cfb_b051_7cee_27e4_2e2746488c79
    255c479b_b9fa_44d8_4de5_2562051e06b5["get_name()"]
    b9fb513b_bf75_3677_fc04_41bba780611c -->|calls| 255c479b_b9fa_44d8_4de5_2562051e06b5
    style b9fb513b_bf75_3677_fc04_41bba780611c fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/core/langchain_core/runnables/base.py lines 3335–3463
async def abatch(
    self,
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]:
    if not inputs:
        return []
    # setup callbacks and context
    configs = get_config_list(config, len(inputs))
    callback_managers = [
        AsyncCallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            local_callbacks=None,
            verbose=False,
            inheritable_tags=config.get("tags"),
            local_tags=None,
            inheritable_metadata=config.get("metadata"),
            local_metadata=None,
        )
        for config in configs
    ]
    # start the root runs, one per input
    run_managers: list[AsyncCallbackManagerForChainRun] = await asyncio.gather(
        *(
            cm.on_chain_start(
                None,
                input_,
                name=config.get("run_name") or self.get_name(),
                run_id=config.pop("run_id", None),
            )
            for cm, input_, config in zip(
                callback_managers, inputs, configs, strict=False
            )
        )
    )
    # invoke .batch() on each step
    # this uses batching optimizations in Runnable subclasses, like LLM
    try:
        if return_exceptions:
            # Track which inputs (by index) failed so far
            # If an input has failed it will be present in this map,
            # and the value will be the exception that was raised.
            failed_inputs_map: dict[int, Exception] = {}
            for stepidx, step in enumerate(self.steps):
                # Assemble the original indexes of the remaining inputs
                # (i.e. the ones that haven't failed yet)
                remaining_idxs = [
                    i for i in range(len(configs)) if i not in failed_inputs_map
                ]
                # Invoke the step on the remaining inputs
                inputs = await step.abatch(
                    [
                        inp
                        for i, inp in zip(remaining_idxs, inputs, strict=False)
                        if i not in failed_inputs_map
                    ],
                    [
                        # each step a child run of the corresponding root run
                        patch_config(
                            config,
                            callbacks=rm.get_child(f"seq:step:{stepidx + 1}"),
                        )
                        for i, (rm, config) in enumerate(
                            zip(run_managers, configs, strict=False)
                        )
                        if i not in failed_inputs_map
                    ],
                    return_exceptions=return_exceptions,
                    **(kwargs if stepidx == 0 else {}),
                )
                # If an input failed, add it to the map
                failed_inputs_map.update(
                    {
                        i: inp
                        for i, inp in zip(remaining_idxs, inputs, strict=False)
                        if isinstance(inp, Exception)
                    }
                )
                # … (remainder of the method, through line 3463, elided in this excerpt)
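The return_exceptions bookkeeping in the listing above can be sketched in isolation with plain asyncio. Everything below (sequence_abatch and the two step coroutines) is a hypothetical, simplified stand-in for illustration, not langchain code: failures are recorded under the index of the original input, only surviving outputs feed the next step, and exceptions are slotted back into place at the end.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

async def sequence_abatch(
    steps: list[Callable[[Any], Awaitable[Any]]],
    inputs: list[Any],
) -> list[Any]:
    # Map original input index -> the exception that input raised.
    failed: dict[int, Exception] = {}
    current = list(inputs)  # surviving outputs of the previous step
    for step in steps:
        # Original indexes of the inputs that haven't failed yet.
        remaining = [i for i in range(len(inputs)) if i not in failed]
        # Run the step on the survivors; gather returns exceptions in
        # place of results instead of raising.
        results = await asyncio.gather(
            *(step(inp) for inp in current), return_exceptions=True
        )
        # Record new failures under their original indexes.
        failed.update(
            {i: r for i, r in zip(remaining, results) if isinstance(r, Exception)}
        )
        # Only non-failed outputs feed the next step.
        current = [r for r in results if not isinstance(r, Exception)]
    # Reassemble: each exception sits at the index of the input that caused it.
    ok = iter(current)
    return [failed[i] if i in failed else next(ok) for i in range(len(inputs))]

async def _double(x: int) -> int:
    return x * 2

async def _cap(x: int) -> int:
    if x > 5:
        raise ValueError(f"{x} too large")
    return x

results = asyncio.run(sequence_abatch([_double, _cap], [1, 2, 4]))
# -> [2, 4, ValueError('8 too large')]: the failed input keeps its slot
```

This mirrors the roles of failed_inputs_map and remaining_idxs in the real method, minus the callback and config plumbing.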
Frequently Asked Questions
What does abatch() do?
abatch() is the asynchronous batch method of RunnableSequence, defined in libs/core/langchain_core/runnables/base.py. It runs a list of inputs through each step of the sequence in turn via step.abatch(), and with return_exceptions=True it collects per-input exceptions instead of aborting the whole batch.
Where is abatch() defined?
abatch() is defined in libs/core/langchain_core/runnables/base.py at line 3335.
What does abatch() call?
abatch() calls three functions: abatch() (on each step of the sequence), batch(), and get_name().