_atransform_stream_with_config() — langchain Function Reference
Architecture documentation for the _atransform_stream_with_config() function in base.py from the langchain codebase.
Dependency Diagram
graph TD
    7d532bc6_d85b_8af5_f142_583cf8e684f6["_atransform_stream_with_config()"]
    4a62481c_02cb_a5de_1833_50669d5351a6["Runnable"]
    7d532bc6_d85b_8af5_f142_583cf8e684f6 -->|defined in| 4a62481c_02cb_a5de_1833_50669d5351a6
    6b0af8a6_40de_913f_d715_e591ebb530c1["atransform()"]
    6b0af8a6_40de_913f_d715_e591ebb530c1 -->|calls| 7d532bc6_d85b_8af5_f142_583cf8e684f6
    dd47e0f6_01d1_bac7_e0fb_611561853fa8["atransform()"]
    dd47e0f6_01d1_bac7_e0fb_611561853fa8 -->|calls| 7d532bc6_d85b_8af5_f142_583cf8e684f6
    9c31698a_2e43_9df9_b618_11fd9b0d1d60["atransform()"]
    9c31698a_2e43_9df9_b618_11fd9b0d1d60 -->|calls| 7d532bc6_d85b_8af5_f142_583cf8e684f6
    dac5e541_1204_9fcb_a2a6_e9b9b5845c0c["atransform()"]
    dac5e541_1204_9fcb_a2a6_e9b9b5845c0c -->|calls| 7d532bc6_d85b_8af5_f142_583cf8e684f6
    255c479b_b9fa_44d8_4de5_2562051e06b5["get_name()"]
    7d532bc6_d85b_8af5_f142_583cf8e684f6 -->|calls| 255c479b_b9fa_44d8_4de5_2562051e06b5
    f35c0aee_685e_5a92_1708_787fad1851f1["astream_log()"]
    7d532bc6_d85b_8af5_f142_583cf8e684f6 -->|calls| f35c0aee_685e_5a92_1708_787fad1851f1
    style 7d532bc6_d85b_8af5_f142_583cf8e684f6 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/core/langchain_core/runnables/base.py lines 2359–2464
async def _atransform_stream_with_config(
    self,
    inputs: AsyncIterator[Input],
    transformer: Callable[[AsyncIterator[Input]], AsyncIterator[Output]]
    | Callable[
        [AsyncIterator[Input], AsyncCallbackManagerForChainRun],
        AsyncIterator[Output],
    ]
    | Callable[
        [AsyncIterator[Input], AsyncCallbackManagerForChainRun, RunnableConfig],
        AsyncIterator[Output],
    ],
    config: RunnableConfig | None,
    run_type: str | None = None,
    **kwargs: Any | None,
) -> AsyncIterator[Output]:
    """Transform a stream with config.

    Helper method to transform an Async `Iterator` of `Input` values into an
    Async `Iterator` of `Output` values, with callbacks.

    Use this to implement `astream` or `atransform` in `Runnable` subclasses.
    """
    # Extract defers_inputs from kwargs if present
    defers_inputs = kwargs.pop("defers_inputs", False)

    # tee the input so we can iterate over it twice
    input_for_tracing, input_for_transform = atee(inputs, 2)
    # Start the input iterator to ensure the input Runnable starts before this one
    final_input: Input | None = await anext(input_for_tracing, None)
    final_input_supported = True
    final_output: Output | None = None
    final_output_supported = True

    config = ensure_config(config)
    callback_manager = get_async_callback_manager_for_config(config)
    run_manager = await callback_manager.on_chain_start(
        None,
        {"input": ""},
        run_type=run_type,
        name=config.get("run_name") or self.get_name(),
        run_id=config.pop("run_id", None),
        defers_inputs=defers_inputs,
    )
    try:
        child_config = patch_config(config, callbacks=run_manager.get_child())
        if accepts_config(transformer):
            kwargs["config"] = child_config
        if accepts_run_manager(transformer):
            kwargs["run_manager"] = run_manager
        with set_config_context(child_config) as context:
            iterator_ = context.run(transformer, input_for_transform, **kwargs)  # type: ignore[arg-type]

            if stream_handler := next(
                (
                    cast("_StreamingCallbackHandler", h)
                    for h in run_manager.handlers
                    # instance check OK here, it's a mixin
                    if isinstance(h, _StreamingCallbackHandler)
                ),
                None,
            ):
                # populates streamed_output in astream_log() output if needed
                iterator = stream_handler.tap_output_aiter(
                    run_manager.run_id, iterator_
                )
            else:
                iterator = iterator_
            try:
                while True:
                    chunk = await coro_with_context(anext(iterator), context)
                    yield chunk
                    if final_output_supported:
                        if final_output is None:
                            final_output = chunk
                        else:
                            try:
                                final_output = final_output + chunk
                            except TypeError:
                                final_output = chunk
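The last lines of the listing fold streamed chunks into a single `final_output` using `+`, falling back to the latest chunk when the types cannot be added. The following is a minimal standalone sketch of that idea, not the library's code. The name `accumulate_chunks` is hypothetical, and the step that stops accumulating after the first `TypeError` is an assumption, because the excerpt is cut off at that point.

```python
from typing import Any, Iterable, Tuple


def accumulate_chunks(chunks: Iterable[Any]) -> Tuple[Any, bool]:
    """Fold chunks with `+`; return (final_value, accumulation_supported)."""
    final: Any = None
    supported = True
    for chunk in chunks:
        if not supported:
            # Assumption: once addition has failed, later chunks are ignored.
            continue
        if final is None:
            final = chunk
        else:
            try:
                final = final + chunk
            except TypeError:
                # Chunks are not addable: keep the latest chunk instead.
                final = chunk
                supported = False  # assumption, not visible in the excerpt
    return final, supported
```

For string chunks such as `["Hel", "lo"]` the sketch returns the concatenated text and `True`. For mixed chunks that cannot be added, it returns the chunk that triggered the failure and `False`.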
Frequently Asked Questions
What does _atransform_stream_with_config() do?
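_atransform_stream_with_config() is an async helper method on Runnable, defined in libs/core/langchain_core/runnables/base.py. According to its docstring, it transforms an async iterator of Input values into an async iterator of Output values while firing callbacks, and is meant for implementing astream or atransform in Runnable subclasses.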
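The source listing above tees its input so that one branch can be peeked for tracing while the other is handed to the transformer. The sketch below illustrates that tee-and-peek pattern with the standard library only. `simple_atee` is a hypothetical, simplified stand-in for the `atee` helper used in the listing, and it assumes all branches are consumed from a single task (there is no locking).

```python
import asyncio
from collections import deque
from typing import AsyncIterator, Deque, List, TypeVar

T = TypeVar("T")


def simple_atee(source: AsyncIterator[T], n: int = 2) -> List[AsyncIterator[T]]:
    """Split one async iterator into n branches that each see every item."""
    buffers: List[Deque[T]] = [deque() for _ in range(n)]
    done = False

    async def branch(buf: Deque[T]) -> AsyncIterator[T]:
        nonlocal done
        while True:
            if not buf and not done:
                try:
                    item = await source.__anext__()
                except StopAsyncIteration:
                    done = True
                else:
                    # Fan the item out to every branch's buffer.
                    for b in buffers:
                        b.append(item)
            if buf:
                yield buf.popleft()
            elif done:
                return

    return [branch(b) for b in buffers]


async def numbers() -> AsyncIterator[int]:
    for i in range(3):
        yield i


async def demo_tee():
    tracing, transform = simple_atee(numbers(), 2)
    first = await tracing.__anext__()       # peek starts the upstream iterator
    rest = [x async for x in transform]     # the other branch still sees item 0
    return first, rest
```

Peeking the tracing branch pulls the first item from upstream without hiding it from the transform branch, which is the property the listing relies on.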
Where is _atransform_stream_with_config() defined?
_atransform_stream_with_config() is defined in libs/core/langchain_core/runnables/base.py at line 2359.
What does _atransform_stream_with_config() call?
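The dependency graph lists 2 callees for _atransform_stream_with_config(): astream_log and get_name. In the excerpt above, get_name() is called to name the run, while astream_log() appears only in a comment about the streaming handler.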
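The listing picks the first handler that is a streaming handler and, if one exists, wraps the output iterator so the handler sees each chunk as it passes. A minimal sketch of that select-then-tap pattern follows. `RecordingHandler`, `PlainHandler`, and `run_with_tap` are hypothetical stand-ins for illustration, not langchain classes.

```python
import asyncio
from typing import Any, AsyncIterator, List, Optional


class PlainHandler:
    """Hypothetical handler with no streaming support."""


class RecordingHandler:
    """Hypothetical streaming handler that records chunks it observes."""

    def __init__(self) -> None:
        self.seen: List[Any] = []

    async def tap(self, stream: AsyncIterator[Any]) -> AsyncIterator[Any]:
        async for chunk in stream:
            self.seen.append(chunk)  # observe the chunk
            yield chunk              # then pass it through unchanged


def pick_streaming_handler(handlers: List[Any]) -> Optional[RecordingHandler]:
    # First match or None, the same next(generator, None) idiom as the listing.
    return next((h for h in handlers if isinstance(h, RecordingHandler)), None)


async def run_with_tap(handlers: List[Any], stream: AsyncIterator[Any]) -> List[Any]:
    handler = pick_streaming_handler(handlers)
    iterator = handler.tap(stream) if handler else stream
    return [chunk async for chunk in iterator]


async def letters() -> AsyncIterator[str]:
    for c in "abc":
        yield c


async def demo_tap():
    recorder = RecordingHandler()
    out = await run_with_tap([PlainHandler(), recorder], letters())
    return out, recorder.seen
```

The consumer receives the same chunks whether or not a handler is tapping the stream. The tap only adds an observer.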
What calls _atransform_stream_with_config()?
_atransform_stream_with_config() is called by 4 functions, all named atransform. The dependency graph lists them as four distinct nodes.
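Callers may pass a transformer taking one, two, or three arguments, and the listing decides which keyword arguments to supply by checking what the transformer accepts. The sketch below shows one way such a check could work using `inspect.signature`. The helpers `accepts` and `build_kwargs` are hypothetical, and the real `accepts_config` and `accepts_run_manager` helpers may be implemented differently.

```python
import inspect
from typing import Any, Callable, Dict


def accepts(func: Callable[..., Any], name: str) -> bool:
    """Return True if func declares a parameter called name."""
    try:
        return name in inspect.signature(func).parameters
    except (TypeError, ValueError):
        # Not callable, or a signature that cannot be introspected.
        return False


def build_kwargs(
    transformer: Callable[..., Any], config: Any, run_manager: Any
) -> Dict[str, Any]:
    """Supply only the optional arguments the transformer asks for."""
    kwargs: Dict[str, Any] = {}
    if accepts(transformer, "config"):
        kwargs["config"] = config
    if accepts(transformer, "run_manager"):
        kwargs["run_manager"] = run_manager
    return kwargs


# Three hypothetical transformer shapes, matching the three Callable variants.
def t_plain(inputs): ...
def t_managed(inputs, run_manager): ...
def t_full(inputs, run_manager, config): ...
```

With this approach a subclass can write the simplest transformer that fits its needs, and the helper adapts the call to the declared parameters.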