Home / Class/ ContextThreadPoolExecutor Class — langchain Architecture

ContextThreadPoolExecutor Class — langchain Architecture

Architecture documentation for the ContextThreadPoolExecutor class in config.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  37528db0_a4b2_03f0_c774_86313b73a162["ContextThreadPoolExecutor"]
  4d16987d_fe07_22bb_f46d_7daeb24e0367["config.py"]
  37528db0_a4b2_03f0_c774_86313b73a162 -->|defined in| 4d16987d_fe07_22bb_f46d_7daeb24e0367
  83f410f0_325e_9dd8_b839_2af6f4a7b676["submit()"]
  37528db0_a4b2_03f0_c774_86313b73a162 -->|method| 83f410f0_325e_9dd8_b839_2af6f4a7b676
  10f85dc9_a2b1_ad35_7d02_3749f6491bef["map()"]
  37528db0_a4b2_03f0_c774_86313b73a162 -->|method| 10f85dc9_a2b1_ad35_7d02_3749f6491bef

Relationship Graph

Source Code

libs/core/langchain_core/runnables/config.py lines 527–576

class ContextThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor that copies the context to the child thread."""

    def submit(  # type: ignore[override]
        self,
        func: Callable[P, T],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Future[T]:
        """Submit a function to the executor.

        Args:
            func: The function to submit.
            *args: The positional arguments to the function.
            **kwargs: The keyword arguments to the function.

        Returns:
            The future for the function.
        """
        return super().submit(
            cast("Callable[..., T]", partial(copy_context().run, func, *args, **kwargs))
        )

    def map(
        self,
        fn: Callable[..., T],
        *iterables: Iterable[Any],
        **kwargs: Any,
    ) -> Iterator[T]:
        """Map a function to multiple iterables.

        Args:
            fn: The function to map.
            *iterables: The iterables to map over.
            timeout: The timeout for the map.
            chunksize: The chunksize for the map.

        Returns:
            The iterator for the mapped function.
        """
        contexts = [copy_context() for _ in range(len(iterables[0]))]  # type: ignore[arg-type]

        def _wrapped_fn(*args: Any) -> T:
            return contexts.pop().run(fn, *args)

        return super().map(
            _wrapped_fn,
            *iterables,
            **kwargs,
        )

Frequently Asked Questions

What is the ContextThreadPoolExecutor class?
ContextThreadPoolExecutor is a class in the langchain codebase, defined in libs/core/langchain_core/runnables/config.py.
Where is ContextThreadPoolExecutor defined?
ContextThreadPoolExecutor is defined in libs/core/langchain_core/runnables/config.py at line 527.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free