403 lines
12 KiB
Python
403 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
import time
|
|
import logging
|
|
from datetime import datetime
|
|
from decimal import Decimal, ROUND_HALF_UP
|
|
|
|
import data
|
|
import postgres_service
|
|
|
|
|
|
class FidelioDbError(Exception):
    """Error raised by this module for invalid input or a failed exchange.

    Carries a human-readable message as its only argument.
    """
|
|
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _s(value) -> str:
|
|
return "" if value is None else str(value).strip()
|
|
|
|
|
|
def _i(value, default: int = 0) -> int:
|
|
try:
|
|
return int(float(str(value).strip().strip("\"'")))
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _money(value) -> Decimal:
|
|
try:
|
|
return Decimal(str(value).replace(",", ".")).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
|
|
except Exception:
|
|
return Decimal("0.00")
|
|
|
|
|
|
def _quote_ident(name: str) -> str:
    """Return *name* wrapped in double quotes for use as an SQL identifier.

    The name is stripped first and must consist of ASCII letters, digits and
    underscores, not starting with a digit.

    Raises:
        FidelioDbError: If the stripped name does not match that pattern.
    """
    ident = _s(name)
    if re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", ident):
        return '"' + ident + '"'
    raise FidelioDbError(f"Neplatny PostgreSQL identifikator: {ident}")
|
|
|
|
|
|
def _table(conn: data.PostgresConnection, table_name: str) -> str:
    """Return the quoted ``schema.table`` reference for *table_name*.

    The schema is read from ``conn.schema_``; when that attribute is missing
    or blank, ``food600`` is used.
    """
    configured = _s(getattr(conn, "schema_", ""))
    schema = configured if configured else "food600"
    return ".".join((_quote_ident(schema), _quote_ident(table_name)))
|
|
|
|
|
|
def _now_parts():
|
|
now = datetime.now()
|
|
return now, now.strftime("%y%m%d"), now.strftime("%H%M%S")
|
|
|
|
|
|
def _db_out_defaults() -> dict:
    """Build a ``db_out`` row filled with default values.

    Text columns default to ``""``, ``stav`` to ``"I"``, amounts to zero and
    the date/time columns to the current local time. Callers overwrite the
    fields that matter for their request.
    """
    stamp, date_text, time_text = _now_parts()
    zero = Decimal("0.00")
    row = dict(
        typ="",
        stav="I",
        cis_kasy="",
        outlet="",
        porad_tiz="",
        pokoj="",
        datum=stamp.date(),
        sekund=Decimal("0.000"),
        dat=date_text,
        cas=time_text,
        ucet_kasa="",
        serv_per="",
        folio="",
        gn="",
        p_hostu="",
        total=zero,
        dysko=zero,
        pm="",
        ckar="",
        track2="",
        retrans=Decimal("0"),
    )
    # Nine groups of s/d/t amount columns, all starting at zero.
    for number in range(1, 10):
        for prefix in ("s", "d", "t"):
            row[f"{prefix}{number}"] = zero
    return row
|
|
|
|
|
|
def _insert_db_out(pg, conn: data.PostgresConnection, payload: dict) -> int:
    """Insert *payload* as one row into ``db_out`` and commit.

    Columns missing from *payload* are sent as ``None``.

    Args:
        pg: Open database connection providing ``cursor()`` and ``commit()``.
        conn: Connection settings; used to resolve the schema.
        payload: Column name to value mapping.

    Returns:
        The ``postseqnr`` value returned by the INSERT.
    """
    target = _table(conn, "db_out")
    # s1, d1, t1, s2, d2, t2, ... s9, d9, t9
    raster_columns = [
        f"{prefix}{number}"
        for number in range(1, 10)
        for prefix in ("s", "d", "t")
    ]
    columns = [
        "typ", "stav", "cis_kasy", "outlet", "porad_tiz", "pokoj",
        "datum", "sekund", "dat", "cas", "ucet_kasa", "serv_per",
        "folio", "gn", "p_hostu", "total",
        *raster_columns,
        "dysko", "pm", "ckar", "track2", "retrans",
    ]
    column_sql = ", ".join(map(_quote_ident, columns))
    placeholders = ", ".join("%s" for _ in columns)
    values = [payload.get(name) for name in columns]
    statement = f"INSERT INTO {target} ({column_sql}) VALUES ({placeholders}) RETURNING postseqnr"

    cursor = pg.cursor()
    try:
        cursor.execute(statement, values)
        inserted = cursor.fetchone()
        pg.commit()
    finally:
        cursor.close()
    return int(inserted[0])
