StreamingRunnable Class — langchain Architecture
Architecture documentation for the StreamingRunnable class in test_runnable_events_v2.py from the langchain codebase.
Dependency Diagram
graph TD
    ec849ced_49cd_d2f5_60e0_db0d12879deb["StreamingRunnable"]
    33c02978_2077_5819_7048_bc2a81e80625["test_runnable_events_v2.py"]
    ec849ced_49cd_d2f5_60e0_db0d12879deb -->|defined in| 33c02978_2077_5819_7048_bc2a81e80625
    40332084_b761_5fbb_75ef_6d1b79951608["__init__()"]
    ec849ced_49cd_d2f5_60e0_db0d12879deb -->|method| 40332084_b761_5fbb_75ef_6d1b79951608
    44779688_f3fe_6e36_2d50_baa2a78e6a2d["invoke()"]
    ec849ced_49cd_d2f5_60e0_db0d12879deb -->|method| 44779688_f3fe_6e36_2d50_baa2a78e6a2d
    96cda70a_ff2d_eca4_0fa9_2ffa9cde3432["stream()"]
    ec849ced_49cd_d2f5_60e0_db0d12879deb -->|method| 96cda70a_ff2d_eca4_0fa9_2ffa9cde3432
    858b85ba_6d9e_d209_9eb5_2469bad0adca["astream()"]
    ec849ced_49cd_d2f5_60e0_db0d12879deb -->|method| 858b85ba_6d9e_d209_9eb5_2469bad0adca
Source Code
libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py lines 2121–2182
class StreamingRunnable(Runnable[Any, Addable]):
    """A custom runnable used for testing purposes."""

    iterable: Iterable[Addable]

    def __init__(self, iterable: Iterable[Addable]) -> None:
        """Initialize the runnable."""
        self.iterable = iterable

    @override
    def invoke(
        self, input: Any, config: RunnableConfig | None = None, **kwargs: Any
    ) -> Addable:
        """Invoke the runnable."""
        msg = "Server side error"
        raise ValueError(msg)

    @override
    def stream(
        self,
        input: Any,
        config: RunnableConfig | None = None,
        **kwargs: Any | None,
    ) -> Iterator[Addable]:
        raise NotImplementedError

    @override
    async def astream(
        self,
        input: Any,
        config: RunnableConfig | None = None,
        **kwargs: Any | None,
    ) -> AsyncIterator[Addable]:
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        run_manager = await callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name", self.get_name()),
            run_id=config.get("run_id"),
        )
        try:
            final_output = None
            for element in self.iterable:
                if isinstance(element, BaseException):
                    raise element  # noqa: TRY301
                yield element

                if final_output is None:
                    final_output = element
                else:
                    try:
                        final_output = final_output + element
                    except TypeError:
                        final_output = element

            # set final channel values as run output
            await run_manager.on_chain_end(final_output)
        except BaseException as e:
            await run_manager.on_chain_error(e)
            raise
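The loop in astream() folds the streamed elements into a single final output: it adds each element to the running total and falls back to the latest element when the addition raises TypeError. The standalone sketch below isolates that folding step so it can be run without langchain. The function name fold_chunks is hypothetical and does not appear in the source.

```python
from typing import Any, Iterable


def fold_chunks(chunks: Iterable[Any]) -> Any:
    """Fold chunks with `+`, mirroring the accumulation loop in astream().

    `fold_chunks` is an illustrative name, not part of langchain.
    """
    final_output = None
    for element in chunks:
        if final_output is None:
            # The first element seeds the running total.
            final_output = element
        else:
            try:
                final_output = final_output + element
            except TypeError:
                # Elements that cannot be added: keep only the latest one.
                final_output = element
    return final_output


print(fold_chunks(["a", "b", "c"]))  # abc
print(fold_chunks([1, "x"]))         # x
print(fold_chunks([]))               # None
```

The TypeError fallback means a stream of mixed, non-addable elements reports only its last element as the run output, which is what on_chain_end() receives in the source above.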
Frequently Asked Questions
What is the StreamingRunnable class?
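StreamingRunnable is a test-only Runnable subclass in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py. Its astream() yields the elements of the iterable passed to __init__(), raises any element that is a BaseException instance, and reports the start, end, or error of the run through the async callback manager. invoke() always raises ValueError("Server side error"), and stream() raises NotImplementedError.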
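The error path of astream() can be illustrated without langchain. The sketch below is a simplified analogue, not the library's implementation: Recorder is a hypothetical stand-in for the run manager that only records which callback fired, and replay and collect are illustrative names. It shows that an exception placed in the iterable is raised mid-stream, reported through the error callback, and re-raised to the consumer.

```python
import asyncio
from typing import Any, AsyncIterator, Iterable


class Recorder:
    """Hypothetical stand-in for the run manager; it only records events."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def on_chain_end(self, output: Any) -> None:
        self.events.append(f"end:{output!r}")

    async def on_chain_error(self, error: BaseException) -> None:
        self.events.append(f"error:{error!r}")


async def replay(items: Iterable[Any], recorder: Recorder) -> AsyncIterator[Any]:
    """Yield items in order; raise any item that is an exception instance."""
    try:
        last = None
        for item in items:
            if isinstance(item, BaseException):
                raise item
            yield item
            last = item
        # Simplification: report the last item instead of a folded total.
        await recorder.on_chain_end(last)
    except BaseException as e:
        await recorder.on_chain_error(e)
        raise


async def collect(items: Iterable[Any]) -> tuple[list[Any], list[str]]:
    """Consume the stream and return the chunks seen plus recorded events."""
    recorder = Recorder()
    seen: list[Any] = []
    try:
        async for chunk in replay(items, recorder):
            seen.append(chunk)
    except ValueError:
        pass  # The error was already recorded by the generator.
    return seen, recorder.events


seen, events = asyncio.run(collect(["a", ValueError("boom"), "b"]))
print(seen)    # ['a']
print(events)  # ["error:ValueError('boom')"]
```

Chunks yielded before the exception still reach the consumer, so a test can assert on both the partial output and the recorded error.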
Where is StreamingRunnable defined?
StreamingRunnable is defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py at line 2121.