Home / Function/ aupdate() — langchain Function Reference

aupdate() — langchain Function Reference

Architecture documentation for the aupdate() function in _sql_record_manager.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  739947b3_1dd1_a014_7b81_04c8441e1e6c["aupdate()"]
  9d7938c1_1c25_4cae_be80_9fc00a5ed077["SQLRecordManager"]
  739947b3_1dd1_a014_7b81_04c8441e1e6c -->|defined in| 9d7938c1_1c25_4cae_be80_9fc00a5ed077
  3e4c4a44_922b_f6c5_2589_1953bed0156b["aget_time()"]
  739947b3_1dd1_a014_7b81_04c8441e1e6c -->|calls| 3e4c4a44_922b_f6c5_2589_1953bed0156b
  d75b3a18_aeb2_cb98_8ca3_1eab11972c74["_amake_session()"]
  739947b3_1dd1_a014_7b81_04c8441e1e6c -->|calls| d75b3a18_aeb2_cb98_8ca3_1eab11972c74
  style 739947b3_1dd1_a014_7b81_04c8441e1e6c fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/langchain/langchain_classic/indexes/_sql_record_manager.py lines 335–415

    async def aupdate(
        self,
        keys: Sequence[str],
        *,
        group_ids: Sequence[str | None] | None = None,
        time_at_least: float | None = None,
    ) -> None:
        """Upsert records into the SQLite database."""
        if group_ids is None:
            group_ids = [None] * len(keys)

        if len(keys) != len(group_ids):
            msg = (
                f"Number of keys ({len(keys)}) does not match number of "
                f"group_ids ({len(group_ids)})"
            )
            raise ValueError(msg)

        # Get the current time from the server.
        # This makes an extra round trip to the server, should not be a big deal
        # if the batch size is large enough.
        # Getting the time here helps us compare it against the time_at_least
        # and raise an error if there is a time sync issue.
        # Here, we're just being extra careful to minimize the chance of
        # data loss due to incorrectly deleting records.
        update_time = await self.aget_time()

        if time_at_least and update_time < time_at_least:
            # Safeguard against time sync issues
            msg = f"Time sync issue: {update_time} < {time_at_least}"
            raise AssertionError(msg)

        records_to_upsert = [
            {
                "key": key,
                "namespace": self.namespace,
                "updated_at": update_time,
                "group_id": group_id,
            }
            for key, group_id in zip(keys, group_ids, strict=False)
        ]

        async with self._amake_session() as session:
            if self.dialect == "sqlite":
                from sqlalchemy.dialects.sqlite import Insert as SqliteInsertType
                from sqlalchemy.dialects.sqlite import insert as sqlite_insert

                # Note: uses SQLite insert to make on_conflict_do_update work.
                # This code needs to be generalized a bit to work with more dialects.
                sqlite_insert_stmt: SqliteInsertType = sqlite_insert(
                    UpsertionRecord,
                ).values(records_to_upsert)
                stmt = sqlite_insert_stmt.on_conflict_do_update(
                    [UpsertionRecord.key, UpsertionRecord.namespace],
                    set_={
                        "updated_at": sqlite_insert_stmt.excluded.updated_at,
                        "group_id": sqlite_insert_stmt.excluded.group_id,
                    },
                )
            elif self.dialect == "postgresql":
                from sqlalchemy.dialects.postgresql import Insert as PgInsertType
                from sqlalchemy.dialects.postgresql import insert as pg_insert

                # Note: uses SQLite insert to make on_conflict_do_update work.
                # This code needs to be generalized a bit to work with more dialects.
                pg_insert_stmt: PgInsertType = pg_insert(UpsertionRecord).values(
                    records_to_upsert,
                )
                stmt = pg_insert_stmt.on_conflict_do_update(  # type: ignore[assignment]
                    constraint="uix_key_namespace",  # Name of constraint
                    set_={
                        "updated_at": pg_insert_stmt.excluded.updated_at,
                        "group_id": pg_insert_stmt.excluded.group_id,
                    },
                )
            else:
                msg = f"Unsupported dialect {self.dialect}"
                raise NotImplementedError(msg)

            await session.execute(stmt)
            await session.commit()

Domain

Subdomains

Frequently Asked Questions

What does aupdate() do?
aupdate() is a function in the langchain codebase, defined in libs/langchain/langchain_classic/indexes/_sql_record_manager.py.
Where is aupdate() defined?
aupdate() is defined in libs/langchain/langchain_classic/indexes/_sql_record_manager.py at line 335.
What does aupdate() call?
aupdate() calls 2 function(s): _amake_session, aget_time.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free