Files
KPK/fidelio_db_service.py
2026-06-23 15:20:56 +02:00

403 lines
12 KiB
Python

from __future__ import annotations
import re
import time
import logging
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
import data
import postgres_service
class FidelioDbError(Exception):
pass
logger = logging.getLogger(__name__)
def _s(value) -> str:
return "" if value is None else str(value).strip()
def _i(value, default: int = 0) -> int:
try:
return int(float(str(value).strip().strip("\"'")))
except Exception:
return default
def _money(value) -> Decimal:
try:
return Decimal(str(value).replace(",", ".")).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
except Exception:
return Decimal("0.00")
def _quote_ident(name: str) -> str:
name = _s(name)
if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", name):
raise FidelioDbError(f"Neplatny PostgreSQL identifikator: {name}")
return f'"{name}"'
def _table(conn: data.PostgresConnection, table_name: str) -> str:
schema = _s(getattr(conn, "schema_", "")) or "food600"
return f"{_quote_ident(schema)}.{_quote_ident(table_name)}"
def _now_parts():
now = datetime.now()
return now, now.strftime("%y%m%d"), now.strftime("%H%M%S")
def _db_out_defaults() -> dict:
now, dat, cas = _now_parts()
payload = {
"typ": "",
"stav": "I",
"cis_kasy": "",
"outlet": "",
"porad_tiz": "",
"pokoj": "",
"datum": now.date(),
"sekund": Decimal("0.000"),
"dat": dat,
"cas": cas,
"ucet_kasa": "",
"serv_per": "",
"folio": "",
"gn": "",
"p_hostu": "",
"total": Decimal("0.00"),
"dysko": Decimal("0.00"),
"pm": "",
"ckar": "",
"track2": "",
"retrans": Decimal("0"),
}
for idx in range(1, 10):
payload[f"s{idx}"] = Decimal("0.00")
payload[f"d{idx}"] = Decimal("0.00")
payload[f"t{idx}"] = Decimal("0.00")
return payload
def _insert_db_out(pg, conn: data.PostgresConnection, payload: dict) -> int:
table = _table(conn, "db_out")
columns = [
"typ", "stav", "cis_kasy", "outlet", "porad_tiz", "pokoj",
"datum", "sekund", "dat", "cas", "ucet_kasa", "serv_per",
"folio", "gn", "p_hostu", "total",
"s1", "d1", "t1", "s2", "d2", "t2", "s3", "d3", "t3",
"s4", "d4", "t4", "s5", "d5", "t5", "s6", "d6", "t6",
"s7", "d7", "t7", "s8", "d8", "t8", "s9", "d9", "t9",
"dysko", "pm", "ckar", "track2", "retrans",
]
placeholders = ", ".join(["%s"] * len(columns))
column_sql = ", ".join(_quote_ident(col) for col in columns)
values = [payload.get(col) for col in columns]
cur = pg.cursor()
try:
cur.execute(
f"INSERT INTO {table} ({column_sql}) VALUES ({placeholders}) RETURNING postseqnr",
values,
)
row = cur.fetchone()
pg.commit()
finally:
cur.close()
return int(row[0])
def _mark_request_done(pg, conn: data.PostgresConnection, postseqnr: int, db_in_id: int | None = None) -> None:
cur = pg.cursor()
try:
cur.execute(
f"UPDATE {_table(conn, 'db_out')} SET stav=%s WHERE postseqnr=%s",
("O", postseqnr),
)
if db_in_id is not None:
cur.execute(
f"UPDATE {_table(conn, 'db_in')} SET stav=%s WHERE idriadok=%s",
("O", db_in_id),
)
pg.commit()
finally:
cur.close()
def _wait_for_answer(pg, conn: data.PostgresConnection, postseqnr: int, timeout_seconds: int = 30) -> dict | None:
table = _table(conn, "db_in")
columns = [
"idriadok", "odpoved", "kecy", "pokoj1",
"folio1", "gn1", "pokoj2", "folio2", "gn2",
"pokoj3", "folio3", "gn3", "pokoj4", "folio4", "gn4",
]
column_sql = ", ".join(_quote_ident(col) for col in columns)
deadline = time.time() + max(1, int(timeout_seconds or 30))
while time.time() < deadline:
cur = pg.cursor()
try:
cur.execute(
f"SELECT {column_sql} FROM {table} WHERE postseqnr=%s ORDER BY idriadok LIMIT 1",
(postseqnr,),
)
row = cur.fetchone()
finally:
cur.close()
if row:
return dict(zip(columns, row))
time.sleep(1)
return None
def _is_ok_answer(answer: dict) -> bool:
value = _s(answer.get("odpoved")).upper()
return value in {"", "OK", "UR"}
def _guest_from_answer(answer: dict, idx: int) -> data.HotelGuest | None:
folio = _s(answer.get(f"folio{idx}"))
if not folio:
return None
room_code = _s(answer.get(f"pokoj{idx}")) or _s(answer.get("pokoj1"))
return data.HotelGuest(
id=folio,
account_id=folio,
guest_name=_s(answer.get(f"gn{idx}")),
room_id=room_code,
room_code=room_code,
result=0,
)
def load_guests(
conn: data.PostgresConnection,
id_kas: str,
params: dict,
room_code: str = "",
track2: str = "",
timeout_seconds: int = 30,
) -> list[data.HotelGuest]:
width = _i((params or {}).get("fidelio_izba_znakov"), 3)
room_code = _s(room_code)
track2 = _s(track2)
if len(room_code) >= width:
width = len(room_code)
track2 = ""
elif not room_code:
width = 0
payload = _db_out_defaults()
payload.update({
"typ": "DOTAZ",
"cis_kasy": _s(id_kas),
"outlet": _s(id_kas),
"pokoj": room_code.rjust(width, "0"),
"track2": track2,
})
with postgres_service.connect(conn) as pg:
postseqnr = _insert_db_out(pg, conn, payload)
answer = _wait_for_answer(pg, conn, postseqnr, timeout_seconds=timeout_seconds)
if not answer:
_mark_request_done(pg, conn, postseqnr)
raise FidelioDbError("Opera neodpoveda.")
_mark_request_done(pg, conn, postseqnr, _i(answer.get("idriadok")))
if not _is_ok_answer(answer):
raise FidelioDbError(_s(answer.get("kecy")) or "Opera vratila chybu.")
guests = []
for idx in range(1, 5):
guest = _guest_from_answer(answer, idx)
if guest:
guests.append(guest)
return guests
def _normalize_mag_card(card_code: str) -> tuple[str, str]:
code = _s(card_code)
if code.startswith("%") and "?" in code:
code = code[1:code.find("?")]
elif code.startswith("5") and "<" in code:
code = code[1:code.find("<")]
elif code.startswith("_") and "<" in code:
code = code[1:code.find("<")]
else:
raise FidelioDbError("Neznamy format hotelovej karty.")
replacements = {
"+": "1",
"\u013e": "2",
"\u0161": "3",
"\u010d": "4",
"\u0165": "5",
"\u017e": "6",
"\u00fd": "7",
"\u00e1": "8",
"\u00ed": "9",
"\u00e9": "0",
"!": "1",
"@": "2",
"#": "3",
"$": "4",
"%": "5",
"^": "6",
"&": "7",
"*": "8",
"(": "9",
")": "0",
}
for src, dst in replacements.items():
code = code.replace(src, dst)
try:
room_code = str(int(code[30:34]))
except Exception as e:
raise FidelioDbError("Neznamy format hotelovej karty.") from e
try:
guest_id = str(int(code[10:20]))
except Exception:
guest_id = ""
try:
checkout = datetime.strptime(code[20:30], "%y%m%d%H%M")
except Exception as e:
raise FidelioDbError("Neznamy format hotelovej karty.") from e
if checkout < datetime.now():
raise FidelioDbError("Nacitana hotelova karta je neplatna.")
return room_code, guest_id
def _normalize_salto_card(card_code: str) -> str:
code = _s(card_code)
normalized = ""
while code:
if len(code) > 1:
normalized += code[-2:]
code = code[:-2]
else:
normalized += code
code = ""
return normalized.ljust(14, "0")
def check_card(
conn: data.PostgresConnection,
id_kas: str,
params: dict,
card_code: str,
timeout_seconds: int = 30,
) -> data.HotelCardResult:
params = params or {}
card_type = _s(params.get("hotel_karta_typ") or "SALTO").strip("\"'").upper()
length = _i(params.get("hotel_karta_length"), 14)
guest_id = ""
if card_type == "SALTO":
room_code = ""
track2 = _normalize_salto_card(card_code)
elif card_type == "MAG":
room_code, guest_id = _normalize_mag_card(card_code)
track2 = room_code
else:
room_code = ""
track2 = _s(card_code).ljust(length, "0") if length else _s(card_code)
guests = load_guests(
conn,
id_kas=id_kas,
params=params,
room_code=room_code,
track2=track2,
timeout_seconds=timeout_seconds,
)
if guest_id:
guests = [guest for guest in guests if _s(guest.id) == guest_id] or guests
if not guests:
raise FidelioDbError("Ku karte sa nenasiel ziadny host.")
guest = guests[0]
return data.HotelCardResult(
room_id=guest.room_id,
room_code=guest.room_code,
account_id=guest.account_id or guest.id,
guest_id=guest.id,
guest_name=guest.guest_name,
)
def _raster_index(value) -> int:
idx = _i(value, 1)
if idx < 1 or idx > 9:
return 1
return idx
def _charge_amounts(preparation: data.HotelChargePreparation) -> list[Decimal]:
amounts = [Decimal("0.00") for _ in range(9)]
for line in preparation.lines or []:
idx = _raster_index(line.raster_id)
amounts[idx - 1] += _money(line.amount)
return [amount.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) for amount in amounts]
def charge_account(
conn: data.PostgresConnection,
id_kas: str,
preparation: data.HotelChargePreparation,
timeout_seconds: int = 30,
) -> dict:
if not preparation or not preparation.ready:
raise FidelioDbError("Hotelovy ucet nie je pripraveny na odoslanie.")
target = preparation.target
if not target:
raise FidelioDbError("Hotelovy ucet nema ciel.")
amounts = _charge_amounts(preparation)
total = sum(amounts, Decimal("0.00")).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
if not total:
raise FidelioDbError("Hotelovy ucet nema ziadnu sumu na odoslanie.")
payload = _db_out_defaults()
payload.update({
"typ": "ZATIZ",
"cis_kasy": _s(id_kas),
"outlet": _s(id_kas),
"pokoj": _s(target.room_code),
"serv_per": _s(target.time_attribute)[:1],
"gn": _s(target.guest_name),
"folio": _s(target.guest_id or target.account_id),
"ucet_kasa": _s(preparation.receipt_number)[-7:],
"total": total,
})
for idx, amount in enumerate(amounts, start=1):
payload[f"s{idx}"] = amount
with postgres_service.connect(conn) as pg:
postseqnr = _insert_db_out(pg, conn, payload)
logger.info(
"Fidelio ZATIZ inserted: postseqnr=%s id_kas=%s pokoj=%s folio=%s "
"ucet_kasa=%s total=%s amounts=%s",
postseqnr,
_s(id_kas),
payload["pokoj"],
payload["folio"],
payload["ucet_kasa"],
total,
[str(x) for x in amounts],
)
answer = _wait_for_answer(pg, conn, postseqnr, timeout_seconds=timeout_seconds)
if not answer:
_mark_request_done(pg, conn, postseqnr)
raise FidelioDbError("Opera neodpoveda.")
_mark_request_done(pg, conn, postseqnr, _i(answer.get("idriadok")))
if not _is_ok_answer(answer):
raise FidelioDbError(_s(answer.get("kecy")) or "Opera vratila chybu.")
return {
"ok": True,
"request_number": postseqnr,
"message": "OK",
}