|
|
|
|
|
|
def _mark_request_done(pg, conn: data.PostgresConnection, postseqnr: int, db_in_id: int | None = None) -> None:
    """Set ``stav`` to ``"O"`` on the request row and, optionally, its answer.

    Args:
        pg: Open database connection providing ``cursor()`` and ``commit()``.
        conn: Connection settings; used to resolve the schema.
        postseqnr: Key of the ``db_out`` row to update.
        db_in_id: ``idriadok`` of the ``db_in`` row to update as well, or
            ``None`` to leave ``db_in`` untouched.
    """
    updates = [
        (
            f"UPDATE {_table(conn, 'db_out')} SET stav=%s WHERE postseqnr=%s",
            ("O", postseqnr),
        ),
    ]
    if db_in_id is not None:
        updates.append(
            (
                f"UPDATE {_table(conn, 'db_in')} SET stav=%s WHERE idriadok=%s",
                ("O", db_in_id),
            )
        )

    cursor = pg.cursor()
    try:
        for statement, arguments in updates:
            cursor.execute(statement, arguments)
        # Both updates are committed together.
        pg.commit()
    finally:
        cursor.close()
|
|
|
|
|
|
def _wait_for_answer(pg, conn: data.PostgresConnection, postseqnr: int, timeout_seconds: int = 30) -> dict | None:
    """Poll ``db_in`` until a row for *postseqnr* appears or the timeout ends.

    The table is queried immediately, then about once per second. One last
    query is made when the deadline is reached, so an answer that arrives
    during the final sleep is still picked up.

    Args:
        pg: Open database connection providing ``cursor()``.
        conn: Connection settings; used to resolve the schema.
        postseqnr: Request number the answer must carry.
        timeout_seconds: Maximum wait; falsy values mean 30 and the
            effective minimum is 1 second.

    Returns:
        The answer row with the lowest ``idriadok`` as a column-name-to-value
        dict, or ``None`` when no answer arrived in time.
    """
    table = _table(conn, "db_in")
    columns = [
        "idriadok", "odpoved", "kecy", "pokoj1",
        "folio1", "gn1", "pokoj2", "folio2", "gn2",
        "pokoj3", "folio3", "gn3", "pokoj4", "folio4", "gn4",
    ]
    column_sql = ", ".join(_quote_ident(col) for col in columns)
    query = f"SELECT {column_sql} FROM {table} WHERE postseqnr=%s ORDER BY idriadok LIMIT 1"

    # Monotonic clock: the deadline must not move when the system clock is
    # adjusted while we wait.
    deadline = time.monotonic() + max(1, int(timeout_seconds or 30))
    # NOTE(review): seeing rows committed by another session between polls
    # presumably relies on the connection's isolation level - confirm.
    while True:
        cur = pg.cursor()
        try:
            cur.execute(query, (postseqnr,))
            row = cur.fetchone()
        finally:
            cur.close()
        if row:
            return dict(zip(columns, row))
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline.
        time.sleep(min(1.0, remaining))
|
|
|
|
|
|
def _is_ok_answer(answer: dict) -> bool:
    """Return True when the answer's ``odpoved`` field is blank, ``OK`` or ``UR``.

    The comparison ignores case and surrounding whitespace.
    """
    status = _s(answer.get("odpoved")).upper()
    accepted = ("", "OK", "UR")
    return status in accepted
|
|
|
|
|
|
def _guest_from_answer(answer: dict, idx: int) -> data.HotelGuest | None:
    """Build the guest stored in slot *idx* of an answer row.

    Returns ``None`` when the slot has no folio. The folio is used as both
    guest id and account id. When the slot has no room of its own, the room
    from slot 1 is used.
    """
    def field(prefix: str) -> str:
        return _s(answer.get(f"{prefix}{idx}"))

    folio = field("folio")
    if folio == "":
        return None

    room = field("pokoj")
    if not room:
        room = _s(answer.get("pokoj1"))

    return data.HotelGuest(
        id=folio,
        account_id=folio,
        guest_name=field("gn"),
        room_id=room,
        room_code=room,
        result=0,
    )
