161 lines
4.6 KiB
Python
161 lines
4.6 KiB
Python
# WARNING: this file is auto-generated by 'async_to_sync.py'
# from the original file '_server_cursor_async.py'
# DO NOT CHANGE! Change the original file instead.
|
|
"""
|
|
psycopg server-side cursor (sync).
|
|
"""
|
|
|
|
# Copyright (C) 2020 The Psycopg Team
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Any, overload
|
|
from collections.abc import Iterable
|
|
|
|
from . import errors as e
|
|
from .abc import Params, Query
|
|
from .rows import Row, RowFactory
|
|
from .cursor import Cursor
|
|
from ._compat import Self
|
|
from ._server_cursor_base import ServerCursorMixin
|
|
|
|
if TYPE_CHECKING:
|
|
from .connection import Connection
|
|
|
|
|
|
class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
    """
    Synchronous server-side cursor.

    The class combines `ServerCursorMixin` with `Cursor`. Each database
    operation below hands a generator (`_declare_gen`, `_fetch_gen`,
    `_scroll_gen`, `_close_gen` -- all defined outside this class) to the
    connection's `wait()` while the connection lock is held.
    """

    # Report the class as belonging to the top-level package.
    __module__ = "psycopg"
    # No per-instance attributes are added by this class.
    __slots__ = ()

    @overload
    def __init__(
        self,
        connection: Connection[Row],
        name: str,
        *,
        scrollable: bool | None = None,
        withhold: bool = False,
    ): ...

    @overload
    def __init__(
        self,
        connection: Connection[Any],
        name: str,
        *,
        row_factory: RowFactory[Row],
        scrollable: bool | None = None,
        withhold: bool = False,
    ): ...

    def __init__(
        self,
        connection: Connection[Any],
        name: str,
        *,
        row_factory: RowFactory[Row] | None = None,
        scrollable: bool | None = None,
        withhold: bool = False,
    ):
        """
        Initialise both base classes.

        :param connection: the connection the cursor belongs to.
        :param name: forwarded to `ServerCursorMixin.__init__`.
        :param row_factory: row factory to use; when omitted (or falsy) the
            connection's own `row_factory` is used instead.
        :param scrollable: forwarded to `ServerCursorMixin.__init__`.
        :param withhold: forwarded to `ServerCursorMixin.__init__`.
        """
        factory = row_factory or connection.row_factory
        Cursor.__init__(self, connection, row_factory=factory)
        ServerCursorMixin.__init__(self, name, scrollable, withhold)

    def close(self) -> None:
        """
        Close the current cursor and free associated resources.

        Nothing happens if the cursor is already closed. `_close_gen()` is
        only run when the connection itself is not closed; the parent
        `close()` is then called in either case.
        """
        with self._conn.lock:
            if not self.closed:
                if not self._conn.closed:
                    self._conn.wait(self._close_gen())
                super().close()

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        binary: bool | None = None,
        **kwargs: Any,
    ) -> Self:
        """
        Open a cursor to execute a query to the database.

        :param query: the query, passed to `_declare_gen()`.
        :param params: optional query parameters, passed to `_declare_gen()`.
        :param binary: passed to `_declare_gen()` unchanged.
        :return: the cursor itself.
        :raises TypeError: if any extra keyword argument is given.
        :raises NotSupportedError: if the connection is in pipeline mode.
        """
        # Extra keywords are accepted in the signature only to be rejected
        # here, naming the first offending one.
        if kwargs:
            first_unknown = next(iter(kwargs))
            raise TypeError(f"keyword not supported: {first_unknown}")

        if self._pgconn.pipeline_status:
            message = "server-side cursors not supported in pipeline mode"
            raise e.NotSupportedError(message)

        try:
            with self._conn.lock:
                declare = self._declare_gen(query, params, binary)
                self._conn.wait(declare)
        except e._NO_TRACEBACK as ex:
            # Re-raise the same exception with its traceback removed.
            raise ex.with_traceback(None)

        return self

    def executemany(
        self, query: Query, params_seq: Iterable[Params], *, returning: bool = True
    ) -> None:
        """Method not implemented for server-side cursors."""
        message = "executemany not supported on server-side cursors"
        raise e.NotSupportedError(message)

    def fetchone(self) -> Row | None:
        """
        Fetch a single record.

        :return: the record, or `None` if the fetch returned nothing. The
            position counter advances only when a record is returned.
        """
        with self._conn.lock:
            rows = self._conn.wait(self._fetch_gen(1))

        if not rows:
            return None

        self._pos += 1
        return rows[0]

    def fetchmany(self, size: int = 0) -> list[Row]:
        """
        Fetch up to `size` records.

        :param size: number of records to request; a falsy value (the
            default 0) means `self.arraysize`.
        :return: the list of fetched records.
        """
        wanted = size or self.arraysize
        with self._conn.lock:
            rows = self._conn.wait(self._fetch_gen(wanted))

        self._pos += len(rows)
        return rows

    def fetchall(self) -> list[Row]:
        """
        Fetch records by running `_fetch_gen(None)`.

        :return: the list of fetched records.
        """
        with self._conn.lock:
            rows = self._conn.wait(self._fetch_gen(None))

        self._pos += len(rows)
        return rows

    def __iter__(self) -> Self:
        """Return the cursor itself, which is its own iterator."""
        return self

    def __next__(self) -> Row:
        """
        Return the next record, fetching pages of `itersize` records on demand.

        :raises StopIteration: when the current page is exhausted and no new
            page was fetched, or the newly fetched page is empty.
        """
        page = self._iter_rows

        # A new page is needed if none was fetched yet, or if the current one
        # is used up and was a full page of size itersize (so there is likely
        # another one after it).
        if page is None or (
            self._page_pos >= len(page) and len(page) >= self.itersize
        ):
            with self._conn.lock:
                page = self._conn.wait(self._fetch_gen(self.itersize))
                self._iter_rows = page
                self._page_pos = 0

        index = self._page_pos
        if index >= len(page):
            raise StopIteration("no more records to return")

        row = page[index]
        self._page_pos = index + 1
        self._pos += 1
        return row

    def scroll(self, value: int, mode: str = "relative") -> None:
        """
        Move the cursor and update the tracked position.

        :param value: the offset (in "relative" mode) or the new position
            (in any other mode).
        :param mode: "relative" adds `value` to the current position; any
            other mode sets the position to `value`.
        """
        with self._conn.lock:
            self._conn.wait(self._scroll_gen(value, mode))

        # Postgres doesn't have a reliable way to report a cursor out of
        # bound, so the position is updated without any range check.
        self._pos = self._pos + value if mode == "relative" else value
|