abatch() — langchain Function Reference
Architecture documentation for the abatch() function in fallbacks.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD 770bf1c0_9914_ee65_37c9_761b33c21fae["abatch()"] da856a55_079e_fd91_47d6_928251e6cee3["RunnableWithFallbacks"] 770bf1c0_9914_ee65_37c9_761b33c21fae -->|defined in| da856a55_079e_fd91_47d6_928251e6cee3 style 770bf1c0_9914_ee65_37c9_761b33c21fae fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/langchain_core/runnables/fallbacks.py lines 362–463
async def abatch(
self,
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]:
if self.exception_key is not None and not all(
isinstance(input_, dict) for input_ in inputs
):
msg = (
"If 'exception_key' is specified then inputs must be dictionaries."
f"However found a type of {type(inputs[0])} for input"
)
raise ValueError(msg)
if not inputs:
return []
# setup callbacks
configs = get_config_list(config, len(inputs))
callback_managers = [
AsyncCallbackManager.configure(
inheritable_callbacks=config.get("callbacks"),
local_callbacks=None,
verbose=False,
inheritable_tags=config.get("tags"),
local_tags=None,
inheritable_metadata=config.get("metadata"),
local_metadata=None,
)
for config in configs
]
# start the root runs, one per input
run_managers: list[AsyncCallbackManagerForChainRun] = await asyncio.gather(
*(
cm.on_chain_start(
None,
input_,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
)
for cm, input_, config in zip(
callback_managers, inputs, configs, strict=False
)
)
)
to_return: dict[int, Output | BaseException] = {}
run_again = dict(enumerate(inputs))
handled_exceptions: dict[int, BaseException] = {}
first_to_raise = None
for runnable in self.runnables:
outputs = await runnable.abatch(
[input_ for _, input_ in sorted(run_again.items())],
[
# each step a child run of the corresponding root run
patch_config(configs[i], callbacks=run_managers[i].get_child())
for i in sorted(run_again)
],
return_exceptions=True,
**kwargs,
)
for (i, input_), output in zip(
sorted(run_again.copy().items()), outputs, strict=False
):
if isinstance(output, BaseException) and not isinstance(
output, self.exceptions_to_handle
):
if not return_exceptions:
first_to_raise = first_to_raise or output
else:
handled_exceptions[i] = output
run_again.pop(i)
elif isinstance(output, self.exceptions_to_handle):
if self.exception_key:
input_[self.exception_key] = output # type: ignore[index]
handled_exceptions[i] = output
else:
Domain
Subdomains
Source
Frequently Asked Questions
What does abatch() do?
abatch() is a function in the langchain codebase, defined in libs/core/langchain_core/runnables/fallbacks.py.
Where is abatch() defined?
abatch() is defined in libs/core/langchain_core/runnables/fallbacks.py at line 362.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free