_batch() — langchain Function Reference
Architecture documentation for the _batch() function in retry.py from the langchain codebase.
Dependency Diagram
graph TD
    52a44127_1c98_6129_693b_200d35f8fd7b["_batch()"]
    dcb89960_9531_c0ae_7764_c192a29f52c0["RunnableRetry"]
    52a44127_1c98_6129_693b_200d35f8fd7b -->|defined in| dcb89960_9531_c0ae_7764_c192a29f52c0
    19d453a1_fd58_2e89_f266_f99b0f5a741e["_sync_retrying()"]
    52a44127_1c98_6129_693b_200d35f8fd7b -->|calls| 19d453a1_fd58_2e89_f266_f99b0f5a741e
    790f2192_2e50_38d8_2e94_a32c9487f5b6["_patch_config_list()"]
    52a44127_1c98_6129_693b_200d35f8fd7b -->|calls| 790f2192_2e50_38d8_2e94_a32c9487f5b6
    7a99fa12_967f_04c9_9db3_b8ff6c02e8a1["batch()"]
    52a44127_1c98_6129_693b_200d35f8fd7b -->|calls| 7a99fa12_967f_04c9_9db3_b8ff6c02e8a1
    style 52a44127_1c98_6129_693b_200d35f8fd7b fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/core/langchain_core/runnables/retry.py lines 227–288
def _batch(
    self,
    inputs: list[Input],
    run_manager: list["CallbackManagerForChainRun"],
    config: list[RunnableConfig],
    **kwargs: Any,
) -> list[Output | Exception]:
    results_map: dict[int, Output] = {}
    not_set: list[Output] = []
    result = not_set
    try:
        for attempt in self._sync_retrying():
            with attempt:
                # Retry for inputs that have not yet succeeded
                # Determine which original indices remain.
                remaining_indices = [
                    i for i in range(len(inputs)) if i not in results_map
                ]
                if not remaining_indices:
                    break
                pending_inputs = [inputs[i] for i in remaining_indices]
                pending_configs = [config[i] for i in remaining_indices]
                pending_run_managers = [run_manager[i] for i in remaining_indices]
                # Invoke underlying batch only on remaining elements.
                result = super().batch(
                    pending_inputs,
                    self._patch_config_list(
                        pending_configs, pending_run_managers, attempt.retry_state
                    ),
                    return_exceptions=True,
                    **kwargs,
                )
                # Register the results of the inputs that have succeeded, mapping
                # back to their original indices.
                first_exception = None
                for offset, r in enumerate(result):
                    if isinstance(r, Exception):
                        if not first_exception:
                            first_exception = r
                        continue
                    orig_idx = remaining_indices[offset]
                    results_map[orig_idx] = r
                # If any exception occurred, raise it, to retry the failed ones
                if first_exception:
                    raise first_exception
            if (
                attempt.retry_state.outcome
                and not attempt.retry_state.outcome.failed
            ):
                attempt.retry_state.set_result(result)
    except RetryError as e:
        if result is not_set:
            result = cast("list[Output]", [e] * len(inputs))
    outputs: list[Output | Exception] = []
    for idx in range(len(inputs)):
        if idx in results_map:
            outputs.append(results_map[idx])
        else:
            outputs.append(result.pop(0))
    return outputs
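The core idea in the listing above, resubmitting only the inputs that have not yet succeeded and mapping results back to their original indices, can be sketched without langchain. The following is a minimal standard-library illustration, not the library's implementation: `retry_failed_only`, `flaky_double`, and the fixed `max_attempts` counter are hypothetical names introduced here. Unlike the source, which fills unresolved slots by popping from the last result list, this sketch tracks the latest exception per original index.

```python
from typing import Any, Callable


def retry_failed_only(
    batch_fn: Callable[[list[Any]], list[Any]],
    inputs: list[Any],
    max_attempts: int = 3,
) -> list[Any]:
    """Call batch_fn repeatedly, resubmitting only the inputs that failed."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    results_map: dict[int, Any] = {}  # original index -> successful output
    last_errors: dict[int, Exception] = {}  # original index -> latest failure

    for _ in range(max_attempts):
        remaining = [i for i in range(len(inputs)) if i not in results_map]
        if not remaining:
            break
        # batch_fn is assumed to return exceptions instead of raising them,
        # similar in spirit to return_exceptions=True in the listing above.
        results = batch_fn([inputs[i] for i in remaining])
        for offset, r in enumerate(results):
            orig_idx = remaining[offset]  # map back to the original position
            if isinstance(r, Exception):
                last_errors[orig_idx] = r
            else:
                results_map[orig_idx] = r

    # One entry per input, in input order: an output or the latest exception.
    return [
        results_map[i] if i in results_map else last_errors[i]
        for i in range(len(inputs))
    ]


# Demo: odd inputs fail the first time they are seen, then succeed.
seen: set[int] = set()
calls: list[list[int]] = []


def flaky_double(batch: list[int]) -> list[Any]:
    calls.append(list(batch))  # record what each attempt was asked to process
    out: list[Any] = []
    for x in batch:
        if x % 2 == 1 and x not in seen:
            seen.add(x)
            out.append(ValueError(f"transient failure for {x}"))
        else:
            out.append(x * 2)
    return out


outputs = retry_failed_only(flaky_double, [1, 2, 3, 4])
```

In this demo the second call to `flaky_double` receives only the two inputs that failed the first time, which is the saving the index bookkeeping is meant to provide.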
Frequently Asked Questions
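What does _batch() do?
_batch() is a method of RunnableRetry, defined in libs/core/langchain_core/runnables/retry.py. It runs a batch through the parent class's batch() with return_exceptions=True inside a retry loop, records each successful output under its original input index, and on the next attempt resubmits only the inputs that have not yet succeeded. It returns one entry per input, in input order; an entry may be an exception if that input never succeeded.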
Where is _batch() defined?
_batch() is defined in libs/core/langchain_core/runnables/retry.py at line 227.
What does _batch() call?
_batch() calls three functions: _patch_config_list(), _sync_retrying(), and batch() (through super()).
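The `for attempt in self._sync_retrying():` / `with attempt:` pattern in the source depends on a retry iterator whose attempts act as context managers: an exception raised inside the `with` body is captured rather than propagated, and the iterator decides whether to hand out another attempt or give up. The source shown here does not include the body of `_sync_retrying()`, so the following is only a simplified standard-library stand-in for that control flow. `Attempt`, `GiveUp` (playing the role of `RetryError`), `retrying`, and `run` are all hypothetical names.

```python
from typing import Iterator, Optional


class GiveUp(Exception):
    """Raised once every attempt has failed (stand-in for RetryError)."""


class Attempt:
    """Context manager that records and swallows an exception from its body."""

    def __init__(self, number: int) -> None:
        self.number = number
        self.error: Optional[Exception] = None

    def __enter__(self) -> "Attempt":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if isinstance(exc, Exception):
            self.error = exc
            return True  # suppress it so the surrounding for loop keeps going
        return False  # no error, or a BaseException such as KeyboardInterrupt


def retrying(max_attempts: int) -> Iterator[Attempt]:
    """Yield attempts until one finishes without error or the budget runs out."""
    last_error: Optional[Exception] = None
    for number in range(1, max_attempts + 1):
        attempt = Attempt(number)
        yield attempt
        if attempt.error is None:
            return  # the body succeeded, so the loop ends normally
        last_error = attempt.error
    raise GiveUp(f"gave up after {max_attempts} attempts") from last_error


def run(fail_times: int, max_attempts: int = 3) -> tuple[str, list[int]]:
    """Fail the first fail_times attempts, then succeed."""
    attempts_seen: list[int] = []
    failures_left = fail_times
    try:
        for attempt in retrying(max_attempts):
            with attempt:
                attempts_seen.append(attempt.number)
                if failures_left > 0:
                    failures_left -= 1
                    # Raising inside the with body is what requests a retry.
                    raise ValueError("transient")
        return "ok", attempts_seen
    except GiveUp:
        return "gave up", attempts_seen


first = run(fail_times=2)
second = run(fail_times=5)
```

Here `first` ends in success on the third attempt, while `second` exhausts its three attempts and takes the `except` branch, the same two exits that `_batch()` handles with its `try`/`except RetryError` block.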