|
|
|
|
|
|
def load_guests(
    conn: data.PostgresConnection,
    id_kas: str,
    params: dict,
    room_code: str = "",
    track2: str = "",
    timeout_seconds: int = 30,
) -> list[data.HotelGuest]:
    """Send a ``DOTAZ`` request and return the guests from the answer.

    A row is inserted into ``db_out``, the matching ``db_in`` row is awaited,
    and both are marked as processed before the answer is interpreted.

    Args:
        conn: PostgreSQL connection settings.
        id_kas: Register id; written to both ``cis_kasy`` and ``outlet``.
        params: Settings; ``fidelio_izba_znakov`` is the room-code width
            (default 3).
        room_code: Room to query; zero-padded on the left to the width.
        track2: Card data to query by. Dropped when *room_code* is non-empty
            and already at least as long as the configured width.
        timeout_seconds: How long to wait for the answer.

    Returns:
        Up to four guests, one per filled answer slot.

    Raises:
        FidelioDbError: When no answer arrives in time or the answer
            reports an error.
    """
    width = _i((params or {}).get("fidelio_izba_znakov"), 3)
    room_code = _s(room_code)
    track2 = _s(track2)
    if not room_code:
        # No room given: send an empty room field and keep track2. This is
        # tested first because with a configured width <= 0 the length
        # comparison below would also match an empty room code and drop
        # track2, the only identifier left in the request.
        width = 0
    elif len(room_code) >= width:
        width = len(room_code)
        track2 = ""

    payload = _db_out_defaults()
    payload.update({
        "typ": "DOTAZ",
        "cis_kasy": _s(id_kas),
        "outlet": _s(id_kas),
        "pokoj": room_code.rjust(width, "0"),
        "track2": track2,
    })

    with postgres_service.connect(conn) as pg:
        postseqnr = _insert_db_out(pg, conn, payload)
        answer = _wait_for_answer(pg, conn, postseqnr, timeout_seconds=timeout_seconds)
        if not answer:
            # Mark the unanswered request as processed before giving up.
            _mark_request_done(pg, conn, postseqnr)
            raise FidelioDbError("Opera neodpoveda.")
        _mark_request_done(pg, conn, postseqnr, _i(answer.get("idriadok")))

    if not _is_ok_answer(answer):
        raise FidelioDbError(_s(answer.get("kecy")) or "Opera vratila chybu.")

    candidates = (_guest_from_answer(answer, idx) for idx in range(1, 5))
    return [guest for guest in candidates if guest]
|
|
|
|
|
|
# Characters that stand in for digits in raw magnetic-card input.
# NOTE(review): these look like the digit-row keys of a Slovak keyboard and
# of a shifted US keyboard, i.e. a keyboard-emulating card reader - confirm.
_MAG_CARD_DIGIT_TABLE = str.maketrans({
    "+": "1",
    "\u013e": "2",
    "\u0161": "3",
    "\u010d": "4",
    "\u0165": "5",
    "\u017e": "6",
    "\u00fd": "7",
    "\u00e1": "8",
    "\u00ed": "9",
    "\u00e9": "0",
    "!": "1",
    "@": "2",
    "#": "3",
    "$": "4",
    "%": "5",
    "^": "6",
    "&": "7",
    "*": "8",
    "(": "9",
    ")": "0",
})


