159 lines
4.6 KiB
Python
159 lines
4.6 KiB
Python
"""
psycopg server-side cursor (async).
"""
|
|
|
|
# Copyright (C) 2020 The Psycopg Team
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Any, overload
|
|
from collections.abc import Iterable
|
|
|
|
from . import errors as e
|
|
from .abc import Params, Query
|
|
from .rows import AsyncRowFactory, Row
|
|
from ._compat import Self
|
|
from .cursor_async import AsyncCursor
|
|
from ._server_cursor_base import ServerCursorMixin
|
|
|
|
if TYPE_CHECKING:
|
|
from .connection_async import AsyncConnection
|
|
|
|
|
|
class AsyncServerCursor(
    ServerCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row]
):
    """
    Asynchronous cursor built on `ServerCursorMixin` and `AsyncCursor`.

    Every database operation below runs one of the ``_*_gen()`` generators
    (presumably supplied by `ServerCursorMixin` -- confirm there) through
    ``self._conn.wait()`` while holding ``self._conn.lock``, then updates the
    ``_pos`` bookkeeping with the number of records handed to the caller.
    """

    __module__ = "psycopg"
    __slots__ = ()

    @overload
    def __init__(
        self,
        connection: AsyncConnection[Row],
        name: str,
        *,
        scrollable: bool | None = None,
        withhold: bool = False,
    ): ...

    @overload
    def __init__(
        self,
        connection: AsyncConnection[Any],
        name: str,
        *,
        row_factory: AsyncRowFactory[Row],
        scrollable: bool | None = None,
        withhold: bool = False,
    ): ...

    def __init__(
        self,
        connection: AsyncConnection[Any],
        name: str,
        *,
        row_factory: AsyncRowFactory[Row] | None = None,
        scrollable: bool | None = None,
        withhold: bool = False,
    ):
        """
        Initialise both bases of the cursor.

        :param connection: connection the cursor operates on.
        :param name: forwarded to `ServerCursorMixin.__init__()`.
        :param row_factory: factory used to build rows; when omitted (or
            falsy) the connection's own ``row_factory`` is used instead.
        :param scrollable: forwarded to `ServerCursorMixin.__init__()`.
        :param withhold: forwarded to `ServerCursorMixin.__init__()`.
        """
        # Explicit base calls (rather than super()) keep the two
        # initialisers independent of each other's signature.
        factory = row_factory or connection.row_factory
        AsyncCursor.__init__(self, connection, row_factory=factory)
        ServerCursorMixin.__init__(self, name, scrollable, withhold)

    async def close(self) -> None:
        """
        Close the current cursor and free associated resources.

        Does nothing if the cursor is already closed. The ``_close_gen()``
        step is skipped when the connection itself is already closed.
        """
        conn = self._conn
        async with conn.lock:
            if self.closed:
                return
            if not conn.closed:
                await conn.wait(self._close_gen())
            await super().close()

    async def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        binary: bool | None = None,
        **kwargs: Any,
    ) -> Self:
        """
        Open a cursor to execute a query to the database.

        :param query: the query to run.
        :param params: optional parameters for the query.
        :param binary: forwarded as-is to ``_declare_gen()``.
        :return: the cursor itself, so calls can be chained.
        :raises TypeError: if any extra keyword argument is passed.
        :raises NotSupportedError: if the connection is in pipeline mode.
        """
        if kwargs:
            # Report the first unexpected keyword only.
            raise TypeError(f"keyword not supported: {next(iter(kwargs))}")

        if self._pgconn.pipeline_status:
            raise e.NotSupportedError(
                "server-side cursors not supported in pipeline mode"
            )

        try:
            async with self._conn.lock:
                declare = self._declare_gen(query, params, binary)
                await self._conn.wait(declare)
        except e._NO_TRACEBACK as exc:
            # Re-raise the same exception with its traceback stripped.
            raise exc.with_traceback(None)

        return self

    async def executemany(
        self, query: Query, params_seq: Iterable[Params], *, returning: bool = True
    ) -> None:
        """
        Unsupported operation: always raises `NotSupportedError`.
        """
        raise e.NotSupportedError("executemany not supported on server-side cursors")

    async def fetchone(self) -> Row | None:
        """
        Fetch a single record via ``_fetch_gen(1)``.

        :return: the record, or `None` if nothing was returned. The
            position counter advances only when a record is returned.
        """
        async with self._conn.lock:
            rows = await self._conn.wait(self._fetch_gen(1))

        if not rows:
            return None

        self._pos += 1
        return rows[0]

    async def fetchmany(self, size: int = 0) -> list[Row]:
        """
        Fetch a batch of records via ``_fetch_gen(size)``.

        :param size: number of records to request; a falsy value (such as
            the default 0) falls back to ``self.arraysize``.
        :return: the records obtained; the position counter advances by
            their count.
        """
        wanted = size or self.arraysize
        async with self._conn.lock:
            rows = await self._conn.wait(self._fetch_gen(wanted))

        self._pos += len(rows)
        return rows

    async def fetchall(self) -> list[Row]:
        """
        Fetch records via ``_fetch_gen(None)``, i.e. without passing a count.

        :return: the records obtained; the position counter advances by
            their count.
        """
        async with self._conn.lock:
            rows = await self._conn.wait(self._fetch_gen(None))

        self._pos += len(rows)
        return rows

    def __aiter__(self) -> Self:
        """Return the cursor itself: it is its own async iterator."""
        return self

    async def __anext__(self) -> Row:
        """
        Return the next record, fetching pages of ``itersize`` as needed.

        :raises StopAsyncIteration: when the current page is exhausted and
            no further page is fetched.
        """
        page = self._iter_rows

        # A new page is needed if none was ever fetched, or if the current
        # one is used up and was at least itersize long (a shorter page
        # means the previous fetch already reached the end).
        if page is None or (self._page_pos >= len(page) >= self.itersize):
            async with self._conn.lock:
                fetch = self._fetch_gen(self.itersize)
                self._iter_rows = page = await self._conn.wait(fetch)
                self._page_pos = 0

        idx = self._page_pos
        if idx >= len(page):
            raise StopAsyncIteration("no more records to return")

        self._page_pos = idx + 1
        self._pos += 1
        return page[idx]

    async def scroll(self, value: int, mode: str = "relative") -> None:
        """
        Move the cursor by running ``_scroll_gen(value, mode)``.

        :param value: offset when `mode` is ``"relative"``; for any other
            mode it is stored as the new position.
        :param mode: ``"relative"`` by default.
        """
        async with self._conn.lock:
            await self._conn.wait(self._scroll_gen(value, mode))

        # Postgres doesn't have a reliable way to report a cursor out of
        # bound, so the position is tracked here without validation.
        self._pos = self._pos + value if mode == "relative" else value
|