# WARNING: this file is auto-generated by 'async_to_sync.py'
# from the original file '_server_cursor_async.py'
# DO NOT CHANGE! Change the original file instead.
"""
psycopg server-side cursor (sync).
"""

# Copyright (C) 2020 The Psycopg Team

from __future__ import annotations

from typing import TYPE_CHECKING, Any, overload
from collections.abc import Iterable

from . import errors as e
from .abc import Params, Query
from .rows import Row, RowFactory
from .cursor import Cursor
from ._compat import Self
from ._server_cursor_base import ServerCursorMixin

if TYPE_CHECKING:
    from .connection import Connection


class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
    """
    Synchronous server-side cursor.

    Combines `ServerCursorMixin` (which supplies the ``_*_gen()`` generators
    used below) with the regular `Cursor`. Every generator is run through
    ``self._conn.wait()`` while holding ``self._conn.lock``.
    """

    __module__ = "psycopg"
    __slots__ = ()

    @overload
    def __init__(
        self,
        connection: Connection[Row],
        name: str,
        *,
        scrollable: bool | None = None,
        withhold: bool = False,
    ): ...

    @overload
    def __init__(
        self,
        connection: Connection[Any],
        name: str,
        *,
        row_factory: RowFactory[Row],
        scrollable: bool | None = None,
        withhold: bool = False,
    ): ...

    def __init__(
        self,
        connection: Connection[Any],
        name: str,
        *,
        row_factory: RowFactory[Row] | None = None,
        scrollable: bool | None = None,
        withhold: bool = False,
    ):
        """
        Initialise both base classes.

        When no (truthy) `row_factory` is given, the connection's own
        ``row_factory`` is used instead.
        """
        factory = row_factory or connection.row_factory
        Cursor.__init__(self, connection, row_factory=factory)
        ServerCursorMixin.__init__(self, name, scrollable, withhold)

    def close(self) -> None:
        """
        Release the cursor and the resources attached to it.

        Does nothing if the cursor is already closed. The closing generator
        is only run when the connection itself is still open.
        """
        conn = self._conn
        with conn.lock:
            if not self.closed:
                if not conn.closed:
                    conn.wait(self._close_gen())
                super().close()

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        binary: bool | None = None,
        **kwargs: Any,
    ) -> Self:
        """
        Open the cursor on the database for the given query.

        :raises TypeError: if any extra keyword argument is passed.
        :raises NotSupportedError: if the connection is in pipeline mode.
        :return: the cursor itself.
        """
        if kwargs:
            # Report the first unexpected keyword only.
            unexpected = next(iter(kwargs))
            raise TypeError(f"keyword not supported: {unexpected}")

        if self._pgconn.pipeline_status:
            raise e.NotSupportedError(
                "server-side cursors not supported in pipeline mode"
            )

        try:
            with self._conn.lock:
                self._conn.wait(self._declare_gen(query, params, binary))
        except e._NO_TRACEBACK as exc:
            # Re-raise the same exception with its traceback stripped.
            raise exc.with_traceback(None)

        return self

    def executemany(
        self, query: Query, params_seq: Iterable[Params], *, returning: bool = True
    ) -> None:
        """Unavailable on server-side cursors: always raises NotSupportedError."""
        raise e.NotSupportedError("executemany not supported on server-side cursors")

    def fetchone(self) -> Row | None:
        """
        Fetch a single record.

        :return: the record, or `None` when the fetch returned nothing.
        """
        with self._conn.lock:
            rows = self._conn.wait(self._fetch_gen(1))

        if not rows:
            return None

        self._pos += 1
        return rows[0]

    def fetchmany(self, size: int = 0) -> list[Row]:
        """
        Fetch up to `size` records.

        A zero (falsy) `size` means "use ``self.arraysize``".
        """
        count = size or self.arraysize
        with self._conn.lock:
            rows = self._conn.wait(self._fetch_gen(count))

        self._pos += len(rows)
        return rows

    def fetchall(self) -> list[Row]:
        """Fetch the records returned by ``_fetch_gen(None)``."""
        with self._conn.lock:
            rows = self._conn.wait(self._fetch_gen(None))

        self._pos += len(rows)
        return rows

    def __iter__(self) -> Self:
        """Return the cursor itself: it is its own iterator."""
        return self

    def __next__(self) -> Row:
        """
        Return the next record, fetching pages of ``itersize`` as needed.

        :raises StopIteration: when the current page is exhausted and no
            further page is expected.
        """
        page = self._iter_rows

        # A new page is needed if none was fetched yet, or if the current one
        # is consumed and was a full page of size itersize, in which case
        # there is likely another one following.
        if page is None or self._page_pos >= len(page) >= self.itersize:
            with self._conn.lock:
                page = self._conn.wait(self._fetch_gen(self.itersize))
                self._iter_rows = page
                self._page_pos = 0

        index = self._page_pos
        if index >= len(page):
            raise StopIteration("no more records to return")

        row = page[index]
        self._page_pos = index + 1
        self._pos += 1
        return row

    def scroll(self, value: int, mode: str = "relative") -> None:
        """
        Move the cursor and update the locally tracked position.

        With ``mode == "relative"`` the position is shifted by `value`;
        with any other mode it is set to `value`.
        """
        with self._conn.lock:
            self._conn.wait(self._scroll_gen(value, mode))

        # Postgres doesn't have a reliable way to report a cursor out of
        # bound, so the position is tracked on our side without validation.
        self._pos = self._pos + value if mode == "relative" else value