def _normalize_mag_card(card_code: str) -> tuple[str, str]:
    """Extract the room code and guest id from raw magnetic-card data.

    The card must start with ``%`` and contain ``?``, or start with ``5`` or
    ``_`` and contain ``<``. The text between the first character and that
    terminator is the payload. After substitute characters are mapped to
    digits, the payload is read by position: characters 10-19 hold the guest
    id, 20-29 the checkout time as ``YYMMDDhhmm`` and 30-33 the room number.

    Args:
        card_code: Raw text as delivered by the card reader.

    Returns:
        ``(room_code, guest_id)``, both without leading zeros. ``guest_id``
        is ``""`` when that field is not numeric.

    Raises:
        FidelioDbError: If the framing is unknown, the room number or
            checkout time cannot be parsed, or the checkout time has passed.
    """
    code = _s(card_code)
    if code.startswith("%") and "?" in code:
        end = code.find("?")
    elif code.startswith(("5", "_")) and "<" in code:
        end = code.find("<")
    else:
        raise FidelioDbError("Neznamy format hotelovej karty.")

    # One translate pass is equivalent to the chained replacements because
    # no replacement digit is itself a key of the table.
    code = code[1:end].translate(_MAG_CARD_DIGIT_TABLE)

    try:
        room_code = str(int(code[30:34]))
    except ValueError as e:
        raise FidelioDbError("Neznamy format hotelovej karty.") from e
    try:
        guest_id = str(int(code[10:20]))
    except ValueError:
        # The guest id is optional: a non-numeric field just means "unknown".
        guest_id = ""
    try:
        checkout = datetime.strptime(code[20:30], "%y%m%d%H%M")
    except ValueError as e:
        raise FidelioDbError("Neznamy format hotelovej karty.") from e
    # Compared against naive local time.
    if checkout < datetime.now():
        raise FidelioDbError("Nacitana hotelova karta je neplatna.")
    return room_code, guest_id
|
|
|
|
|
|
def _normalize_salto_card(card_code: str) -> str:
    """Reverse the card code in two-character groups and pad it to 14 chars.

    Groups are taken from the end of the stripped code. With an odd length
    the leading character forms the final one-character group. The result is
    right-padded with ``0``; longer results are not truncated.
    """
    code = _s(card_code)
    groups = [
        code[max(end - 2, 0):end]
        for end in range(len(code), 0, -2)
    ]
    return "".join(groups).ljust(14, "0")
|
|
|
|
|
|
def check_card(
    conn: data.PostgresConnection,
    id_kas: str,
    params: dict,
    card_code: str,
    timeout_seconds: int = 30,
) -> data.HotelCardResult:
    """Resolve a hotel card to a guest by querying through ``load_guests``.

    ``params["hotel_karta_typ"]`` selects how the card is interpreted
    (default ``SALTO``):

    * ``SALTO``: the normalized card code is sent as ``track2``.
    * ``MAG``: the room number read from the card is sent as both room code
      and ``track2``; the guest id from the card narrows the result.
    * anything else: the stripped card code is sent as ``track2``,
      right-padded with ``0`` to ``params["hotel_karta_length"]``
      (default 14) when that length is non-zero.

    Returns:
        Details of the first matching guest.

    Raises:
        FidelioDbError: When the card cannot be parsed, the query fails, or
            no guest is returned.
    """
    settings = params or {}
    raw_type = settings.get("hotel_karta_typ") or "SALTO"
    card_type = _s(raw_type).strip("\"'").upper()
    pad_to = _i(settings.get("hotel_karta_length"), 14)

    wanted_guest = ""
    room_code = ""
    if card_type == "SALTO":
        track2 = _normalize_salto_card(card_code)
    elif card_type == "MAG":
        room_code, wanted_guest = _normalize_mag_card(card_code)
        track2 = room_code
    else:
        track2 = _s(card_code)
        if pad_to:
            track2 = track2.ljust(pad_to, "0")

    guests = load_guests(
        conn,
        id_kas=id_kas,
        params=settings,
        room_code=room_code,
        track2=track2,
        timeout_seconds=timeout_seconds,
    )

    if wanted_guest:
        # Prefer the guest named on the card, but keep the full list when
        # nobody matches.
        matching = [guest for guest in guests if _s(guest.id) == wanted_guest]
        if matching:
            guests = matching

    if not guests:
        raise FidelioDbError("Ku karte sa nenasiel ziadny host.")

    chosen = guests[0]
    return data.HotelCardResult(
        room_id=chosen.room_id,
        room_code=chosen.room_code,
        account_id=chosen.account_id or chosen.id,
        guest_id=chosen.id,
        guest_name=chosen.guest_name,
    )
