Home / Function/ abatch() — langchain Function Reference

abatch() — langchain Function Reference

Architecture documentation for the abatch() function in router.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  a842484a_4f36_54a4_496a_003b4abe3e51["abatch()"]
  65b9aaca_53f2_847d_9ab7_024fe53cbbd2["RouterRunnable"]
  a842484a_4f36_54a4_496a_003b4abe3e51 -->|defined in| 65b9aaca_53f2_847d_9ab7_024fe53cbbd2
  17bc6bca_2365_183e_a519_547d32deffed["ainvoke()"]
  a842484a_4f36_54a4_496a_003b4abe3e51 -->|calls| 17bc6bca_2365_183e_a519_547d32deffed
  style a842484a_4f36_54a4_496a_003b4abe3e51 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/core/langchain_core/runnables/router.py lines 173–206

    async def abatch(
        self,
        inputs: list[RouterInput],
        config: RunnableConfig | list[RunnableConfig] | None = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Any | None,
    ) -> list[Output]:
        if not inputs:
            return []

        keys = [input_["key"] for input_ in inputs]
        actual_inputs = [input_["input"] for input_ in inputs]
        if any(key not in self.runnables for key in keys):
            msg = "One or more keys do not have a corresponding runnable"
            raise ValueError(msg)

        async def ainvoke(
            runnable: Runnable[Input, Output], input_: Input, config: RunnableConfig
        ) -> Output | Exception:
            if return_exceptions:
                try:
                    return await runnable.ainvoke(input_, config, **kwargs)
                except Exception as e:
                    return e
            else:
                return await runnable.ainvoke(input_, config, **kwargs)

        runnables = [self.runnables[key] for key in keys]
        configs = get_config_list(config, len(inputs))
        return await gather_with_concurrency(
            configs[0].get("max_concurrency"),
            *map(ainvoke, runnables, actual_inputs, configs),
        )

Domain

Subdomains

Calls

Frequently Asked Questions

What does abatch() do?
abatch() is a function in the langchain codebase, defined in libs/core/langchain_core/runnables/router.py.
Where is abatch() defined?
abatch() is defined in libs/core/langchain_core/runnables/router.py at line 173.
What does abatch() call?
abatch() calls 1 function(s): ainvoke.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free