Files
2026-06-23 15:20:56 +02:00

161 lines
4.6 KiB
Python

# WARNING: this file is auto-generated by 'async_to_sync.py'
# from the original file '_server_cursor_async.py'
# DO NOT CHANGE! Change the original file instead.
"""
psycopg server-side cursor (sync).
"""
# Copyright (C) 2020 The Psycopg Team
from __future__ import annotations
from typing import TYPE_CHECKING, Any, overload
from collections.abc import Iterable
from . import errors as e
from .abc import Params, Query
from .rows import Row, RowFactory
from .cursor import Cursor
from ._compat import Self
from ._server_cursor_base import ServerCursorMixin
if TYPE_CHECKING:
from .connection import Connection
class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
__module__ = "psycopg"
__slots__ = ()
@overload
def __init__(
self,
connection: Connection[Row],
name: str,
*,
scrollable: bool | None = None,
withhold: bool = False,
): ...
@overload
def __init__(
self,
connection: Connection[Any],
name: str,
*,
row_factory: RowFactory[Row],
scrollable: bool | None = None,
withhold: bool = False,
): ...
def __init__(
self,
connection: Connection[Any],
name: str,
*,
row_factory: RowFactory[Row] | None = None,
scrollable: bool | None = None,
withhold: bool = False,
):
Cursor.__init__(
self, connection, row_factory=row_factory or connection.row_factory
)
ServerCursorMixin.__init__(self, name, scrollable, withhold)
def close(self) -> None:
"""
Close the current cursor and free associated resources.
"""
with self._conn.lock:
if self.closed:
return
if not self._conn.closed:
self._conn.wait(self._close_gen())
super().close()
def execute(
self,
query: Query,
params: Params | None = None,
*,
binary: bool | None = None,
**kwargs: Any,
) -> Self:
"""
Open a cursor to execute a query to the database.
"""
if kwargs:
raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
if self._pgconn.pipeline_status:
raise e.NotSupportedError(
"server-side cursors not supported in pipeline mode"
)
try:
with self._conn.lock:
self._conn.wait(self._declare_gen(query, params, binary))
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
return self
def executemany(
self, query: Query, params_seq: Iterable[Params], *, returning: bool = True
) -> None:
"""Method not implemented for server-side cursors."""
raise e.NotSupportedError("executemany not supported on server-side cursors")
def fetchone(self) -> Row | None:
with self._conn.lock:
recs = self._conn.wait(self._fetch_gen(1))
if recs:
self._pos += 1
return recs[0]
else:
return None
def fetchmany(self, size: int = 0) -> list[Row]:
if not size:
size = self.arraysize
with self._conn.lock:
recs = self._conn.wait(self._fetch_gen(size))
self._pos += len(recs)
return recs
def fetchall(self) -> list[Row]:
with self._conn.lock:
recs = self._conn.wait(self._fetch_gen(None))
self._pos += len(recs)
return recs
def __iter__(self) -> Self:
return self
def __next__(self) -> Row:
# Fetch a new page if we never fetched any, or we are at the end of
# a page of size itersize, meaning there is likely a following one.
if (
self._iter_rows is None
or self._page_pos >= len(self._iter_rows) >= self.itersize
):
with self._conn.lock:
self._iter_rows = self._conn.wait(self._fetch_gen(self.itersize))
self._page_pos = 0
if self._page_pos >= len(self._iter_rows):
raise StopIteration("no more records to return")
rec = self._iter_rows[self._page_pos]
self._page_pos += 1
self._pos += 1
return rec
def scroll(self, value: int, mode: str = "relative") -> None:
with self._conn.lock:
self._conn.wait(self._scroll_gen(value, mode))
# Postgres doesn't have a reliable way to report a cursor out of bound
if mode == "relative":
self._pos += value
else:
self._pos = value