Home / Function/ ainvoke() — langchain Function Reference

ainvoke() — langchain Function Reference

Architecture documentation for the ainvoke() function in fallbacks.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  9e9c2c0e_693e_4470_f48c_76c9c0c22cd0["ainvoke()"]
  da856a55_079e_fd91_47d6_928251e6cee3["RunnableWithFallbacks"]
  9e9c2c0e_693e_4470_f48c_76c9c0c22cd0 -->|defined in| da856a55_079e_fd91_47d6_928251e6cee3
  style 9e9c2c0e_693e_4470_f48c_76c9c0c22cd0 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/core/langchain_core/runnables/fallbacks.py lines 216–263

    async def ainvoke(
        self,
        input: Input,
        config: RunnableConfig | None = None,
        **kwargs: Any | None,
    ) -> Output:
        if self.exception_key is not None and not isinstance(input, dict):
            msg = (
                "If 'exception_key' is specified then input must be a dictionary."
                f"However found a type of {type(input)} for input"
            )
            raise ValueError(msg)
        # setup callbacks
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        # start the root run
        run_manager = await callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )

        first_error = None
        last_error = None
        for runnable in self.runnables:
            try:
                if self.exception_key and last_error is not None:
                    input[self.exception_key] = last_error  # type: ignore[index]
                child_config = patch_config(config, callbacks=run_manager.get_child())
                with set_config_context(child_config) as context:
                    coro = context.run(runnable.ainvoke, input, config, **kwargs)
                    output = await coro_with_context(coro, context)
            except self.exceptions_to_handle as e:
                if first_error is None:
                    first_error = e
                last_error = e
            except BaseException as e:
                await run_manager.on_chain_error(e)
                raise
            else:
                await run_manager.on_chain_end(output)
                return output
        if first_error is None:
            msg = "No error stored at end of fallbacks."
            raise ValueError(msg)
        await run_manager.on_chain_error(first_error)
        raise first_error

Domain

Subdomains

Frequently Asked Questions

What does ainvoke() do?
ainvoke() is a function in the langchain codebase, defined in libs/core/langchain_core/runnables/fallbacks.py.
Where is ainvoke() defined?
ainvoke() is defined in libs/core/langchain_core/runnables/fallbacks.py at line 216.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free