from collections import defaultdict from datetime import datetime from typing import Any import json import sqlite3 from data import ( ClosureInterval, ClosureSummary, ClosureUser, ClosureVAT, ClosureReportOut, ) def _strip(value: Any) -> str: return "" if value is None else str(value).strip() def _float(value: Any, default: float = 0.0) -> float: try: return float(str(value).strip().replace(",", ".")) except Exception: return default def _int(value: Any, default: int = 0) -> int: try: return int(float(str(value).strip().replace(",", "."))) except Exception: return default def _money(value: Any) -> float: return round(_float(value, 0.0), 2) def _json_loads(raw: str | bytes | None) -> dict: if not raw: return {} try: data = json.loads(raw) except Exception: return {} return data if isinstance(data, dict) else {} def _prefix_from_table(table_name: str) -> str: return table_name[:-5] if table_name.endswith("_ucty") else table_name.rsplit("_", 1)[0] def _load_payment_meta(cur, table_ucty: str, id_kas: str) -> dict[str, dict[str, Any]]: prefix = _prefix_from_table(table_ucty) table = f"{prefix}_platby" try: cur.execute( f""" SELECT code, name, unit, fiscal, is_cash, is_bankterm, odvod, odovzdat FROM "{table}" WHERE id_kas=? """, (id_kas,), ) except sqlite3.Error: return {} result: dict[str, dict[str, Any]] = {} for code, name, unit, fiscal, is_cash, is_bankterm, odvod, odovzdat in cur.fetchall(): key = _strip(code) if not key: continue result[key] = { "code": key, "name": _strip(name), "unit": _strip(unit), "fiscal": int(fiscal or 0), "is_cash": int(is_cash or 0), "is_bankterm": int(is_bankterm or 0), "odvod": int(odvod or 0), "odovzdat": _strip(odovzdat), } return result def _load_fooddat_names(cur, table_ucty: str) -> dict[str, str]: prefix = _prefix_from_table(table_ucty) table = f"{prefix}_fooddat" try: cur.execute(f'SELECT id, id_zkratka FROM "{table}"') except sqlite3.Error: return {} result: dict[str, str] = {} for raw_id, raw_name in cur.fetchall(): key = _strip(raw_id) name = _strip(raw_name) if not key or not name: continue result[key] = name try: numeric = str(int(float(key))) result.setdefault(numeric, name) result.setdefault(numeric.zfill(2), name) except Exception: pass return result def _load_previous_cash_carry(cur, table_ucty: str, id_kas: str) -> dict[tuple[str, str], float]: prefix = _prefix_from_table(table_ucty) table = f"{prefix}_closure_cash_state" try: cur.execute( f""" SELECT clsrep_id, prn_no, payment_code, carry_amount FROM "{table}" WHERE id_kas=? ORDER BY clsrep_id, id """, (id_kas,), ) except sqlite3.Error: return {} result: dict[tuple[str, str], float] = {} for _clsrep_id, prn_no, payment_code, carry_amount in cur.fetchall(): result[(_strip(prn_no), _strip(payment_code))] = _money(carry_amount) return result def _payment_code(payment: dict) -> str: return _strip(payment.get("code") or payment.get("nazev_platby") or payment.get("name") or "UNKNOWN") def _payment_name(payment: dict, payment_meta: dict[str, dict[str, Any]]) -> str: code = _payment_code(payment) meta = payment_meta.get(code) or {} return _strip(payment.get("nazev") or payment.get("name") or meta.get("name") or code) def _payment_amount(payment: dict) -> float: if payment.get("suma_czk") not in (None, ""): return _money(payment.get("suma_czk")) amount = _float(payment.get("suma"), 0.0) rate = _float(payment.get("rate"), 1.0) or 1.0 return _money(amount * rate) def _payment_tip(payment: dict) -> float: tip = _float(payment.get("tip"), 0.0) rate = _float(payment.get("rate"), 1.0) or 1.0 return _money(tip * rate) def _receipt_printer(receipt: dict) -> str: return _strip(receipt.get("bill_printer") or receipt.get("printer_no") or receipt.get("prn_no") or "") def _cash_operation(receipt: dict) -> str: return _strip(receipt.get("cash_operation")).lower() def _is_deposit(op: str) -> bool: return op in {"manual_deposit", "auto_deposit", "deposit", "vklad"} def _is_withdrawal(op: str) -> bool: return op in {"manual_withdrawal", "auto_withdrawal", "withdrawal", "withdraw", "vyber", "vyber"} def _vat_tax_amount(rate_text: str, zaklad: float) -> float: try: rate = float(rate_text) except Exception: return 0.0 if rate == -1: return 0.0 if rate > 2.0: return zaklad * (rate / 100.0) return zaklad * (rate - 1.0) def _open_total_from_poloz(receipt: dict) -> float: total = 0.0 for item in receipt.get("poloz") or []: cena = _float(item.get("cena"), 0.0) pocet = _float(item.get("pocet"), 0.0) delitel = _float(item.get("delitel"), 1.0) or 1.0 total += cena * (pocet / delitel) return _money(total) def _item_qty(item: dict) -> float: delitel = _float(item.get("delitel"), 1.0) or 1.0 return _float(item.get("pocet"), 0.0) / delitel def _item_amount(item: dict) -> float: return _money(_float(item.get("cena"), 0.0) * _item_qty(item)) def _item_original_amount(item: dict) -> float: price = item.get("cena_puv") if price in (None, ""): price = item.get("cena") return _money(_float(price, 0.0) * _item_qty(item)) def _receipt_total(receipt: dict) -> float: total = _float(receipt.get("total_base_currency"), 0.0) if abs(total) < 0.0001: total = _open_total_from_poloz(receipt) return _money(total) def _is_storno_receipt(receipt: dict) -> bool: origin = _strip(receipt.get("origin")).lower() return ( bool(receipt.get("is_storno")) or "storno" in origin or _receipt_total(receipt) < -0.004 ) def _payment_is_fiscal(payment: dict, payment_meta: dict[str, dict[str, Any]]) -> bool: code = _payment_code(payment) meta = payment_meta.get(code) or {} if "fiscal" in payment: return bool(payment.get("fiscal")) return int(meta.get("fiscal", 0) or 0) != 0 def _payment_is_bankterm(payment: dict, payment_meta: dict[str, dict[str, Any]]) -> bool: code = _payment_code(payment) meta = payment_meta.get(code) or {} return bool(payment.get("is_bankterm")) or int(meta.get("is_bankterm", 0) or 0) != 0 def _operation_label(operation: str) -> str: operation = _strip(operation).lower() if _is_deposit(operation): return "Vklad" if _is_withdrawal(operation): return "Vyber" return operation or "-" def _sort_rows(rows: dict[str, dict[str, Any]], *amount_fields: str) -> list[dict[str, Any]]: result = [] for row in rows.values(): item = dict(row) for field in amount_fields: item[field] = _money(item.get(field, 0.0)) if "qty" in item: item["qty"] = round(_float(item.get("qty"), 0.0), 4) result.append(item) return sorted( result, key=lambda row: ( _strip(row.get("name") or row.get("label") or row.get("code")), _strip(row.get("code")), ), ) def _build_report_sections( cur, table_ucty: str, id_kas: str, receipts: list[dict[str, Any]], ) -> dict[str, Any]: payment_meta = _load_payment_meta(cur, table_ucty, id_kas) items_by_kind: dict[str, dict[str, Any]] = {} items_sold: dict[str, dict[str, Any]] = {} price_levels: dict[str, dict[str, Any]] = {} storages: dict[str, dict[str, Any]] = {} rooms: dict[str, dict[str, Any]] = {} tables: dict[str, dict[str, Any]] = {} receipt_counts_by_user: dict[str, dict[str, Any]] = {} payments_by_user: dict[str, dict[str, Any]] = {} payments_by_code: dict[str, dict[str, Any]] = {} payments_by_vat: dict[str, dict[str, Any]] = {} fiscal_payments_by_vat: dict[str, dict[str, Any]] = {} fiscal_payments: dict[str, dict[str, Any]] = {} nonfiscal_payments: dict[str, dict[str, Any]] = {} terminal_payments: dict[str, dict[str, Any]] = {} receivables_by_payment: dict[str, dict[str, Any]] = {} managers: dict[str, dict[str, Any]] = {} managers_by_vat: dict[str, dict[str, Any]] = {} managers_payments_by_vat: dict[str, dict[str, Any]] = {} cashiers_by_payment: dict[str, dict[str, Any]] = {} cashiers_cash: dict[str, dict[str, Any]] = {} sparts: dict[str, dict[str, Any]] = {} cash_operations_summary: dict[str, dict[str, Any]] = {} currency_payments: dict[str, dict[str, Any]] = {} receipt_list: list[dict[str, Any]] = [] storno_journal: list[dict[str, Any]] = [] cash_operations: list[dict[str, Any]] = [] fooddat_names = _load_fooddat_names(cur, table_ucty) def add_item_row( rows: dict[str, dict[str, Any]], key: str, code: str, name: str, qty: float, amount: float, amount_original: float | None = None, **extra, ): if key not in rows: rows[key] = { "code": code, "name": name or code or "-", "qty": 0.0, "amount": 0.0, "amount_original": 0.0, "count": 0, **extra, } else: for extra_key, extra_value in extra.items(): if not rows[key].get(extra_key) and extra_value not in (None, ""): rows[key][extra_key] = extra_value rows[key]["qty"] += qty rows[key]["amount"] += amount rows[key]["amount_original"] += amount if amount_original is None else amount_original rows[key]["count"] += 1 def add_amount_row(rows: dict[str, dict[str, Any]], key: str, code: str, name: str, amount: float, **extra): if key not in rows: rows[key] = {"code": code, "name": name or code or "-", "amount": 0.0, "count": 0, **extra} rows[key]["amount"] += amount rows[key]["count"] += 1 def add_payment_report_row( rows: dict[str, dict[str, Any]], key: str, code: str, name: str, amount: float, amount_original: float, tip: float = 0.0, **extra, ): if key not in rows: rows[key] = { "code": code, "name": name or code or "-", "amount": 0.0, "amount_original": 0.0, "tip": 0.0, "count": 0, **extra, } rows[key]["amount"] += amount rows[key]["amount_original"] += amount_original rows[key]["tip"] += tip rows[key]["count"] += 1 for receipt in receipts: if not receipt.get("ucislo"): continue operation = _cash_operation(receipt) total = _receipt_total(receipt) autor = _strip(receipt.get("autor")) or "UNKNOWN" prn_no = _receipt_printer(receipt) payments = [p for p in (receipt.get("platby") or []) if isinstance(p, dict)] if operation: payment = payments[0] if payments else {} code = _payment_code(payment) name = _payment_name(payment, payment_meta) amount = abs(_payment_amount(payment)) if payment else abs(total) op_label = _operation_label(operation) cash_operations.append({ "ucislo": _strip(receipt.get("ucislo")), "closed_at": _strip(receipt.get("closed_at")), "autor": autor, "operation": operation, "operation_label": op_label, "payment_code": code, "payment_name": name, "prn_no": prn_no, "amount": _money(amount), }) key = f"{op_label}|{prn_no}|{code}" add_amount_row( cash_operations_summary, key, code, name, amount, operation=operation, operation_label=op_label, prn_no=prn_no, ) continue payment_text = ", ".join( f"{_payment_name(payment, payment_meta)} {_payment_amount(payment):.2f}" for payment in payments ) original_total = sum( _item_original_amount(item) for item in (receipt.get("poloz") or []) if isinstance(item, dict) ) if abs(original_total) < 0.0001: original_total = total receipt_row = { "ucislo": _strip(receipt.get("ucislo")), "closed_at": _strip(receipt.get("closed_at")), "open_at": _strip(receipt.get("open_at")), "autor": autor, "stul": _strip(receipt.get("stul")), "table_name": _strip(receipt.get("table_name")) or _strip(receipt.get("stul")), "room_name": _strip(receipt.get("room_name")) or "-", "origin": _strip(receipt.get("origin")) or "Ucet", "is_storno": bool(_is_storno_receipt(receipt)), "pohladavka": int(receipt.get("pohladavka") or 0), "total_base_currency": total, "total_base_currency_original": _money(original_total), "payment_text": payment_text, } receipt_list.append(receipt_row) if receipt_row["is_storno"]: storno_journal.append(receipt_row) if autor not in receipt_counts_by_user: receipt_counts_by_user[autor] = { "code": autor, "name": autor, "count": 0, "amount": 0.0, "amount_original": 0.0, } receipt_counts_by_user[autor]["count"] += 1 receipt_counts_by_user[autor]["amount"] += total receipt_counts_by_user[autor]["amount_original"] += original_total room_key = receipt_row["room_name"] add_amount_row(rooms, room_key, room_key, room_key, total) table_key = f"{room_key}|{receipt_row['table_name']}" add_amount_row( tables, table_key, receipt_row["table_name"], receipt_row["table_name"], total, room_name=room_key, ) for item in receipt.get("poloz") or []: if not isinstance(item, dict): continue qty = _item_qty(item) amount = _item_amount(item) amount_original = _item_original_amount(item) kind_code = _strip(item.get("c_druh")) or "0" kind_name = _strip(item.get("druh")) or kind_code add_item_row(items_by_kind, kind_code, kind_code, kind_name, qty, amount, amount_original) spart = _strip(item.get("spart")) if spart: add_item_row(sparts, spart, spart, spart, qty, amount, amount_original) item_code = _strip(item.get("id_card")) or "0" item_name = _strip(item.get("nazev")) or item_code level = _strip(item.get("cenhlad")) or "0" storage = _strip(item.get("sklad")) or "00" manager_name = fooddat_names.get(storage) or storage vat_rate = _strip(item.get("dph")) unit_price = amount / qty if abs(qty) >= 0.0001 else _float(item.get("cena"), 0.0) sold_key = f"{item_code}|{storage}|{level}|{vat_rate}|{spart}" add_item_row( items_sold, sold_key, item_code, item_name, qty, amount, amount_original, id_zkratka=manager_name, druh=kind_name, spart=spart, sklad=storage, cen_hlad=level, dph=vat_rate, jc=_money(unit_price), ) add_item_row(price_levels, level, level, level, qty, amount, amount_original) add_item_row(storages, storage, storage, storage, qty, amount, amount_original) add_item_row(managers, storage, storage, manager_name, qty, amount, amount_original) vat_key = f"{storage}|{vat_rate}" if vat_key not in managers_by_vat: managers_by_vat[vat_key] = { "code": storage, "name": manager_name, "rate": vat_rate, "zaklad": 0.0, "dan": 0.0, "celkem": 0.0, } zaklad = _float(item.get("cena"), 0.0) * qty dan = _vat_tax_amount(vat_rate, zaklad) managers_by_vat[vat_key]["zaklad"] += zaklad managers_by_vat[vat_key]["dan"] += dan managers_by_vat[vat_key]["celkem"] += zaklad + dan payment_total = sum(_payment_amount(payment) for payment in payments) for payment in payments: code = _payment_code(payment) name = _payment_name(payment, payment_meta) amount = _payment_amount(payment) tip = _payment_tip(payment) share = amount / payment_total if abs(payment_total) >= 0.005 else 0.0 payment_original = _money(original_total * share) fiscal = _payment_is_fiscal(payment, payment_meta) bankterm = _payment_is_bankterm(payment, payment_meta) add_payment_report_row( payments_by_code, code, code, name, amount, payment_original, tip, ) add_payment_report_row( payments_by_user, f"{autor}|{code}", code, name, amount, payment_original, tip, autor=autor, username=autor, ) if fiscal: add_payment_report_row(fiscal_payments, code, code, name, amount, payment_original, tip) else: add_payment_report_row(nonfiscal_payments, code, code, name, amount, payment_original, tip) if bankterm or payment.get("terminal_result"): add_payment_report_row(terminal_payments, code, code, name, amount, payment_original, tip) if int(receipt.get("pohladavka") or 0) == 1: add_payment_report_row(receivables_by_payment, code, code, name, amount, payment_original, tip) meta = payment_meta.get(code) or {} odovzdat = _strip(meta.get("odovzdat")) if odovzdat: add_payment_report_row( cashiers_by_payment, f"{odovzdat}|{code}", code, name, amount, payment_original, tip, odovzdat=odovzdat, ) if int(meta.get("is_cash", 0) or 0) or code.upper() in {"CASH", "HOTOVOST", "HOTOVE"}: add_payment_report_row( cashiers_cash, autor, autor, autor, amount, payment_original, tip, username=autor, ) unit = _strip(payment.get("unit")) or "" if unit: key = unit if key not in currency_payments: currency_payments[key] = { "code": unit, "name": unit, "amount": 0.0, "base_amount": 0.0, "count": 0, } currency_payments[key]["amount"] += _float(payment.get("suma"), 0.0) currency_payments[key]["base_amount"] += amount currency_payments[key]["count"] += 1 for vat_row in receipt.get("dane") or []: if not isinstance(vat_row, dict): continue rate = _strip(vat_row.get("rate")) zaklad = _float(vat_row.get("zaklad"), 0.0) * share dan = _vat_tax_amount(rate, zaklad) celkem = zaklad + dan key = f"{code}|{rate}" target = payments_by_vat if key not in target: target[key] = { "payment_code": code, "payment_name": name, "rate": rate, "zaklad": 0.0, "dan": 0.0, "celkem": 0.0, } target[key]["zaklad"] += zaklad target[key]["dan"] += dan target[key]["celkem"] += celkem if fiscal: if key not in fiscal_payments_by_vat: fiscal_payments_by_vat[key] = { "payment_code": code, "payment_name": name, "rate": rate, "zaklad": 0.0, "dan": 0.0, "celkem": 0.0, } fiscal_payments_by_vat[key]["zaklad"] += zaklad fiscal_payments_by_vat[key]["dan"] += dan fiscal_payments_by_vat[key]["celkem"] += celkem for item in receipt.get("poloz") or []: if not isinstance(item, dict): continue storage = _strip(item.get("sklad")) or "00" manager_name = fooddat_names.get(storage) or storage item_amount = _item_amount(item) * share item_original = _item_original_amount(item) * share vat_rate = _strip(item.get("dph")) zaklad = _float(item.get("cena"), 0.0) * _item_qty(item) * share dan = _vat_tax_amount(vat_rate, zaklad) mp_key = f"{code}|{storage}|{vat_rate}" if mp_key not in managers_payments_by_vat: managers_payments_by_vat[mp_key] = { "payment_code": code, "payment_name": name, "code": storage, "name": manager_name, "rate": vat_rate, "zaklad": 0.0, "dan": 0.0, "celkem": 0.0, "amount_original": 0.0, } managers_payments_by_vat[mp_key]["zaklad"] += zaklad managers_payments_by_vat[mp_key]["dan"] += dan managers_payments_by_vat[mp_key]["celkem"] += item_amount managers_payments_by_vat[mp_key]["amount_original"] += item_original def sorted_tax_rows(rows: dict[str, dict[str, Any]]) -> list[dict[str, Any]]: result = [] for row in rows.values(): item = dict(row) item["zaklad"] = round(_float(item.get("zaklad"), 0.0), 4) item["dan"] = _money(item.get("dan")) item["celkem"] = _money(item.get("celkem")) result.append(item) return sorted(result, key=lambda item: (_strip(item.get("payment_name")), _strip(item.get("rate")))) currency_rows = [] for row in currency_payments.values(): item = dict(row) item["amount"] = _money(item.get("amount")) item["base_amount"] = _money(item.get("base_amount")) currency_rows.append(item) return { "receipt_list": sorted(receipt_list, key=lambda row: _strip(row.get("ucislo"))), "receipt_counts_by_user": _sort_rows(receipt_counts_by_user, "amount", "amount_original"), "items_by_kind": _sort_rows(items_by_kind, "amount", "amount_original"), "items_sold": _sort_rows(items_sold, "amount", "amount_original"), "price_levels": _sort_rows(price_levels, "amount", "amount_original"), "storages": _sort_rows(storages, "amount", "amount_original"), "sparts": _sort_rows(sparts, "amount", "amount_original"), "rooms": _sort_rows(rooms, "amount"), "tables": _sort_rows(tables, "amount"), "payments_by_code": _sort_rows(payments_by_code, "amount", "amount_original", "tip"), "payments_by_user": _sort_rows(payments_by_user, "amount", "amount_original", "tip"), "payments_by_vat": sorted_tax_rows(payments_by_vat), "fiscal_payments_by_vat": sorted_tax_rows(fiscal_payments_by_vat), "fiscal_payments": _sort_rows(fiscal_payments, "amount", "amount_original", "tip"), "nonfiscal_payments": _sort_rows(nonfiscal_payments, "amount", "amount_original", "tip"), "terminal_payments": _sort_rows(terminal_payments, "amount", "amount_original", "tip"), "receivables_by_payment": _sort_rows(receivables_by_payment, "amount", "amount_original", "tip"), "managers": _sort_rows(managers, "amount", "amount_original"), "managers_by_vat": sorted_tax_rows(managers_by_vat), "managers_payments_by_vat": sorted_tax_rows(managers_payments_by_vat), "cashiers_by_payment": _sort_rows(cashiers_by_payment, "amount", "amount_original", "tip"), "cashiers_cash": _sort_rows(cashiers_cash, "amount", "amount_original", "tip"), "cash_operations": cash_operations, "cash_operations_summary": _sort_rows(cash_operations_summary, "amount"), "currency_payments": sorted(currency_rows, key=lambda row: _strip(row.get("code"))), "storno_journal": sorted(storno_journal, key=lambda row: _strip(row.get("ucislo"))), } def _build_cash_state( cur, table_ucty: str, id_kas: str, receipts: list[dict[str, Any]], ) -> list[dict[str, Any]]: payment_meta = _load_payment_meta(cur, table_ucty, id_kas) previous_carry = _load_previous_cash_carry(cur, table_ucty, id_kas) state: dict[tuple[str, str], dict[str, Any]] = {} def row_for(prn_no: str, code: str, name: str = "") -> dict[str, Any]: prn_no = _strip(prn_no) code = _strip(code) or "UNKNOWN" key = (prn_no, code) if key not in state: meta = payment_meta.get(code) or {} state[key] = { "prn_no": prn_no, "payment_code": code, "payment_name": _strip(name or meta.get("name") or code), "payment_unit": _strip(meta.get("unit") or ""), "payment_odvod": int(meta.get("odvod", 0) or 0), "payment_odovzdat": _strip(meta.get("odovzdat") or ""), "payment_fiscal": int(meta.get("fiscal", 0) or 0), "payment_is_cash": int(meta.get("is_cash", 0) or 0), "opening_amount": _money(previous_carry.get(key, 0.0)), "sales_amount": 0.0, "receivable_amount": 0.0, "manual_deposit_amount": 0.0, "manual_withdrawal_amount": 0.0, "auto_deposit_amount": 0.0, "auto_withdrawal_amount": 0.0, "balance_amount": 0.0, "carry_amount": 0.0, "generated_ucislo": "", "fiscal_result": {}, "status": "preview", "error": "", } return state[key] for (prn_no, code), amount in previous_carry.items(): if abs(amount) >= 0.005: row_for(prn_no, code) for receipt in receipts: prn_no = _receipt_printer(receipt) op = _cash_operation(receipt) payments = receipt.get("platby") or [] if not isinstance(payments, list): payments = [] if op: if payments: payment = payments[0] if isinstance(payments[0], dict) else {} code = _payment_code(payment) name = _payment_name(payment, payment_meta) amount = abs(_payment_amount(payment)) else: code = "UNKNOWN" name = "UNKNOWN" amount = abs(_float(receipt.get("total_base_currency"), 0.0)) target = row_for(prn_no, code, name) if _is_deposit(op): target["manual_deposit_amount"] = _money(target["manual_deposit_amount"] + amount) elif _is_withdrawal(op): target["manual_withdrawal_amount"] = _money(target["manual_withdrawal_amount"] + amount) continue is_receivable = int(receipt.get("pohladavka") or 0) == 1 for payment in payments: if not isinstance(payment, dict): continue code = _payment_code(payment) name = _payment_name(payment, payment_meta) amount = _payment_amount(payment) target = row_for(prn_no, code, name) if is_receivable: target["receivable_amount"] = _money(target["receivable_amount"] + amount) else: target["sales_amount"] = _money(target["sales_amount"] + amount) for row in state.values(): balance = ( row["opening_amount"] + row["sales_amount"] + row["receivable_amount"] + row["manual_deposit_amount"] + row["auto_deposit_amount"] - row["manual_withdrawal_amount"] - row["auto_withdrawal_amount"] ) row["balance_amount"] = _money(balance) row["carry_amount"] = row["balance_amount"] if abs(row["balance_amount"]) < 0.005: row["status"] = "settled" return sorted( state.values(), key=lambda item: ( _strip(item.get("prn_no")), _strip(item.get("payment_name")), _strip(item.get("payment_code")), ), ) def compute_closure_report( cur, table_ucty: str, table_clsrep: str | None = None, ucislo_st: str | None = None, ucislo_end: str | None = None, id_kas: str = "", ) -> ClosureReportOut | None: if not table_clsrep: table_clsrep = table_ucty.replace("_ucty", "_clsrep") cur.execute( f""" SELECT ucislo_end, clsrep_no FROM "{table_clsrep}" WHERE id_kas=? ORDER BY clsrep_id DESC LIMIT 1 """, (id_kas,), ) row = cur.fetchone() last_clsrep_no = row[1] if row else None unassigned_where = """ id_kas=? AND closed_at IS NOT NULL AND TRIM(COALESCE(ucislo, '')) != '' AND (c_uzaverka IS NULL OR c_uzaverka = 0) """ if not ucislo_st: params: list[Any] = [id_kas] extra = "" if ucislo_end: extra = " AND CAST(ucislo AS INTEGER)<=CAST(? AS INTEGER)" params.append(ucislo_end) cur.execute( f""" SELECT ucislo FROM "{table_ucty}" WHERE {unassigned_where} {extra} ORDER BY CAST(ucislo AS INTEGER) LIMIT 1 """, params, ) row = cur.fetchone() ucislo_st = row[0] if row else None if not ucislo_end: params = [id_kas] extra = "" if ucislo_st: extra = " AND CAST(ucislo AS INTEGER)>=CAST(? AS INTEGER)" params.append(ucislo_st) cur.execute( f""" SELECT ucislo FROM "{table_ucty}" WHERE {unassigned_where} {extra} ORDER BY CAST(ucislo AS INTEGER) DESC LIMIT 1 """, params, ) row = cur.fetchone() ucislo_end = row[0] if row else None if not ucislo_st or not ucislo_end: return None if last_clsrep_no: try: next_number = int(str(last_clsrep_no).split("-")[-1]) + 1 except Exception: next_number = 1 else: next_number = 1 clsrep_no = f"{id_kas}-{next_number:05d}" cur.execute( f""" SELECT ucislo, closed_at, data FROM "{table_ucty}" WHERE {unassigned_where} AND CAST(ucislo AS INTEGER)>=CAST(? AS INTEGER) AND CAST(ucislo AS INTEGER)<=CAST(? AS INTEGER) ORDER BY CAST(ucislo AS INTEGER) """, (id_kas, ucislo_st, ucislo_end), ) rows = cur.fetchall() if not rows: return None ucislo_first, closed_first, _ = rows[0] ucislo_last, closed_last, _ = rows[-1] receipts = [] for ucislo, closed_at, data_json in rows: receipt = _json_loads(data_json) if not receipt: continue receipt.setdefault("ucislo", ucislo) receipt.setdefault("closed_at", closed_at) receipts.append(receipt) by_payment = defaultdict(float) by_user_total = defaultdict(float) by_user_cash = defaultdict(float) by_vat = defaultdict(lambda: {"zaklad": 0.0, "dan": 0.0}) payment_meta = _load_payment_meta(cur, table_ucty, id_kas) total_base = 0.0 total_payments = 0.0 count = 0 for receipt in receipts: if not receipt.get("ucislo"): continue if _cash_operation(receipt): continue count += 1 autor = receipt.get("autor") or "UNKNOWN" base_val = _money(receipt.get("total_base_currency")) total_base += base_val by_user_total[autor] += base_val for payment in receipt.get("platby") or []: if not isinstance(payment, dict): continue code = _payment_code(payment) amount = _payment_amount(payment) by_payment[code] += amount total_payments += amount meta = payment_meta.get(code) or {} if int(meta.get("is_cash", 0) or 0) or code.upper() in {"CASH", "HOTOVOST", "HOTOVE"}: by_user_cash[autor] += amount for vat_row in receipt.get("dane") or []: if not isinstance(vat_row, dict): continue rate = _strip(vat_row.get("rate")) zaklad = _float(vat_row.get("zaklad"), 0.0) by_vat[rate]["zaklad"] += zaklad by_vat[rate]["dan"] += _vat_tax_amount(rate, zaklad) interval = ClosureInterval( ucislo_od=ucislo_first, ucislo_do=ucislo_last, closed_at_od=closed_first, closed_at_do=closed_last, ) summary = ClosureSummary( pocet_uctu=count, total_base_currency=_money(total_base), total_payments=_money(total_payments), difference=_money(total_base - total_payments), ) users = { user: ClosureUser( total_base_currency=_money(by_user_total[user]), hotovost=_money(by_user_cash[user]), ) for user in sorted(by_user_total) } vat = { rate: ClosureVAT( zaklad=round(values["zaklad"], 4), dan=_money(values["dan"]), celkem=_money(values["zaklad"] + values["dan"]), ) for rate, values in sorted(by_vat.items()) } cur.execute( f""" SELECT ucislo, stul, data FROM "{table_ucty}" WHERE id_kas=? AND closed_at IS NULL ORDER BY rowid """, (id_kas,), ) open_ucty = [] for ucislo_o, stul_db, data_json_o in cur.fetchall(): opened = _json_loads(data_json_o) total = _float(opened.get("total_base_currency"), 0.0) if abs(total) < 0.0001: total = _open_total_from_poloz(opened) open_ucty.append({ "ucislo": _strip(ucislo_o) or "-", "stul": _strip(stul_db) or _strip(opened.get("stul")), "autor": _strip(opened.get("autor")), "open_at": _strip(opened.get("open_at") or opened.get("datetime")), "blocked_by": _strip(opened.get("blocked_by")), "total_base_currency": _money(total), "pocet_polozek": len(opened.get("poloz") or []), }) return ClosureReportOut( blocked_by="", clsrep_no=clsrep_no, created_at=datetime.now().strftime("%y%m%d %H:%M:%S"), interval=interval, summary=summary, platby={key: _money(by_payment[key]) for key in sorted(by_payment)}, uzivatele=users, dph=vat, open_ucty=open_ucty, cash_state=_build_cash_state(cur, table_ucty, id_kas, receipts), sections=_build_report_sections(cur, table_ucty, id_kas, receipts), )