_batch_with_config() — langchain Function Reference

Architecture documentation for the _batch_with_config() function in base.py from the langchain codebase.

Dependency Diagram

graph TD
  86db0cf0_7454_e022_fb09_0102bfec9702["_batch_with_config()"]
  4a62481c_02cb_a5de_1833_50669d5351a6["Runnable"]
  86db0cf0_7454_e022_fb09_0102bfec9702 -->|defined in| 4a62481c_02cb_a5de_1833_50669d5351a6
  255c479b_b9fa_44d8_4de5_2562051e06b5["get_name()"]
  86db0cf0_7454_e022_fb09_0102bfec9702 -->|calls| 255c479b_b9fa_44d8_4de5_2562051e06b5
  style 86db0cf0_7454_e022_fb09_0102bfec9702 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/langchain_core/runnables/base.py lines 2119–2184

    def _batch_with_config(
        self,
        func: Callable[[list[Input]], list[Exception | Output]]
        | Callable[
            [list[Input], list[CallbackManagerForChainRun]], list[Exception | Output]
        ]
        | Callable[
            [list[Input], list[CallbackManagerForChainRun], list[RunnableConfig]],
            list[Exception | Output],
        ],
        inputs: list[Input],
        config: RunnableConfig | list[RunnableConfig] | None = None,
        *,
        return_exceptions: bool = False,
        run_type: str | None = None,
        **kwargs: Any | None,
    ) -> list[Output]:
        """Transform a list of inputs to a list of outputs, with callbacks.

        Helper method to transform an `Input` value to an `Output` value,
        with callbacks. Use this method to implement `invoke` in subclasses.

        """
        if not inputs:
            return []

        configs = get_config_list(config, len(inputs))
        callback_managers = [get_callback_manager_for_config(c) for c in configs]
        run_managers = [
            callback_manager.on_chain_start(
                None,
                input_,
                run_type=run_type,
                name=config.get("run_name") or self.get_name(),
                run_id=config.pop("run_id", None),
            )
            for callback_manager, input_, config in zip(
                callback_managers, inputs, configs, strict=False
            )
        ]
        try:
            if accepts_config(func):
                kwargs["config"] = [
                    patch_config(c, callbacks=rm.get_child())
                    for c, rm in zip(configs, run_managers, strict=False)
                ]
            if accepts_run_manager(func):
                kwargs["run_manager"] = run_managers
            output = func(inputs, **kwargs)  # type: ignore[call-arg]
        except BaseException as e:
            for run_manager in run_managers:
                run_manager.on_chain_error(e)
            if return_exceptions:
                return cast("list[Output]", [e for _ in inputs])
            raise
        else:
            first_exception: Exception | None = None
            for run_manager, out in zip(run_managers, output, strict=False):
                if isinstance(out, Exception):
                    first_exception = first_exception or out
                    run_manager.on_chain_error(out)
                else:
                    run_manager.on_chain_end(out)
            if return_exceptions or first_exception is None:
                return cast("list[Output]", output)
            raise first_exception
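
The error-handling flow in the source above can be sketched without any langchain imports. This is a minimal illustration, not the library's implementation: `RecordingRunManager` and `batch_with_tracking` are hypothetical names, and the stand-in manager only records which callback was fired. It shows the two failure paths (the whole batch function raising, versus individual items coming back as exceptions) and how `return_exceptions` changes the result.

```python
from typing import Any, Callable


class RecordingRunManager:
    """Hypothetical stand-in for a chain run manager; it only records events."""

    def __init__(self) -> None:
        self.events: list[tuple[str, Any]] = []

    def on_chain_error(self, error: BaseException) -> None:
        self.events.append(("error", error))

    def on_chain_end(self, output: Any) -> None:
        self.events.append(("end", output))


def batch_with_tracking(
    func: Callable[[list[Any]], list[Any]],
    inputs: list[Any],
    managers: list[RecordingRunManager],
    *,
    return_exceptions: bool = False,
) -> list[Any]:
    """Simplified mirror of the try/except/else structure shown above."""
    if not inputs:
        return []
    try:
        outputs = func(inputs)
    except BaseException as e:
        # The batch function itself failed: every run is marked as errored.
        for manager in managers:
            manager.on_chain_error(e)
        if return_exceptions:
            return [e for _ in inputs]
        raise
    first_exception = None
    for manager, out in zip(managers, outputs):
        if isinstance(out, Exception):
            # Remember only the first per-item failure.
            first_exception = first_exception or out
            manager.on_chain_error(out)
        else:
            manager.on_chain_end(out)
    if return_exceptions or first_exception is None:
        return outputs
    raise first_exception


def halve_even(values: list[int]) -> list[Any]:
    # Returns an exception object (does not raise) for odd inputs.
    return [v // 2 if v % 2 == 0 else ValueError(f"odd input: {v}") for v in values]


managers = [RecordingRunManager() for _ in range(3)]
results = batch_with_tracking(halve_even, [4, 3, 8], managers, return_exceptions=True)
```

With `return_exceptions=True`, `results` keeps the `ValueError` in the slot of the failing input, and only that input's manager records an error. With the default `return_exceptions=False`, the same call would raise that `ValueError` after all managers have been notified.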

Frequently Asked Questions

What does _batch_with_config() do?
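_batch_with_config() is a helper method of Runnable, defined in libs/core/langchain_core/runnables/base.py. It runs func over a list of inputs with callback tracing: it starts one chain run per input, passes the run managers and per-input configs to func when func accepts them, and reports on_chain_end or on_chain_error for each result. An empty input list returns [] immediately. If any output is an exception and return_exceptions is False, the first such exception is raised; otherwise the output list is returned unchanged.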
Where is _batch_with_config() defined?
_batch_with_config() is defined in libs/core/langchain_core/runnables/base.py at line 2119.
What does _batch_with_config() call?
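The dependency graph records one call from _batch_with_config(): get_name(). The source also uses the helpers get_config_list, get_callback_manager_for_config, accepts_config, accepts_run_manager, and patch_config.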
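
The source passes `config` and `run_manager` to `func` only when `accepts_config(func)` and `accepts_run_manager(func)` return true. Those helpers are not shown on this page, so the sketch below assumes they check for a parameter of that name in the callable's signature; `accepts_param` and `call_batch_func` are hypothetical names used only for illustration.

```python
from inspect import signature
from typing import Any, Callable


def accepts_param(func: Callable[..., Any], name: str) -> bool:
    """Return True if `func` declares a parameter called `name` (assumed check)."""
    try:
        return name in signature(func).parameters
    except (ValueError, TypeError):
        # Some callables expose no inspectable signature.
        return False


def call_batch_func(
    func: Callable[..., list[Any]],
    inputs: list[Any],
    run_managers: list[Any],
    configs: list[dict[str, Any]],
) -> list[Any]:
    """Pass the optional keyword arguments only to functions that declare them."""
    kwargs: dict[str, Any] = {}
    if accepts_param(func, "config"):
        kwargs["config"] = configs
    if accepts_param(func, "run_manager"):
        kwargs["run_manager"] = run_managers
    return func(inputs, **kwargs)


def plain(inputs):
    return [x * 2 for x in inputs]


def with_manager(inputs, run_manager):
    return [f"{x}:{m}" for x, m in zip(inputs, run_manager)]


def with_both(inputs, run_manager, config):
    return [(x, m, c["tag"]) for x, m, c in zip(inputs, run_manager, config)]


# Placeholder strings and dicts stand in for real run managers and configs.
managers = ["rm0", "rm1"]
configs = [{"tag": "a"}, {"tag": "b"}]
plain_out = call_batch_func(plain, [1, 2], managers, configs)
managed_out = call_batch_func(with_manager, [1, 2], managers, configs)
both_out = call_batch_func(with_both, [1, 2], managers, configs)
```

This is why the `func` parameter is typed as a union of three callable shapes: the same dispatch code can serve a function that takes only the inputs, one that also takes run managers, and one that takes run managers and configs.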
