_abatch() — langchain Function Reference

Architecture documentation for the _abatch() method of RunnableRetry, defined in retry.py in the langchain codebase.

Dependency Diagram

graph TD
  d376a50b_f292_726b_5406_bcd22f556419["_abatch()"]
  dcb89960_9531_c0ae_7764_c192a29f52c0["RunnableRetry"]
  d376a50b_f292_726b_5406_bcd22f556419 -->|defined in| dcb89960_9531_c0ae_7764_c192a29f52c0
  a05cbbdd_13ef_9d8b_5022_8f9b0a46c86a["_async_retrying()"]
  d376a50b_f292_726b_5406_bcd22f556419 -->|calls| a05cbbdd_13ef_9d8b_5022_8f9b0a46c86a
  790f2192_2e50_38d8_2e94_a32c9487f5b6["_patch_config_list()"]
  d376a50b_f292_726b_5406_bcd22f556419 -->|calls| 790f2192_2e50_38d8_2e94_a32c9487f5b6
  7dea3d8c_e973_7709_8b13_2e21005cfcf4["abatch()"]
  d376a50b_f292_726b_5406_bcd22f556419 -->|calls| 7dea3d8c_e973_7709_8b13_2e21005cfcf4
  style d376a50b_f292_726b_5406_bcd22f556419 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/langchain_core/runnables/retry.py lines 303–363

    async def _abatch(
        self,
        inputs: list[Input],
        run_manager: list["AsyncCallbackManagerForChainRun"],
        config: list[RunnableConfig],
        **kwargs: Any,
    ) -> list[Output | Exception]:
        results_map: dict[int, Output] = {}

        not_set: list[Output] = []
        result = not_set
        try:
            async for attempt in self._async_retrying():
                with attempt:
                    # Retry for inputs that have not yet succeeded
                    # Determine which original indices remain.
                    remaining_indices = [
                        i for i in range(len(inputs)) if i not in results_map
                    ]
                    if not remaining_indices:
                        break
                    pending_inputs = [inputs[i] for i in remaining_indices]
                    pending_configs = [config[i] for i in remaining_indices]
                    pending_run_managers = [run_manager[i] for i in remaining_indices]
                    result = await super().abatch(
                        pending_inputs,
                        self._patch_config_list(
                            pending_configs, pending_run_managers, attempt.retry_state
                        ),
                        return_exceptions=True,
                        **kwargs,
                    )
                    # Register the results of the inputs that have succeeded, mapping
                    # back to their original indices.
                    first_exception = None
                    for offset, r in enumerate(result):
                        if isinstance(r, Exception):
                            if not first_exception:
                                first_exception = r
                            continue
                        orig_idx = remaining_indices[offset]
                        results_map[orig_idx] = r
                    # If any exception occurred, raise it, to retry the failed ones
                    if first_exception:
                        raise first_exception
                if (
                    attempt.retry_state.outcome
                    and not attempt.retry_state.outcome.failed
                ):
                    attempt.retry_state.set_result(result)
        except RetryError as e:
            if result is not_set:
                result = cast("list[Output]", [e] * len(inputs))

        outputs: list[Output | Exception] = []
        for idx in range(len(inputs)):
            if idx in results_map:
                outputs.append(results_map[idx])
            else:
                outputs.append(result.pop(0))
        return outputs
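
The retry-only-what-failed pattern in the listing above can be sketched with the standard library alone. This is a minimal illustration, not the langchain implementation: `abatch_with_retry`, `run_batch`, `flaky_batch`, and `max_attempts` are hypothetical names, a plain `for` loop stands in for the retry iterator returned by `_async_retrying()`, and failures are recorded per original index instead of being popped from the last result list as the listing does.

```python
import asyncio
from typing import Awaitable, Callable, Union

Result = Union[str, Exception]


async def abatch_with_retry(
    inputs: list[str],
    run_batch: Callable[[list[str]], Awaitable[list[Result]]],
    max_attempts: int = 3,  # hypothetical parameter; assumed to be >= 1
) -> list[Result]:
    """Retry only the failed inputs and return results in input order."""
    succeeded: dict[int, str] = {}     # original index -> successful output
    failed: dict[int, Exception] = {}  # original index -> most recent exception

    for _ in range(max_attempts):
        # Determine which original indices still need a successful result.
        remaining = [i for i in range(len(inputs)) if i not in succeeded]
        if not remaining:
            break
        batch = await run_batch([inputs[i] for i in remaining])
        # Map each result back from its batch offset to its original index.
        for offset, r in enumerate(batch):
            orig_idx = remaining[offset]
            if isinstance(r, Exception):
                failed[orig_idx] = r
            else:
                succeeded[orig_idx] = r

    return [succeeded[i] if i in succeeded else failed[i] for i in range(len(inputs))]


calls: dict[str, int] = {}  # how many times each input was submitted


async def flaky_batch(items: list[str]) -> list[Result]:
    """Hypothetical batch call: "b" fails once, "c" always fails."""
    out: list[Result] = []
    for item in items:
        calls[item] = calls.get(item, 0) + 1
        if item == "c" or (item == "b" and calls[item] == 1):
            out.append(ValueError(f"failed: {item}"))
        else:
            out.append(item.upper())
    return out


outputs = asyncio.run(abatch_with_retry(["a", "b", "c"], flaky_batch))
```

In this run, "a" is submitted once, "b" twice, and "c" on all three attempts. The returned list keeps the input order, with the last exception in the slot of the input that never succeeded.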

Frequently Asked Questions

What does _abatch() do?
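_abatch() is an async method of RunnableRetry, defined in libs/core/langchain_core/runnables/retry.py. It runs a batch of inputs through abatch() inside a retry loop. On each attempt it resubmits only the inputs that have not yet succeeded, stores each successful output under its original index, and raises the first exception it sees so that the failed inputs are retried. It returns one entry per input, either an output or an exception.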
Where is _abatch() defined?
_abatch() is defined in libs/core/langchain_core/runnables/retry.py at line 303.
What does _abatch() call?
_abatch() calls three functions: _async_retrying(), _patch_config_list(), and abatch().
