abatch() — langchain Function Reference

Architecture documentation for abatch(), an async method of RunnableSequence defined in libs/core/langchain_core/runnables/base.py in the langchain codebase.

Dependency Diagram

graph TD
  b9fb513b_bf75_3677_fc04_41bba780611c["abatch()"]
  c4bf8d59_69f9_f8e5_efba_837c20df2360["RunnableSequence"]
  b9fb513b_bf75_3677_fc04_41bba780611c -->|defined in| c4bf8d59_69f9_f8e5_efba_837c20df2360
  3e736ea3_30ef_68c3_5f32_b8a49b37d501["abatch()"]
  b9fb513b_bf75_3677_fc04_41bba780611c -->|calls| 3e736ea3_30ef_68c3_5f32_b8a49b37d501
  e9999cfb_b051_7cee_27e4_2e2746488c79["batch()"]
  b9fb513b_bf75_3677_fc04_41bba780611c -->|calls| e9999cfb_b051_7cee_27e4_2e2746488c79
  255c479b_b9fa_44d8_4de5_2562051e06b5["get_name()"]
  b9fb513b_bf75_3677_fc04_41bba780611c -->|calls| 255c479b_b9fa_44d8_4de5_2562051e06b5
  style b9fb513b_bf75_3677_fc04_41bba780611c fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/langchain_core/runnables/base.py lines 3335–3463

    async def abatch(
        self,
        inputs: list[Input],
        config: RunnableConfig | list[RunnableConfig] | None = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Any | None,
    ) -> list[Output]:
        if not inputs:
            return []

        # setup callbacks and context
        configs = get_config_list(config, len(inputs))
        callback_managers = [
            AsyncCallbackManager.configure(
                inheritable_callbacks=config.get("callbacks"),
                local_callbacks=None,
                verbose=False,
                inheritable_tags=config.get("tags"),
                local_tags=None,
                inheritable_metadata=config.get("metadata"),
                local_metadata=None,
            )
            for config in configs
        ]
        # start the root runs, one per input
        run_managers: list[AsyncCallbackManagerForChainRun] = await asyncio.gather(
            *(
                cm.on_chain_start(
                    None,
                    input_,
                    name=config.get("run_name") or self.get_name(),
                    run_id=config.pop("run_id", None),
                )
                for cm, input_, config in zip(
                    callback_managers, inputs, configs, strict=False
                )
            )
        )

        # invoke .batch() on each step
        # this uses batching optimizations in Runnable subclasses, like LLM
        try:
            if return_exceptions:
                # Track which inputs (by index) failed so far
                # If an input has failed it will be present in this map,
                # and the value will be the exception that was raised.
                failed_inputs_map: dict[int, Exception] = {}
                for stepidx, step in enumerate(self.steps):
                    # Assemble the original indexes of the remaining inputs
                    # (i.e. the ones that haven't failed yet)
                    remaining_idxs = [
                        i for i in range(len(configs)) if i not in failed_inputs_map
                    ]
                    # Invoke the step on the remaining inputs
                    inputs = await step.abatch(
                        [
                            inp
                            for i, inp in zip(remaining_idxs, inputs, strict=False)
                            if i not in failed_inputs_map
                        ],
                        [
                            # each step a child run of the corresponding root run
                            patch_config(
                                config,
                                callbacks=rm.get_child(f"seq:step:{stepidx + 1}"),
                            )
                            for i, (rm, config) in enumerate(
                                zip(run_managers, configs, strict=False)
                            )
                            if i not in failed_inputs_map
                        ],
                        return_exceptions=return_exceptions,
                        **(kwargs if stepidx == 0 else {}),
                    )
                    # If an input failed, add it to the map
                    failed_inputs_map.update(
                        {
                            i: inp
                            for i, inp in zip(remaining_idxs, inputs, strict=False)
                            if isinstance(inp, Exception)

Frequently Asked Questions

What does abatch() do?
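abatch() is an async method of RunnableSequence, defined in libs/core/langchain_core/runnables/base.py. It returns an empty list for empty input; otherwise it starts one root callback run per input and passes the batch through each step in order by awaiting that step's abatch(). When return_exceptions is True, an input that fails at one step is recorded in failed_inputs_map and left out of the batches sent to later steps.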
Where is abatch() defined?
abatch() is defined in libs/core/langchain_core/runnables/base.py at line 3335.
What does abatch() call?
According to the dependency diagram, abatch() calls three functions: abatch(), batch(), and get_name().
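
The failure-tracking pattern in the return_exceptions branch of the listing can be hard to follow inline, so here is a minimal standalone sketch of the same idea using only asyncio. It is not the langchain implementation: make_step and run_sequence are hypothetical names, callbacks and configs are omitted, and the final reassembly of results is an assumption, since the listing above is cut off before that point.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical stand-in for a step's async batch method: it takes a list of
# inputs and returns a list of the same length, with failures as exceptions.
Step = Callable[[list], Awaitable[list]]


def make_step(fn: Callable[[Any], Any]) -> Step:
    """Wrap a plain function as an async batch step that returns exceptions in place."""

    async def run_one(value: Any) -> Any:
        return fn(value)

    async def step(inputs: list) -> list:
        return list(
            await asyncio.gather(
                *(run_one(v) for v in inputs), return_exceptions=True
            )
        )

    return step


async def run_sequence(steps: list, inputs: list) -> list:
    """Run inputs through every step, skipping inputs that have already failed."""
    if not inputs:
        return []
    total = len(inputs)
    failed: dict = {}        # original index -> exception raised for that input
    current = list(inputs)   # values still alive, in original order
    for step in steps:
        # Original indexes of the inputs that have not failed yet
        remaining = [i for i in range(total) if i not in failed]
        results = await step(current)
        # Record new failures under their original index
        failed.update(
            {i: r for i, r in zip(remaining, results) if isinstance(r, Exception)}
        )
        # Only successful outputs move on to the next step
        current = [r for r in results if not isinstance(r, Exception)]
        if not current:
            break
    # Assumed reassembly: put exceptions and final outputs back in input order
    survivors = iter(current)
    return [failed[i] if i in failed else next(survivors) for i in range(total)]


steps = [make_step(lambda x: 10 // x), make_step(lambda x: x + 1)]
out = asyncio.run(run_sequence(steps, [1, 0, 5]))
```

In this sketch the second input raises ZeroDivisionError at the first step, so the second step only receives the two surviving values, and the exception object comes back in the second position of the result list.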
