from __future__ import annotations import re import time import logging from datetime import datetime from decimal import Decimal, ROUND_HALF_UP import data import postgres_service class FidelioDbError(Exception): pass logger = logging.getLogger(__name__) def _s(value) -> str: return "" if value is None else str(value).strip() def _i(value, default: int = 0) -> int: try: return int(float(str(value).strip().strip("\"'"))) except Exception: return default def _money(value) -> Decimal: try: return Decimal(str(value).replace(",", ".")).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) except Exception: return Decimal("0.00") def _quote_ident(name: str) -> str: name = _s(name) if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", name): raise FidelioDbError(f"Neplatny PostgreSQL identifikator: {name}") return f'"{name}"' def _table(conn: data.PostgresConnection, table_name: str) -> str: schema = _s(getattr(conn, "schema_", "")) or "food600" return f"{_quote_ident(schema)}.{_quote_ident(table_name)}" def _now_parts(): now = datetime.now() return now, now.strftime("%y%m%d"), now.strftime("%H%M%S") def _db_out_defaults() -> dict: now, dat, cas = _now_parts() payload = { "typ": "", "stav": "I", "cis_kasy": "", "outlet": "", "porad_tiz": "", "pokoj": "", "datum": now.date(), "sekund": Decimal("0.000"), "dat": dat, "cas": cas, "ucet_kasa": "", "serv_per": "", "folio": "", "gn": "", "p_hostu": "", "total": Decimal("0.00"), "dysko": Decimal("0.00"), "pm": "", "ckar": "", "track2": "", "retrans": Decimal("0"), } for idx in range(1, 10): payload[f"s{idx}"] = Decimal("0.00") payload[f"d{idx}"] = Decimal("0.00") payload[f"t{idx}"] = Decimal("0.00") return payload def _insert_db_out(pg, conn: data.PostgresConnection, payload: dict) -> int: table = _table(conn, "db_out") columns = [ "typ", "stav", "cis_kasy", "outlet", "porad_tiz", "pokoj", "datum", "sekund", "dat", "cas", "ucet_kasa", "serv_per", "folio", "gn", "p_hostu", "total", "s1", "d1", "t1", "s2", "d2", "t2", "s3", "d3", "t3", "s4", "d4", "t4", "s5", "d5", "t5", "s6", "d6", "t6", "s7", "d7", "t7", "s8", "d8", "t8", "s9", "d9", "t9", "dysko", "pm", "ckar", "track2", "retrans", ] placeholders = ", ".join(["%s"] * len(columns)) column_sql = ", ".join(_quote_ident(col) for col in columns) values = [payload.get(col) for col in columns] cur = pg.cursor() try: cur.execute( f"INSERT INTO {table} ({column_sql}) VALUES ({placeholders}) RETURNING postseqnr", values, ) row = cur.fetchone() pg.commit() finally: cur.close() return int(row[0]) def _mark_request_done(pg, conn: data.PostgresConnection, postseqnr: int, db_in_id: int | None = None) -> None: cur = pg.cursor() try: cur.execute( f"UPDATE {_table(conn, 'db_out')} SET stav=%s WHERE postseqnr=%s", ("O", postseqnr), ) if db_in_id is not None: cur.execute( f"UPDATE {_table(conn, 'db_in')} SET stav=%s WHERE idriadok=%s", ("O", db_in_id), ) pg.commit() finally: cur.close() def _wait_for_answer(pg, conn: data.PostgresConnection, postseqnr: int, timeout_seconds: int = 30) -> dict | None: table = _table(conn, "db_in") columns = [ "idriadok", "odpoved", "kecy", "pokoj1", "folio1", "gn1", "pokoj2", "folio2", "gn2", "pokoj3", "folio3", "gn3", "pokoj4", "folio4", "gn4", ] column_sql = ", ".join(_quote_ident(col) for col in columns) deadline = time.time() + max(1, int(timeout_seconds or 30)) while time.time() < deadline: cur = pg.cursor() try: cur.execute( f"SELECT {column_sql} FROM {table} WHERE postseqnr=%s ORDER BY idriadok LIMIT 1", (postseqnr,), ) row = cur.fetchone() finally: cur.close() if row: return dict(zip(columns, row)) time.sleep(1) return None def _is_ok_answer(answer: dict) -> bool: value = _s(answer.get("odpoved")).upper() return value in {"", "OK", "UR"} def _guest_from_answer(answer: dict, idx: int) -> data.HotelGuest | None: folio = _s(answer.get(f"folio{idx}")) if not folio: return None room_code = _s(answer.get(f"pokoj{idx}")) or _s(answer.get("pokoj1")) return data.HotelGuest( id=folio, account_id=folio, guest_name=_s(answer.get(f"gn{idx}")), room_id=room_code, room_code=room_code, result=0, ) def load_guests( conn: data.PostgresConnection, id_kas: str, params: dict, room_code: str = "", track2: str = "", timeout_seconds: int = 30, ) -> list[data.HotelGuest]: width = _i((params or {}).get("fidelio_izba_znakov"), 3) room_code = _s(room_code) track2 = _s(track2) if len(room_code) >= width: width = len(room_code) track2 = "" elif not room_code: width = 0 payload = _db_out_defaults() payload.update({ "typ": "DOTAZ", "cis_kasy": _s(id_kas), "outlet": _s(id_kas), "pokoj": room_code.rjust(width, "0"), "track2": track2, }) with postgres_service.connect(conn) as pg: postseqnr = _insert_db_out(pg, conn, payload) answer = _wait_for_answer(pg, conn, postseqnr, timeout_seconds=timeout_seconds) if not answer: _mark_request_done(pg, conn, postseqnr) raise FidelioDbError("Opera neodpoveda.") _mark_request_done(pg, conn, postseqnr, _i(answer.get("idriadok"))) if not _is_ok_answer(answer): raise FidelioDbError(_s(answer.get("kecy")) or "Opera vratila chybu.") guests = [] for idx in range(1, 5): guest = _guest_from_answer(answer, idx) if guest: guests.append(guest) return guests def _normalize_mag_card(card_code: str) -> tuple[str, str]: code = _s(card_code) if code.startswith("%") and "?" in code: code = code[1:code.find("?")] elif code.startswith("5") and "<" in code: code = code[1:code.find("<")] elif code.startswith("_") and "<" in code: code = code[1:code.find("<")] else: raise FidelioDbError("Neznamy format hotelovej karty.") replacements = { "+": "1", "\u013e": "2", "\u0161": "3", "\u010d": "4", "\u0165": "5", "\u017e": "6", "\u00fd": "7", "\u00e1": "8", "\u00ed": "9", "\u00e9": "0", "!": "1", "@": "2", "#": "3", "$": "4", "%": "5", "^": "6", "&": "7", "*": "8", "(": "9", ")": "0", } for src, dst in replacements.items(): code = code.replace(src, dst) try: room_code = str(int(code[30:34])) except Exception as e: raise FidelioDbError("Neznamy format hotelovej karty.") from e try: guest_id = str(int(code[10:20])) except Exception: guest_id = "" try: checkout = datetime.strptime(code[20:30], "%y%m%d%H%M") except Exception as e: raise FidelioDbError("Neznamy format hotelovej karty.") from e if checkout < datetime.now(): raise FidelioDbError("Nacitana hotelova karta je neplatna.") return room_code, guest_id def _normalize_salto_card(card_code: str) -> str: code = _s(card_code) normalized = "" while code: if len(code) > 1: normalized += code[-2:] code = code[:-2] else: normalized += code code = "" return normalized.ljust(14, "0") def check_card( conn: data.PostgresConnection, id_kas: str, params: dict, card_code: str, timeout_seconds: int = 30, ) -> data.HotelCardResult: params = params or {} card_type = _s(params.get("hotel_karta_typ") or "SALTO").strip("\"'").upper() length = _i(params.get("hotel_karta_length"), 14) guest_id = "" if card_type == "SALTO": room_code = "" track2 = _normalize_salto_card(card_code) elif card_type == "MAG": room_code, guest_id = _normalize_mag_card(card_code) track2 = room_code else: room_code = "" track2 = _s(card_code).ljust(length, "0") if length else _s(card_code) guests = load_guests( conn, id_kas=id_kas, params=params, room_code=room_code, track2=track2, timeout_seconds=timeout_seconds, ) if guest_id: guests = [guest for guest in guests if _s(guest.id) == guest_id] or guests if not guests: raise FidelioDbError("Ku karte sa nenasiel ziadny host.") guest = guests[0] return data.HotelCardResult( room_id=guest.room_id, room_code=guest.room_code, account_id=guest.account_id or guest.id, guest_id=guest.id, guest_name=guest.guest_name, ) def _raster_index(value) -> int: idx = _i(value, 1) if idx < 1 or idx > 9: return 1 return idx def _charge_amounts(preparation: data.HotelChargePreparation) -> list[Decimal]: amounts = [Decimal("0.00") for _ in range(9)] for line in preparation.lines or []: idx = _raster_index(line.raster_id) amounts[idx - 1] += _money(line.amount) return [amount.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) for amount in amounts] def charge_account( conn: data.PostgresConnection, id_kas: str, preparation: data.HotelChargePreparation, timeout_seconds: int = 30, ) -> dict: if not preparation or not preparation.ready: raise FidelioDbError("Hotelovy ucet nie je pripraveny na odoslanie.") target = preparation.target if not target: raise FidelioDbError("Hotelovy ucet nema ciel.") amounts = _charge_amounts(preparation) total = sum(amounts, Decimal("0.00")).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) if not total: raise FidelioDbError("Hotelovy ucet nema ziadnu sumu na odoslanie.") payload = _db_out_defaults() payload.update({ "typ": "ZATIZ", "cis_kasy": _s(id_kas), "outlet": _s(id_kas), "pokoj": _s(target.room_code), "serv_per": _s(target.time_attribute)[:1], "gn": _s(target.guest_name), "folio": _s(target.guest_id or target.account_id), "ucet_kasa": _s(preparation.receipt_number)[-7:], "total": total, }) for idx, amount in enumerate(amounts, start=1): payload[f"s{idx}"] = amount with postgres_service.connect(conn) as pg: postseqnr = _insert_db_out(pg, conn, payload) logger.info( "Fidelio ZATIZ inserted: postseqnr=%s id_kas=%s pokoj=%s folio=%s " "ucet_kasa=%s total=%s amounts=%s", postseqnr, _s(id_kas), payload["pokoj"], payload["folio"], payload["ucet_kasa"], total, [str(x) for x in amounts], ) answer = _wait_for_answer(pg, conn, postseqnr, timeout_seconds=timeout_seconds) if not answer: _mark_request_done(pg, conn, postseqnr) raise FidelioDbError("Opera neodpoveda.") _mark_request_done(pg, conn, postseqnr, _i(answer.get("idriadok"))) if not _is_ok_answer(answer): raise FidelioDbError(_s(answer.get("kecy")) or "Opera vratila chybu.") return { "ok": True, "request_number": postseqnr, "message": "OK", }