_abatch() — langchain Function Reference
Architecture documentation for the _abatch() function in retry.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD d376a50b_f292_726b_5406_bcd22f556419["_abatch()"] dcb89960_9531_c0ae_7764_c192a29f52c0["RunnableRetry"] d376a50b_f292_726b_5406_bcd22f556419 -->|defined in| dcb89960_9531_c0ae_7764_c192a29f52c0 a05cbbdd_13ef_9d8b_5022_8f9b0a46c86a["_async_retrying()"] d376a50b_f292_726b_5406_bcd22f556419 -->|calls| a05cbbdd_13ef_9d8b_5022_8f9b0a46c86a 790f2192_2e50_38d8_2e94_a32c9487f5b6["_patch_config_list()"] d376a50b_f292_726b_5406_bcd22f556419 -->|calls| 790f2192_2e50_38d8_2e94_a32c9487f5b6 7dea3d8c_e973_7709_8b13_2e21005cfcf4["abatch()"] d376a50b_f292_726b_5406_bcd22f556419 -->|calls| 7dea3d8c_e973_7709_8b13_2e21005cfcf4 style d376a50b_f292_726b_5406_bcd22f556419 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/langchain_core/runnables/retry.py lines 303–363
async def _abatch(
self,
inputs: list[Input],
run_manager: list["AsyncCallbackManagerForChainRun"],
config: list[RunnableConfig],
**kwargs: Any,
) -> list[Output | Exception]:
results_map: dict[int, Output] = {}
not_set: list[Output] = []
result = not_set
try:
async for attempt in self._async_retrying():
with attempt:
# Retry for inputs that have not yet succeeded
# Determine which original indices remain.
remaining_indices = [
i for i in range(len(inputs)) if i not in results_map
]
if not remaining_indices:
break
pending_inputs = [inputs[i] for i in remaining_indices]
pending_configs = [config[i] for i in remaining_indices]
pending_run_managers = [run_manager[i] for i in remaining_indices]
result = await super().abatch(
pending_inputs,
self._patch_config_list(
pending_configs, pending_run_managers, attempt.retry_state
),
return_exceptions=True,
**kwargs,
)
# Register the results of the inputs that have succeeded, mapping
# back to their original indices.
first_exception = None
for offset, r in enumerate(result):
if isinstance(r, Exception):
if not first_exception:
first_exception = r
continue
orig_idx = remaining_indices[offset]
results_map[orig_idx] = r
# If any exception occurred, raise it, to retry the failed ones
if first_exception:
raise first_exception
if (
attempt.retry_state.outcome
and not attempt.retry_state.outcome.failed
):
attempt.retry_state.set_result(result)
except RetryError as e:
if result is not_set:
result = cast("list[Output]", [e] * len(inputs))
outputs: list[Output | Exception] = []
for idx in range(len(inputs)):
if idx in results_map:
outputs.append(results_map[idx])
else:
outputs.append(result.pop(0))
return outputs
Domain
Subdomains
Defined In
Source
Frequently Asked Questions
What does _abatch() do?
_abatch() is a function in the langchain codebase, defined in libs/core/langchain_core/runnables/retry.py.
Where is _abatch() defined?
_abatch() is defined in libs/core/langchain_core/runnables/retry.py at line 303.
What does _abatch() call?
_abatch() calls 3 function(s): _async_retrying, _patch_config_list, abatch.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free