|
|
|
|
|
|
def _raster_index(value) -> int:
    """Return *value* as a raster number in 1..9, or 1 for anything else."""
    number = _i(value, 1)
    return number if 1 <= number <= 9 else 1
|
|
|
|
|
|
def _charge_amounts(preparation: data.HotelChargePreparation) -> list[Decimal]:
    """Sum the preparation's line amounts into nine per-raster totals.

    Each line is added to the slot chosen by its ``raster_id``; invalid ids
    land in slot 1. Position 0 of the result is raster 1. Every total is
    rounded half-up to two decimal places.
    """
    totals = [Decimal("0.00")] * 9
    for item in preparation.lines or []:
        slot = _raster_index(item.raster_id) - 1
        totals[slot] = totals[slot] + _money(item.amount)
    cent = Decimal("0.01")
    return [subtotal.quantize(cent, rounding=ROUND_HALF_UP) for subtotal in totals]
|
|
|
|
|
|
def charge_account(
    conn: data.PostgresConnection,
    id_kas: str,
    preparation: data.HotelChargePreparation,
    timeout_seconds: int = 30,
) -> dict:
    """Send a ``ZATIZ`` request for a prepared charge and await the answer.

    The per-raster totals go into columns ``s1``..``s9`` and their sum into
    ``total``. The request row is inserted into ``db_out``, the insert is
    logged, and the matching ``db_in`` row is awaited. Both rows are marked
    as processed before the answer is interpreted.

    Args:
        conn: PostgreSQL connection settings.
        id_kas: Register id; written to both ``cis_kasy`` and ``outlet``.
        preparation: The charge; must be ready and have a target.
        timeout_seconds: How long to wait for the answer.

    Returns:
        ``{"ok": True, "request_number": <postseqnr>, "message": "OK"}``.

    Raises:
        FidelioDbError: When the preparation is missing, not ready, has no
            target or sums to zero; when no answer arrives in time; or when
            the answer reports an error.
    """
    if not preparation or not preparation.ready:
        raise FidelioDbError("Hotelovy ucet nie je pripraveny na odoslanie.")
    target = preparation.target
    if not target:
        raise FidelioDbError("Hotelovy ucet nema ciel.")

    amounts = _charge_amounts(preparation)
    cent = Decimal("0.01")
    total = sum(amounts, Decimal("0.00")).quantize(cent, rounding=ROUND_HALF_UP)
    if not total:
        raise FidelioDbError("Hotelovy ucet nema ziadnu sumu na odoslanie.")

    register = _s(id_kas)
    payload = _db_out_defaults()
    payload.update(
        typ="ZATIZ",
        cis_kasy=register,
        outlet=register,
        pokoj=_s(target.room_code),
        # Only the first character of the time attribute is sent.
        serv_per=_s(target.time_attribute)[:1],
        gn=_s(target.guest_name),
        folio=_s(target.guest_id or target.account_id),
        # Only the last seven characters of the receipt number are sent.
        ucet_kasa=_s(preparation.receipt_number)[-7:],
        total=total,
    )
    for position, amount in enumerate(amounts, start=1):
        payload[f"s{position}"] = amount

    with postgres_service.connect(conn) as pg:
        postseqnr = _insert_db_out(pg, conn, payload)
        logger.info(
            "Fidelio ZATIZ inserted: postseqnr=%s id_kas=%s pokoj=%s folio=%s "
            "ucet_kasa=%s total=%s amounts=%s",
            postseqnr,
            _s(id_kas),
            payload["pokoj"],
            payload["folio"],
            payload["ucet_kasa"],
            total,
            list(map(str, amounts)),
        )
        answer = _wait_for_answer(pg, conn, postseqnr, timeout_seconds=timeout_seconds)
        if not answer:
            # Mark the unanswered request as processed before giving up.
            _mark_request_done(pg, conn, postseqnr)
            raise FidelioDbError("Opera neodpoveda.")
        _mark_request_done(pg, conn, postseqnr, _i(answer.get("idriadok")))

    if not _is_ok_answer(answer):
        raise FidelioDbError(_s(answer.get("kecy")) or "Opera vratila chybu.")

    return {"ok": True, "request_number": postseqnr, "message": "OK"}
|