5630 lines
226 KiB
Python
5630 lines
226 KiB
Python
FrontEndVersion = "072_8_Kivy.MB"
|
||
from kivy.config import Config
|
||
Config.set('input', 'mouse', 'mouse,disable_multitouch')
|
||
from kivy.app import App
|
||
from kivy.uix.screenmanager import ScreenManager, Screen
|
||
from kivy.uix.boxlayout import BoxLayout
|
||
from kivy.uix.label import Label
|
||
from kivy.uix.button import Button
|
||
from kivy.metrics import dp
|
||
from kivy.logger import Logger
|
||
from kivy.uix.popup import Popup
|
||
from kivy.uix.gridlayout import GridLayout
|
||
from kivy.uix.textinput import TextInput
|
||
from kivy.core.window import Window
|
||
from kivy.uix.scrollview import ScrollView
|
||
from kivy.graphics import Color, Rectangle
|
||
from kivy.clock import Clock
|
||
from kivy.uix.modalview import ModalView
|
||
from kivy.uix.anchorlayout import AnchorLayout
|
||
from pydantic import TypeAdapter
|
||
from quest_ipadress import IpPortInput
|
||
|
||
import threading
|
||
import calendar
|
||
from pydantic import SecretStr
|
||
import json
|
||
import os
|
||
import socket
|
||
import traceback
|
||
import unicodedata
|
||
import logging
|
||
from pathlib import Path
|
||
from ui_utils import _popup_info
|
||
import payment
|
||
import api_call
|
||
from api_call import ApiContext
|
||
import data
|
||
import bankterm_service
|
||
import numberpad
|
||
import accountselect
|
||
from loginscreen import LoginUserScreen
|
||
import posdialog
|
||
import kivy_printer
|
||
import clsrep_select
|
||
from i18n import Translator, normalize_lang
|
||
from ui_utils import _popup_info
|
||
from datetime import datetime, timedelta
|
||
from konstanty import *
|
||
from posdialog import BaseModal, TextMessageDialog, PosKeyboard, ModalManager
|
||
|
||
|
||
from kivy.uix.boxlayout import BoxLayout
|
||
from kivy.uix.spinner import Spinner
|
||
from kivy.uix.button import Button
|
||
from kivy.uix.label import Label
|
||
from kivy.metrics import dp
|
||
|
||
|
||
class ClosureCarryDialog(BaseModal):
    """Confirmation dialog shown before a new cash closure is created.

    Every ``cash_state`` row of *report* whose ``payment_odvod`` equals 1 is
    listed with its amounts.  For each row the operator enters the amount to
    keep (column "PONECHAT"); the difference between the row balance and that
    amount is shown as a withdrawal ("Vyber") or deposit ("Vklad").

    On confirmation ``on_confirm`` receives a list of dicts with the keys
    ``prn_no``, ``payment_code`` and ``carry_amount``; on cancellation the
    optional ``on_cancel`` is called without arguments.
    """

    # Key codes that must be resolved by code rather than by codepoint.
    # The values are the symbolic names understood by ``handle_key``.
    # NOTE(review): 266 is assumed to be Kivy's "numpaddecimal" key code and
    # 271 "numpadenter" - confirm against kivy.core.window.Keyboard.keycodes.
    _SPECIAL_KEYS = {
        8: "BACKSPACE",
        9: "TAB",
        13: "ENTER",
        27: "ESC",
        127: "DELETE",
        266: "DECIMAL",
        271: "ENTER",
        273: "UP",
        274: "DOWN",
    }

    def __init__(
        self,
        modal_manager,
        report: dict,
        printer_names: dict[str, str],
        on_confirm,
        on_cancel=None,
        **kwargs,
    ):
        """Build the dialog layout and one editable row per cash row.

        Args:
            modal_manager: forwarded to ``BaseModal``.
            report: closure report; only its ``cash_state`` list is read here.
            printer_names: maps printer number (string) to a display name.
            on_confirm: callable receiving the carry payload list.
            on_cancel: optional callable invoked when the dialog is cancelled.
            **kwargs: forwarded to ``BaseModal``.
        """
        super().__init__(
            modal_manager=modal_manager,
            size_hint=(0.96, 0.84),
            **kwargs,
        )
        self.report = report or {}
        self.printer_names = printer_names or {}
        self.on_confirm = on_confirm
        self.on_cancel = on_cancel
        # One dict per displayed row; see _add_cash_row for the keys.
        self.entries: list[dict] = []
        # Entry currently receiving keyboard / keypad input.
        self.active_entry: dict | None = None
        self._keyboard_bound = False

        root = BoxLayout(
            orientation="horizontal",
            spacing=dp(12),
            padding=dp(14),
        )
        self.add_widget(root)

        main = BoxLayout(orientation="vertical", spacing=dp(10), size_hint=(1, 1))
        root.add_widget(main)

        title = Label(
            text="Naozaj chcete spravit novu uzavierku?",
            font_size=dp(22),
            bold=True,
            color=(1, 1, 1, 1),
            size_hint=(1, None),
            height=dp(42),
        )
        main.add_widget(title)

        # Column header; the widths mirror the cells created in _add_cash_row.
        header = self._row_container(height=dp(34), background=(0.18, 0.19, 0.21, 1))
        for label, width, align in [
            ("METODA", 0.15, "left"),
            ("TLACIAREN", 0.16, "left"),
            ("TRZBA", 0.12, "right"),
            ("UHR. POHL.", 0.13, "right"),
            ("VKL./VYB.", 0.14, "right"),
            ("SPOLU", 0.11, "right"),
            ("PONECHAT", 0.10, "right"),
            ("OPERACIA", 0.09, "right"),
        ]:
            header.add_widget(self._cell(label, width, bold=True, halign=align))
        main.add_widget(header)

        self.rows_box = GridLayout(cols=1, spacing=dp(2), size_hint_y=None)
        # Let the grid grow with its children so the ScrollView can scroll it.
        self.rows_box.bind(minimum_height=self.rows_box.setter("height"))
        scroll = ScrollView(size_hint=(1, 1), do_scroll_x=False)
        scroll.add_widget(self.rows_box)
        main.add_widget(scroll)

        for row in self._cash_rows():
            self._add_cash_row(row)

        self.error_label = Label(
            text="",
            color=(1, 0.25, 0.25, 1),
            size_hint=(1, None),
            height=dp(26),
            halign="left",
        )
        # halign only takes effect once text_size follows the widget size.
        self.error_label.bind(size=lambda inst, *_: setattr(inst, "text_size", inst.size))
        main.add_widget(self.error_label)

        actions = BoxLayout(size_hint=(1, None), height=dp(52), spacing=dp(10))
        btn_no = Button(text="Nie", background_color=(0.45, 0.45, 0.45, 1))
        btn_yes = Button(text="Ano", background_color=(0.20, 0.65, 0.55, 1))
        btn_no.bind(on_release=lambda *_: self._cancel())
        btn_yes.bind(on_release=lambda *_: self._confirm())
        # Empty labels on both sides act as flexible spacers.
        actions.add_widget(Label(size_hint=(1, 1)))
        actions.add_widget(btn_no)
        actions.add_widget(btn_yes)
        actions.add_widget(Label(size_hint=(1, 1)))
        main.add_widget(actions)

        if self.entries:
            self._select_entry(self.entries[0])

    def on_open(self):
        """Start listening to window key-down events (idempotent)."""
        if self._keyboard_bound:
            return
        # Unbind first so a stale binding can never lead to a double dispatch.
        Window.unbind(on_key_down=self._on_key_down)
        Window.bind(on_key_down=self._on_key_down)
        self._keyboard_bound = True

    def on_dismiss(self):
        """Stop listening to window key-down events."""
        if self._keyboard_bound:
            Window.unbind(on_key_down=self._on_key_down)
            self._keyboard_bound = False

    def _on_key_down(self, window, keycode, scancode, codepoint, modifiers):
        """Translate a window key-down event into a ``handle_key`` call.

        Known control keys and numpad keys are resolved by key code first.
        A backend may supply a codepoint derived from the key code for such
        keys; giving that codepoint priority would hide the key code from
        ``handle_key`` and the key would be silently ignored.
        """
        special = self._SPECIAL_KEYS.get(keycode) if isinstance(keycode, int) else None
        if special is not None:
            return self.handle_key(special)
        if isinstance(keycode, int) and 256 <= keycode <= 265:
            # Numpad digits: handle_key maps the integer code to "0".."9".
            return self.handle_key(keycode)
        if codepoint:
            return self.handle_key(codepoint)
        return self.handle_key(keycode)

    def _cash_rows(self) -> list[dict]:
        """Return the ``cash_state`` dict rows whose ``payment_odvod`` is 1."""
        rows = []
        for row in self.report.get("cash_state") or []:
            if not isinstance(row, dict):
                continue
            try:
                odvod = int(row.get("payment_odvod") or 0)
            except (TypeError, ValueError, OverflowError):
                # Unparseable flag: treat the row as not subject to carry-over.
                odvod = 0
            if odvod == 1:
                rows.append(row)
        return rows

    def _row_container(self, height=dp(42), background=(0.24, 0.25, 0.27, 1)):
        """Create a fixed-height horizontal box painted with *background*."""
        box = BoxLayout(size_hint=(1, None), height=height)
        with box.canvas.before:
            Color(*background)
            rect = Rectangle(pos=box.pos, size=box.size)
        # Keep the background rectangle glued to the box geometry.
        box.bind(pos=lambda inst, *_: setattr(rect, "pos", inst.pos))
        box.bind(size=lambda inst, *_: setattr(rect, "size", inst.size))
        return box

    def _cell(self, text, width, bold=False, halign="left"):
        """Create a table cell label taking *width* as its horizontal size hint."""
        label = Label(
            text=str(text or ""),
            size_hint=(width, 1),
            color=(1, 1, 1, 1),
            bold=bold,
            halign=halign,
            valign="middle",
            font_size=dp(14),
            padding=(dp(6), 0),
        )
        label.bind(size=lambda inst, *_: setattr(inst, "text_size", (inst.width - dp(8), inst.height)))
        return label

    @staticmethod
    def _finite_or_zero(number: float) -> float:
        """Return *number*, or 0.0 when it is NaN or infinite."""
        import math  # local import: keeps this class independent of the file header

        return number if math.isfinite(number) else 0.0

    def _money(self, value) -> str:
        """Format *value* as a string with two decimals (0.00 when invalid)."""
        amount = self._amount(value)
        return f"{amount:.2f}"

    def _amount(self, value) -> float:
        """Convert *value* to a finite float; anything unusable becomes 0.0."""
        try:
            amount = float(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0.0
        return self._finite_or_zero(amount)

    def _parse_amount(self, value) -> float:
        """Parse user-entered text into a non-negative amount with 2 decimals.

        Both "," and "." are accepted as the decimal separator.  Empty,
        unparseable, negative or non-finite input yields 0.0.
        """
        try:
            amount = float(str(value or "0").replace(",", "."))
        except (TypeError, ValueError):
            return 0.0
        return round(max(self._finite_or_zero(amount), 0.0), 2)

    def _movement_amount(self, row: dict) -> float:
        """Return deposits minus withdrawals (manual and automatic) of *row*."""
        return round(
            self._amount(row.get("manual_deposit_amount"))
            + self._amount(row.get("auto_deposit_amount"))
            - self._amount(row.get("manual_withdrawal_amount"))
            - self._amount(row.get("auto_withdrawal_amount")),
            2,
        )

    def _add_cash_row(self, row: dict):
        """Append one table row for *row* and register its entry dict."""
        balance = round(self._amount(row.get("balance_amount")), 2)
        raw = "0.00"
        # "raw" is the text being edited (dot as separator), "carry" its
        # parsed value; the widget references are filled in below.
        entry = {
            "row": row,
            "carry": 0.0,
            "raw": raw,
            "balance": balance,
            "button": None,
            "operation": None,
            "container": None,
        }
        box = self._row_container(height=dp(44), background=(0.12, 0.13, 0.14, 1))
        entry["container"] = box
        payment_name = row.get("payment_name") or row.get("payment_code") or ""
        prn_no = str(row.get("prn_no") or "").strip()
        printer_name = self.printer_names.get(prn_no, "")
        printer_text = f"{prn_no} - {printer_name}" if printer_name else prn_no
        for text, width, align in [
            (payment_name, 0.15, "left"),
            (printer_text, 0.16, "left"),
            (self._money(row.get("sales_amount")), 0.12, "right"),
            (self._money(row.get("receivable_amount")), 0.13, "right"),
            (self._money(self._movement_amount(row)), 0.14, "right"),
            (self._money(balance), 0.11, "right"),
        ]:
            box.add_widget(self._cell(text, width, halign=align))
        btn = Button(
            text=raw.replace(".", ","),
            size_hint=(0.10, 1),
            background_color=(0.30, 0.31, 0.33, 1),
            font_size=dp(16),
        )
        # entry is bound as a default argument so each button keeps its own row.
        btn.bind(on_release=lambda *_, entry=entry: self._open_amount_pad(entry))
        entry["button"] = btn
        box.add_widget(btn)
        op_label = self._cell("", 0.09, halign="right")
        entry["operation"] = op_label
        box.add_widget(op_label)
        self.rows_box.add_widget(box)
        self.entries.append(entry)
        self._refresh_entry(entry)

    def _build_keypad(self):
        """Build and return an on-screen numeric keypad side panel.

        The panel is only returned; nothing in this class attaches it to the
        dialog layout.
        """
        side = BoxLayout(
            orientation="vertical",
            spacing=dp(8),
            padding=dp(8),
            size_hint=(None, 1),
            width=dp(230),
        )
        side.add_widget(Label(text="EUR", bold=True, size_hint=(1, None), height=dp(34), color=(1, 1, 1, 1)))
        grid = GridLayout(cols=3, spacing=dp(2), size_hint=(1, None), height=dp(300))
        for key in ["7", "8", "9", "4", "5", "6", "1", "2", "3", "0", ",", "DEL"]:
            btn = Button(
                text=key,
                font_size=dp(24) if key != "DEL" else dp(16),
                background_color=(0.95, 0.95, 0.95, 1) if key != "DEL" else (0.20, 0.65, 0.62, 1),
                color=(0.05, 0.05, 0.05, 1) if key != "DEL" else (1, 1, 1, 1),
            )
            btn.bind(on_release=lambda _btn, k=key: self._pad_press(k))
            grid.add_widget(btn)
        side.add_widget(grid)
        side.add_widget(Label(size_hint=(1, 1)))
        return side

    def _select_entry(self, entry: dict):
        """Make *entry* the active one and highlight its amount button."""
        self.active_entry = entry
        for item in self.entries:
            button = item.get("button")
            if button:
                button.background_color = (0.16, 0.55, 0.54, 1) if item is entry else (0.30, 0.31, 0.33, 1)

    def _open_amount_pad(self, entry: dict):
        """Select *entry* and open a ``NumberPad`` to edit its carry amount."""
        self._select_entry(entry)

        def accept(value: str):
            amount = self._parse_amount(value)
            entry["raw"] = f"{amount:.2f}"
            entry["carry"] = amount
            self._refresh_entry(entry)

        numberpad.NumberPad(
            mode="number",
            allow_fraction=False,
            decimal_places=2,
            max_len=8,
            initial_value=f"{self._parse_amount(entry.get('raw')):.2f}",
            on_accept=accept,
        ).open()

    def _pad_press(self, key: str):
        """Apply one keypad key ("0".."9", "," or "DEL") to the active entry.

        At most 8 integer digits and 2 decimal digits are accepted.
        """
        entry = self.active_entry
        if not entry:
            return
        raw = str(entry.get("raw") or "")
        if key == "DEL":
            raw = raw[:-1]
        elif key == ",":
            if "." not in raw:
                raw = (raw or "0") + "."
        else:
            # Typing a digit over an untouched zero starts a fresh number.
            if raw in {"0", "0.00", "0,00"}:
                raw = ""
            before_decimal = raw.split(".", 1)[0]
            if "." not in raw and len(before_decimal) >= 8:
                return
            if "." in raw and len(raw.split(".", 1)[1]) >= 2:
                return
            raw += key
        entry["raw"] = raw
        entry["carry"] = self._parse_amount(raw)
        self._refresh_entry(entry)

    def _refresh_entry(self, entry: dict):
        """Re-parse the entry text and update its button and operation label."""
        raw = str(entry.get("raw") or "")
        carry = self._parse_amount(raw)
        entry["carry"] = carry
        button = entry.get("button")
        if button:
            button.text = (raw or "0").replace(".", ",")
        balance = float(entry.get("balance") or 0.0)
        delta = round(balance - carry, 2)
        # The 0.004 tolerance keeps float noise below half a cent from showing.
        if delta > 0.004:
            text = f"Vyber {delta:.2f}"
        elif delta < -0.004:
            text = f"Vklad {abs(delta):.2f}"
        else:
            text = "-"
        label = entry.get("operation")
        if label:
            label.text = text

    def _confirm(self):
        """Close the dialog and hand the carry payload to ``on_confirm``.

        With no rows to process an error text is shown and the dialog stays
        open.
        """
        if not self.entries:
            self.error_label.text = "Nie je co odviest."
            return
        payload = []
        for entry in self.entries:
            row = entry.get("row") or {}
            payload.append({
                "prn_no": str(row.get("prn_no") or "").strip(),
                "payment_code": str(row.get("payment_code") or "").strip(),
                "carry_amount": self._parse_amount(entry.get("raw")),
            })
        self.close()
        if self.on_confirm:
            self.on_confirm(payload)

    def _cancel(self):
        """Close the dialog and call ``on_cancel`` when one was given."""
        self.close()
        if self.on_cancel:
            self.on_cancel()

    def handle_key(self, key):
        """Handle one key given as a symbolic name, a character or a key code.

        Always returns True, so every key is reported as consumed while the
        dialog is handling input.
        """
        raw_key = key
        key = str(key or "").upper()
        if key in {"ESC", "ESCAPE"}:
            self._cancel()
            return True
        if key in {"ENTER", "NUMPADENTER"}:
            self._confirm()
            return True
        if key in {"BACKSPACE", "DELETE"} or raw_key in (8, 127):
            self._pad_press("DEL")
            return True
        if key in {",", ".", "DECIMAL"}:
            self._pad_press(",")
            return True
        if key in {"UP", "ARROWUP"}:
            self._select_relative_entry(-1)
            return True
        if key in {"DOWN", "ARROWDOWN", "TAB"}:
            self._select_relative_entry(1)
            return True
        digit = None
        if key in {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}:
            digit = key
        elif isinstance(raw_key, int) and 256 <= raw_key <= 265:
            # Integer key codes 256..265 are mapped to the digits 0..9.
            digit = str(raw_key - 256)
        if digit is not None:
            self._pad_press(digit)
            return True
        return True

    def _select_relative_entry(self, delta: int):
        """Move the selection by *delta* rows, wrapping around at both ends."""
        if not self.entries:
            return
        # Identity search (same criterion as _select_entry); falls back to the
        # first row when nothing is active.
        index = next(
            (pos for pos, item in enumerate(self.entries) if item is self.active_entry),
            0,
        )
        index = (index + delta) % len(self.entries)
        self._select_entry(self.entries[index])
|
||
|
||
|
||
class SystemStatusDialog(Popup):
    """"About the system" popup.

    Shows frontend / API / database summary cards, the print worker state,
    one row per printer and the list of unprinted print jobs.  All data is
    fetched through ``api_call`` using ``controller.ctx`` each time
    ``refresh`` runs.
    """

    def __init__(self, controller, **kwargs):
        """Build the static layout and load the first snapshot.

        Args:
            controller: application controller; this class reads its ``ctx``,
                version attributes, ``database_name``, ``user_login`` and
                calls ``_current_language`` / ``_popup_info`` on it.
            **kwargs: forwarded to ``Popup``.
        """
        super().__init__(
            title="O systéme",
            size_hint=(0.96, 0.92),
            auto_dismiss=False,
            **kwargs,
        )
        self.controller = controller

        root = BoxLayout(orientation="vertical", spacing=dp(10), padding=dp(12))
        self._paint(root, (0.26, 0.26, 0.29, 1))
        self.body = GridLayout(cols=1, spacing=dp(10), size_hint_y=None)
        # Let the grid grow with its children so the ScrollView can scroll it.
        self.body.bind(minimum_height=self.body.setter("height"))
        scroll = ScrollView(size_hint=(1, 1), do_scroll_x=False, bar_width=dp(14))
        scroll.add_widget(self.body)
        root.add_widget(scroll)

        actions = BoxLayout(size_hint=(1, None), height=dp(52), spacing=dp(10))
        btn_refresh = Button(text="Obnoviť", background_color=(0.22, 0.55, 0.75, 1))
        btn_close = Button(text="Zavrieť", background_color=(0.50, 0.50, 0.50, 1))
        btn_refresh.bind(on_release=lambda *_: self.refresh())
        btn_close.bind(on_release=lambda *_: self.dismiss())
        actions.add_widget(Label(size_hint=(1, 1)))
        actions.add_widget(btn_refresh)
        actions.add_widget(btn_close)
        root.add_widget(actions)

        self.content = root
        self.refresh()

    def _paint(self, widget, color):
        """Give *widget* a solid background of *color* and return the widget."""
        with widget.canvas.before:
            Color(*color)
            rect = Rectangle(pos=widget.pos, size=widget.size)
        # Keep the background rectangle glued to the widget geometry.
        widget.bind(pos=lambda inst, *_: setattr(rect, "pos", inst.pos))
        widget.bind(size=lambda inst, *_: setattr(rect, "size", inst.size))
        return widget

    def _label(
        self,
        text: str,
        *,
        size_hint_x=1,
        height=dp(28),
        bold=False,
        color=(1, 1, 1, 1),
        halign="left",
        font_size=14,
        shorten=True,
    ):
        """Create a fixed-height, vertically centred label.

        ``font_size`` is given in dp units; ``shorten`` truncates overly long
        text from the right.
        """
        lbl = Label(
            text=str(text or ""),
            size_hint=(size_hint_x, None),
            height=height,
            bold=bold,
            color=color,
            halign=halign,
            valign="middle",
            font_size=dp(font_size),
            shorten=shorten,
            shorten_from="right",
        )
        # Never let the text box collapse below dp(10) on very narrow cells.
        lbl.bind(size=lambda inst, *_: setattr(inst, "text_size", (max(inst.width - dp(6), dp(10)), inst.height)))
        return lbl

    def _panel(self, title: str):
        """Append a titled, auto-height panel to the body and return it."""
        panel = GridLayout(cols=1, spacing=dp(4), padding=dp(10), size_hint_y=None)
        panel.bind(minimum_height=panel.setter("height"))
        self._paint(panel, (0.12, 0.13, 0.14, 1))
        panel.add_widget(self._label(title, height=dp(32), bold=True, font_size=16, shorten=False))
        self.body.add_widget(panel)
        return panel

    def _add_row(self, panel, cells, *, height=dp(34), background=None, bold=False):
        """Append a row of labels to *panel*.

        Each cell is a tuple ``(text, width, align, color)``; trailing items
        may be omitted and default to width 1, "left" and white.
        """
        row = BoxLayout(size_hint=(1, None), height=height, spacing=dp(6), padding=(dp(6), 0))
        if background:
            self._paint(row, background)
        for cell in cells:
            text = cell[0] if len(cell) > 0 else ""
            width = cell[1] if len(cell) > 1 else 1
            align = cell[2] if len(cell) > 2 else "left"
            color = cell[3] if len(cell) > 3 else (1, 1, 1, 1)
            row.add_widget(self._label(text, size_hint_x=width, height=height, bold=bold, color=color, halign=align))
        panel.add_widget(row)

    def _safe_call(self, label: str, callback, default):
        """Run *callback* and return ``(result, "")``.

        On any exception the failure is logged and ``(default, str(exc))`` is
        returned, so one failing data source does not break the whole dialog.
        """
        try:
            return callback(), ""
        except Exception as exc:
            Logger.exception("System status load failed: %s", label)
            return default, str(exc)

    def _as_dict(self, value):
        """Return *value* as a dict: dicts pass through, objects exposing
        ``model_dump`` are dumped, anything else becomes ``{}``."""
        if isinstance(value, dict):
            return value
        if hasattr(value, "model_dump"):
            return value.model_dump()
        return {}

    def _short(self, value, length=90):
        """Collapse newlines in *value* and truncate it to about *length*
        characters, ending in "..." when truncated."""
        text = str(value or "").replace("\n", " ").strip()
        return text if len(text) <= length else text[:max(0, length - 3)] + "..."

    def _status_color(self, status_text: str, online=None, failed=0):
        """Pick an RGBA colour for a status.

        Precedence: any failures -> red; online / ok-like text -> green;
        error-like text -> red; unknown / offline -> orange; else white.
        """
        text = str(status_text or "").lower()
        if failed:
            return (1.0, 0.42, 0.35, 1)
        if online is True or text in {"ok", "online", "ready"}:
            return (0.35, 0.90, 0.45, 1)
        # "chy" matches the stem used in the Slovak UI strings ("Chyba").
        if "fail" in text or "error" in text or "chy" in text:
            return (1.0, 0.42, 0.35, 1)
        if text in {"unknown", "", "offline"} or online is False:
            return (1.0, 0.70, 0.25, 1)
        return (1, 1, 1, 1)

    def refresh(self):
        """Reload every data source and rebuild the whole dialog body.

        The API calls are made synchronously from the calling thread.
        """
        self.body.clear_widgets()
        ctrl = self.controller
        printers, printers_error = self._safe_call(
            "printers",
            lambda: api_call.load_all_printers_API(ctrl.ctx),
            [],
        )
        statuses, statuses_error = self._safe_call(
            "printer_status",
            lambda: api_call.load_printer_status_API(ctrl.ctx),
            [],
        )
        diagnostics, diagnostics_error = self._safe_call(
            "print_worker_diagnostics",
            lambda: api_call.load_print_worker_diagnostics_API(ctrl.ctx, id_kas=None, limit=150),
            {},
        )
        postgres_status, postgres_error = self._safe_call(
            "postgres_status",
            lambda: api_call.load_postgres_status_API(ctrl.ctx, test_connection=False),
            None,
        )

        self._add_system_cards(postgres_status, postgres_error)
        self._add_worker_panel(diagnostics, diagnostics_error)
        self._add_printers_panel(printers, statuses, diagnostics, printers_error, statuses_error)
        self._add_jobs_panel(diagnostics, diagnostics_error)

    def _add_system_cards(self, postgres_status, postgres_error: str):
        """Add the three summary cards (frontend, API, database)."""
        ctrl = self.controller
        row = BoxLayout(size_hint=(1, None), height=dp(112), spacing=dp(10))
        self.body.add_widget(row)
        user_name = getattr(getattr(ctrl, "user_login", None), "name", "") or "-"
        cards = [
            ("Frontend", [
                f"Verzia: {ctrl.version_frontend or '-'}",
                f"Používateľ: {user_name}",
                f"Jazyk: {ctrl._current_language()}",
            ]),
            ("API", [
                f"Verzia: {ctrl.version_API or '-'}",
                f"ID kasy: {getattr(ctrl.ctx, 'id_kas', '') or '-'}",
                f"URL: {self._short(getattr(ctrl.ctx, 'base_url', ''), 42)}",
            ]),
            ("Databáza", [
                f"Názov: {ctrl.database_name or '-'}",
                f"PostgreSQL: {self._postgres_status_text(postgres_status, postgres_error)}",
                f"IP appky: {get_local_ip()}",
            ]),
        ]
        for title, lines in cards:
            card = GridLayout(cols=1, padding=dp(10), spacing=dp(2), size_hint=(1, 1))
            self._paint(card, (0.94, 0.94, 0.94, 1))
            card.add_widget(self._label(title, height=dp(24), bold=True, color=(0, 0, 0, 1), shorten=False))
            for line in lines:
                card.add_widget(self._label(line, height=dp(22), color=(0, 0, 0, 1), font_size=13))
            row.add_widget(card)

    def _postgres_status_text(self, postgres_status, error: str) -> str:
        """Return a short human-readable PostgreSQL status for the card."""
        if error:
            return self._short(error, 46)
        if not postgres_status:
            return "-"
        status = self._as_dict(postgres_status)
        if status.get("connection_ok"):
            return "dostupný"
        if not status.get("cashier_enabled"):
            return "vypnutý pre kasu"
        return self._short(status.get("message") or "nedostupný", 46)

    def _add_worker_panel(self, diagnostics: dict, error: str):
        """Add the print worker panel (state, agent, job counts, hints)."""
        panel = self._panel("Tlačový worker")
        if error:
            self._add_row(panel, [("Chyba diagnostiky", 0.25), (self._short(error, 140), 0.75, "left", (1, 0.45, 0.35, 1))])
            return
        # Normalise first: the loader may hand back None or a model object.
        info = self._as_dict(diagnostics)
        if not info or info.get("ok") is False:
            # Fall back to a fixed text when the payload carries no error.
            message = info.get("error") or "Nie je dostupná"
            self._add_row(panel, [("Diagnostika", 0.25), (self._short(message, 140), 0.75)])
            return
        worker = info.get("worker") or {}
        state = worker.get("state") or {}
        counts = info.get("status_counts") or {}
        count_text = ", ".join(f"{key}: {value}" for key, value in sorted(counts.items())) or "-"
        # str() guards against printer numbers delivered as non-strings.
        configured = ", ".join(str(item) for item in (worker.get("configured_printers") or [])) or "všetky"
        running = bool(worker.get("running"))
        # The status text must follow the running flag; a constant "ok" would
        # make _status_color return green even for a stopped worker.
        state_color = self._status_color("ok" if running else "offline", online=running)
        self._add_row(panel, [
            ("Stav", 0.14, "left", state_color),
            ("beží" if running else "nebeží", 0.15),
            ("Agent", 0.10),
            (str(state.get("agent_id") or "-"), 0.22),
            ("Tlačiarne", 0.12),
            (configured, 0.27),
        ])
        self._add_row(panel, [
            ("Joby", 0.14),
            (count_text, 0.56),
            ("Posledný cyklus", 0.14),
            (str(state.get("last_cycle_at") or "-"), 0.16),
        ])
        for hint in info.get("hints") or []:
            self._add_row(panel, [("Upozornenie", 0.16), (self._short(hint, 180), 0.84, "left", (1.0, 0.74, 0.25, 1))], height=dp(38))

    def _add_printers_panel(self, printers, statuses, diagnostics: dict, printers_error: str, statuses_error: str):
        """Add the printers panel, merging definitions, statuses and job counts.

        Printers are keyed by the stripped string form of ``prn_no``; a
        printer appears when it is present in either *printers* or *statuses*.
        """
        panel = self._panel("Tlačiarne")
        if printers_error:
            self._add_row(panel, [("Prndef", 0.16), (self._short(printers_error, 150), 0.84, "left", (1, 0.45, 0.35, 1))])
        if statuses_error:
            self._add_row(panel, [("Stav", 0.16), (self._short(statuses_error, 150), 0.84, "left", (1, 0.70, 0.25, 1))])

        status_map = {
            str(getattr(item, "prn_no", "") or "").strip(): item
            for item in statuses or []
        }
        printer_map = {
            str(getattr(item, "prn_no", "") or "").strip(): item
            for item in printers or []
        }
        info = self._as_dict(diagnostics)
        jobs = info.get("unprinted_jobs") or info.get("active_jobs") or []
        active_counts = {}
        failed_counts = {}
        for job in jobs:
            job_info = self._as_dict(job)
            prn_no = str(job_info.get("printer_no") or "").strip()
            active_counts[prn_no] = active_counts.get(prn_no, 0) + 1
            if str(job_info.get("status") or "") in {"failed", "failed_final"}:
                failed_counts[prn_no] = failed_counts.get(prn_no, 0) + 1

        keys = sorted({key for key in printer_map if key} | {key for key in status_map if key})
        if not keys:
            self._add_row(panel, [("Tlačiarne nie sú načítané.", 1)])
            return

        self._add_row(panel, [
            ("Tlačiareň", 0.25),
            ("Typ", 0.10),
            ("Stav", 0.13),
            ("Fronta", 0.09, "right"),
            ("Chyby", 0.08, "right"),
            ("Správa", 0.25),
            ("Akcia", 0.10),
        ], background=(0.18, 0.19, 0.20, 1), bold=True)
        for prn_no in keys:
            printer = printer_map.get(prn_no)
            status = status_map.get(prn_no)
            name = str(getattr(printer, "prn_name", "") or getattr(status, "prn_name", "") or "").strip()
            cmd = str(getattr(printer, "cmd32_on", "") or getattr(status, "cmd32_on", "") or "").strip() or "-"
            online = getattr(status, "online", None) if status else None
            status_text = str(getattr(status, "status", "") or ("online" if online else "unknown"))
            # Job counts from diagnostics win over the queue size in the status.
            queue_size = int(active_counts.get(prn_no, getattr(status, "queue_size", 0) or 0) or 0)
            failed_jobs = max(int(getattr(status, "failed_jobs", 0) or 0), int(failed_counts.get(prn_no, 0) or 0))
            message = str(getattr(status, "message", "") or "").strip()
            details = self._as_dict(getattr(status, "details", {}) if status else {})
            if not message and details:
                message = self._short(json.dumps(details, ensure_ascii=False), 80)
            self._add_printer_row(
                panel,
                prn_no=prn_no,
                name=name,
                cmd=cmd,
                status_text=status_text,
                online=online,
                queue_size=queue_size,
                failed_jobs=failed_jobs,
                message=message,
            )

    def _add_jobs_panel(self, diagnostics: dict, error: str):
        """Add the panel listing unprinted (or, failing that, active) jobs."""
        panel = self._panel("Nevytlačené joby")
        if error:
            self._add_row(panel, [("Chyba", 0.16), (self._short(error, 150), 0.84, "left", (1, 0.45, 0.35, 1))])
            return
        info = self._as_dict(diagnostics)
        jobs = info.get("unprinted_jobs") or info.get("active_jobs") or []
        if not jobs:
            self._add_row(panel, [("Aktuálne nie sú evidované čakajúce alebo chybové joby.", 1)])
            return
        self._add_row(panel, [
            ("ID", 0.06, "right"),
            ("Kasa", 0.07),
            ("Tlačiareň", 0.10),
            ("Stav", 0.12),
            ("Typ", 0.12),
            ("Doklad", 0.12),
            ("Pokusy", 0.08, "right"),
            ("Chyba", 0.23),
            ("Akcia", 0.10),
        ], background=(0.18, 0.19, 0.20, 1), bold=True)
        for job in jobs:
            self._add_job_row(panel, job)

    def _add_printer_row(
        self,
        panel,
        *,
        prn_no: str,
        name: str,
        cmd: str,
        status_text: str,
        online,
        queue_size: int,
        failed_jobs: int,
        message: str,
    ):
        """Append one printer row; printers whose *cmd* is "FISKAL" get a
        "Stav" button that queries the fiscal printer status."""
        row = BoxLayout(size_hint=(1, None), height=dp(38), spacing=dp(6), padding=(dp(6), 0))
        self._paint(row, (0.15, 0.16, 0.17, 1))
        cells = [
            (f"{prn_no} - {name}" if name else prn_no, 0.25),
            (cmd, 0.10),
            (status_text, 0.13, "left", self._status_color(status_text, online=online, failed=failed_jobs)),
            (str(queue_size), 0.09, "right"),
            (str(failed_jobs), 0.08, "right", self._status_color("", failed=failed_jobs)),
            (self._short(message or "-", 90), 0.25),
        ]
        for text, width, *rest in cells:
            align = rest[0] if rest else "left"
            color = rest[1] if len(rest) > 1 else (1, 1, 1, 1)
            row.add_widget(self._label(text, size_hint_x=width, height=dp(38), color=color, halign=align))
        if str(cmd or "").strip().upper() == "FISKAL":
            btn = Button(text="Stav", size_hint=(0.10, 1), background_color=(0.22, 0.55, 0.75, 1))
            btn.bind(on_release=lambda *_btn, prn_no=prn_no: self._verify_fiscal_printer(prn_no))
            row.add_widget(btn)
        else:
            # Placeholder keeps the column widths aligned with the header.
            row.add_widget(Label(size_hint=(0.10, 1)))
        panel.add_widget(row)

    def _add_job_row(self, panel, job: dict):
        """Append one print job row; jobs not in state "printed" get a
        restart button."""
        job = self._as_dict(job)
        attempts = f"{job.get('attempts', 0)}/{job.get('max_attempts', 0)}"
        doc = job.get("receipt_no") or job.get("bon_no") or "-"
        status = str(job.get("status") or "")
        row = BoxLayout(size_hint=(1, None), height=dp(42), spacing=dp(6), padding=(dp(6), 0))
        self._paint(row, (0.15, 0.16, 0.17, 1))
        cells = [
            (str(job.get("id") or ""), 0.06, "right"),
            (str(job.get("id_kas") or ""), 0.07, "left"),
            (str(job.get("printer_no") or ""), 0.10, "left"),
            (status, 0.12, "left", self._status_color(status, failed=status in {"failed", "failed_final"})),
            (str(job.get("job_type") or job.get("document_type") or ""), 0.12, "left"),
            (str(doc), 0.12, "left"),
            (attempts, 0.08, "right"),
            (self._short(job.get("error") or "", 90), 0.23, "left", (1, 0.70, 0.25, 1)),
        ]
        for text, width, align, *rest in cells:
            color = rest[0] if rest else (1, 1, 1, 1)
            row.add_widget(self._label(text, size_hint_x=width, height=dp(42), color=color, halign=align))
        if status != "printed":
            btn = Button(text="Reštart", size_hint=(0.10, 1), background_color=(0.20, 0.60, 0.42, 1))
            btn.bind(on_release=lambda *_btn, job_id=job.get("id"): self._restart_print_job(job_id))
            row.add_widget(btn)
        else:
            row.add_widget(Label(size_hint=(0.10, 1)))
        panel.add_widget(row)

    def _restart_print_job(self, job_id):
        """Ask the API to retry *job_id*, then refresh; failures are logged
        and shown in an info popup."""
        try:
            api_call.retry_print_job_API(self.controller.ctx, int(job_id))
            self.refresh()
        except Exception as exc:
            Logger.exception("Print job retry failed")
            self.controller._popup_info("Tlačový job", f"Job sa nepodarilo reštartovať.\n{exc}")

    def _verify_fiscal_printer(self, printer_no: str):
        """Query the fiscal printer status, refresh the dialog and show the
        returned message (or the error) in an info popup."""
        try:
            status = api_call.load_fiscal_printer_status_API(self.controller.ctx, printer_no, timeout=10.0)
            self.refresh()
            message = getattr(status, "message", "") or "Stav fiskálnej tlačiarne bol načítaný."
            self.controller._popup_info("Fiskálna tlačiareň", message)
        except Exception as exc:
            Logger.exception("Fiscal printer status failed")
            self.refresh()
            self.controller._popup_info("Fiskálna tlačiareň", f"Stav sa nepodarilo načítať.\n{exc}")
|
||
|
||
|
||
class SimpleDatePicker(Popup):
|
||
def __init__(self, initial_value: str, on_select, **kwargs):
|
||
super().__init__(
|
||
title="Vyber dátum",
|
||
size_hint=(None, None),
|
||
size=(dp(420), dp(430)),
|
||
auto_dismiss=False,
|
||
**kwargs,
|
||
)
|
||
self.on_select = on_select
|
||
self.current_date = self._parse_date(initial_value) or datetime.now().date()
|
||
self.root = BoxLayout(orientation="vertical", spacing=dp(8), padding=dp(10))
|
||
self.content = self.root
|
||
self._build()
|
||
|
||
def _parse_date(self, value: str):
|
||
text = str(value or "").strip()
|
||
for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%Y%m%d"):
|
||
try:
|
||
return datetime.strptime(text, fmt).date()
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
def _build(self):
|
||
self.root.clear_widgets()
|
||
header = BoxLayout(size_hint=(1, None), height=dp(48), spacing=dp(8))
|
||
btn_prev = Button(text="<", size_hint=(0.18, 1))
|
||
btn_next = Button(text=">", size_hint=(0.18, 1))
|
||
title = Label(
|
||
text=f"{self.current_date.month:02d}/{self.current_date.year}",
|
||
size_hint=(0.64, 1),
|
||
bold=True,
|
||
font_size=dp(18),
|
||
)
|
||
btn_prev.bind(on_release=lambda *_: self._move_month(-1))
|
||
btn_next.bind(on_release=lambda *_: self._move_month(1))
|
||
header.add_widget(btn_prev)
|
||
header.add_widget(title)
|
||
header.add_widget(btn_next)
|
||
self.root.add_widget(header)
|
||
|
||
grid = GridLayout(cols=7, spacing=dp(4), size_hint=(1, 1))
|
||
for label in ["Po", "Ut", "St", "Št", "Pi", "So", "Ne"]:
|
||
grid.add_widget(Label(text=label, bold=True, size_hint_y=None, height=dp(30)))
|
||
first_weekday, days_in_month = calendar.monthrange(self.current_date.year, self.current_date.month)
|
||
for _ in range(first_weekday):
|
||
grid.add_widget(Label(text=""))
|
||
today = datetime.now().date()
|
||
for day in range(1, days_in_month + 1):
|
||
value = self.current_date.replace(day=day)
|
||
selected = value == self.current_date
|
||
is_today = value == today
|
||
btn = Button(
|
||
text=str(day),
|
||
background_normal="",
|
||
background_color=(0.22, 0.55, 0.75, 1) if selected else ((0.20, 0.60, 0.42, 1) if is_today else (0.34, 0.34, 0.34, 1)),
|
||
)
|
||
btn.bind(on_release=lambda _btn, d=value: self._select(d))
|
||
grid.add_widget(btn)
|
||
self.root.add_widget(grid)
|
||
|
||
actions = BoxLayout(size_hint=(1, None), height=dp(48), spacing=dp(8))
|
||
btn_today = Button(text="Dnes", background_color=(0.22, 0.55, 0.75, 1))
|
||
btn_close = Button(text="Zavrieť", background_color=(0.50, 0.50, 0.50, 1))
|
||
btn_today.bind(on_release=lambda *_: self._select(datetime.now().date()))
|
||
btn_close.bind(on_release=lambda *_: self.dismiss())
|
||
actions.add_widget(btn_today)
|
||
actions.add_widget(btn_close)
|
||
self.root.add_widget(actions)
|
||
|
||
def _move_month(self, delta: int):
|
||
month = self.current_date.month + delta
|
||
year = self.current_date.year
|
||
while month < 1:
|
||
month += 12
|
||
year -= 1
|
||
while month > 12:
|
||
month -= 12
|
||
year += 1
|
||
day = min(self.current_date.day, calendar.monthrange(year, month)[1])
|
||
self.current_date = self.current_date.replace(year=year, month=month, day=day)
|
||
self._build()
|
||
|
||
def _select(self, value):
    """Hand the chosen date to the ``on_select`` callback and close the picker.

    Args:
        value: Object with an ``isoformat()`` method (the callers in this
            class pass ``datetime.date`` values); the callback receives the
            ISO string, not the object itself.
    """
    callback = self.on_select
    if callback:
        callback(value.isoformat())
    # The picker closes whether or not a callback was registered.
    self.dismiss()
|
||
|
||
|
||
class UsageReportDialog(Popup):
    """Popup for browsing consumption by goods category and printing it.

    Layout: a filter row (quick ranges, two date inputs, load button), a body
    with the category list on the left and the items of all selected
    categories on the right, and a footer with a summary, the printer choice
    and the action buttons.

    Args:
        controller: Object providing ``ctx`` (handed to the API calls),
            ``_popup_info(title, text)``, ``_load_clsrep_printers()``,
            ``_currency()`` and, optionally, ``default_printer``.
        **kwargs: Forwarded to ``Popup``.
    """

    _TITLE = "Prezeranie spotreby"
    _COLOR_ACTIVE = (0.22, 0.55, 0.75, 1)
    _COLOR_IDLE = (0.34, 0.34, 0.34, 1)
    _COLOR_CLOSE = (0.50, 0.50, 0.50, 1)

    def __init__(self, controller, **kwargs):
        super().__init__(
            title="Prezeranie spotreby",
            size_hint=(0.97, 0.92),
            auto_dismiss=False,
            **kwargs,
        )
        self.controller = controller
        self.report: data.UsageReportOut | None = None
        self.selected_keys: set[str] = set()
        self.mode = "current"
        self.active_range = "current"
        self.range_buttons = {}
        # Default period shown in the date inputs: the last seven days.
        today = datetime.now().date()
        self.default_from = (today - timedelta(days=6)).isoformat()
        self.default_to = today.isoformat()
        self.printers = self._load_printers()
        self.printer_no = self._default_printer_no()

        root = BoxLayout(orientation="vertical", spacing=dp(8), padding=dp(10))
        self._paint(root, (0.24, 0.24, 0.27, 1))
        root.add_widget(self._build_filters())

        body = BoxLayout(size_hint=(1, 1), spacing=dp(10))

        # Left column: category filter + scrollable category buttons.
        left = BoxLayout(orientation="vertical", size_hint=(0.56, 1), spacing=dp(6))
        left.add_widget(self._label("Druhy tovaru", height=dp(30), bold=True, font_size=16, shorten=False))
        self.category_filter = TextInput(
            text="",
            hint_text="Filter druhu",
            multiline=False,
            size_hint=(1, None),
            height=dp(42),
            write_tab=False,
        )
        self.category_filter.bind(text=lambda *_: self._refresh_lists())
        left.add_widget(self.category_filter)
        self.categories_grid = GridLayout(cols=1, spacing=dp(3), size_hint_y=None)
        self.categories_grid.bind(minimum_height=self.categories_grid.setter("height"))
        cats_scroll = ScrollView(size_hint=(1, 1), do_scroll_x=False, bar_width=dp(14))
        cats_scroll.add_widget(self.categories_grid)
        left.add_widget(cats_scroll)

        # Right column: items of the selected categories.
        right = BoxLayout(orientation="vertical", size_hint=(0.44, 1), spacing=dp(6))
        right.add_widget(self._label("Položky", height=dp(30), bold=True, font_size=16, shorten=False))
        self.items_grid = GridLayout(cols=1, spacing=dp(3), size_hint_y=None)
        self.items_grid.bind(minimum_height=self.items_grid.setter("height"))
        items_scroll = ScrollView(size_hint=(1, 1), do_scroll_x=False, bar_width=dp(14))
        items_scroll.add_widget(self.items_grid)
        right.add_widget(items_scroll)

        body.add_widget(left)
        body.add_widget(right)
        root.add_widget(body)
        root.add_widget(self._build_footer())
        self.content = root
        # Load the first report on the next frame, after the popup is built.
        Clock.schedule_once(lambda *_: self.load_report("current"), 0)

    # ------------------------------------------------------------------
    # Widget helpers
    # ------------------------------------------------------------------

    def _paint(self, widget, color):
        """Draw a solid ``color`` rectangle behind ``widget`` and return the widget."""
        with widget.canvas.before:
            Color(*color)
            rect = Rectangle(pos=widget.pos, size=widget.size)
        # Keep the rectangle glued to the widget when it moves or resizes.
        widget.bind(pos=lambda inst, *_: setattr(rect, "pos", inst.pos))
        widget.bind(size=lambda inst, *_: setattr(rect, "size", inst.size))
        return widget

    def _label(
        self,
        text: str,
        *,
        size_hint_x=1,
        height=dp(28),
        bold=False,
        color=(1, 1, 1, 1),
        halign="left",
        font_size=14,
        shorten=True,
    ):
        """Create a fixed-height label whose text box follows the widget size."""
        lbl = Label(
            text=str(text or ""),
            size_hint=(size_hint_x, None),
            height=height,
            bold=bold,
            color=color,
            halign=halign,
            valign="middle",
            font_size=dp(font_size),
            shorten=shorten,
            shorten_from="right",
        )
        # text_size must track the size, otherwise halign/valign/shorten do nothing.
        lbl.bind(size=lambda inst, *_: setattr(inst, "text_size", (max(inst.width - dp(6), dp(10)), inst.height)))
        return lbl

    def _build_filters(self):
        """Build the top row: quick-range buttons, date inputs and the load button."""
        row = BoxLayout(size_hint=(1, None), height=dp(48), spacing=dp(8))
        btn_current = Button(text="Aktuálne", size_hint=(0.16, 1), background_color=self._COLOR_ACTIVE)
        btn_7 = Button(text="7 dní", size_hint=(0.12, 1), background_color=self._COLOR_IDLE)
        btn_30 = Button(text="30 dní", size_hint=(0.12, 1), background_color=self._COLOR_IDLE)
        self.range_buttons = {
            "current": btn_current,
            "7": btn_7,
            "30": btn_30,
        }
        self.date_from_input = TextInput(
            text=self.default_from,
            multiline=False,
            write_tab=False,
            size_hint=(0.14, 1),
        )
        self.date_to_input = TextInput(
            text=self.default_to,
            multiline=False,
            write_tab=False,
            size_hint=(0.14, 1),
        )
        btn_from_cal = Button(text="...", size_hint=(0.05, 1), background_color=self._COLOR_IDLE)
        btn_to_cal = Button(text="...", size_hint=(0.05, 1), background_color=self._COLOR_IDLE)
        btn_load = Button(text="Načítať", size_hint=(0.12, 1), background_color=self._COLOR_ACTIVE)

        btn_current.bind(on_release=lambda *_: self.load_report("current"))
        btn_7.bind(on_release=lambda *_: self.load_days(7))
        btn_30.bind(on_release=lambda *_: self.load_days(30))
        btn_load.bind(on_release=lambda *_: self.load_report("period"))
        btn_from_cal.bind(on_release=lambda *_: self._open_calendar(self.date_from_input))
        btn_to_cal.bind(on_release=lambda *_: self._open_calendar(self.date_to_input))

        row.add_widget(btn_current)
        row.add_widget(btn_7)
        row.add_widget(btn_30)
        row.add_widget(self._label("Od", size_hint_x=0.035, height=dp(48), halign="right"))
        row.add_widget(self.date_from_input)
        row.add_widget(btn_from_cal)
        row.add_widget(self._label("Do", size_hint_x=0.035, height=dp(48), halign="right"))
        row.add_widget(self.date_to_input)
        row.add_widget(btn_to_cal)
        row.add_widget(btn_load)
        return row

    def _build_footer(self):
        """Build the bottom row: summary, printer spinner and action buttons."""
        row = BoxLayout(size_hint=(1, None), height=dp(56), spacing=dp(8))
        self.summary_label = self._label("", size_hint_x=0.42, height=dp(56), shorten=False)
        printer_values = [self._printer_label(printer) for printer in self.printers]
        if not printer_values:
            # Placeholder entry so the spinner is never empty.
            printer_values = ["Bez tlačiarne"]
        self.printer_spinner = Spinner(
            text=self._printer_label_by_no(self.printer_no) or printer_values[0],
            values=printer_values,
            size_hint=(0.24, 1),
        )
        self.printer_spinner.bind(text=lambda _inst, value: self._set_printer_from_label(value))
        btn_all = Button(text="Všetko", size_hint=(0.10, 1), background_color=self._COLOR_IDLE)
        btn_none = Button(text="Nič", size_hint=(0.10, 1), background_color=self._COLOR_IDLE)
        btn_print = Button(text="Vytlačiť", size_hint=(0.14, 1), background_color=self._COLOR_ACTIVE)
        btn_close = Button(text="Zavrieť", size_hint=(0.12, 1), background_color=self._COLOR_CLOSE)

        btn_all.bind(on_release=lambda *_: self.select_all())
        btn_none.bind(on_release=lambda *_: self.select_none())
        btn_print.bind(on_release=lambda *_: self.print_report())
        btn_close.bind(on_release=lambda *_: self.dismiss())

        row.add_widget(self.summary_label)
        row.add_widget(self.printer_spinner)
        row.add_widget(btn_all)
        row.add_widget(btn_none)
        row.add_widget(btn_print)
        row.add_widget(btn_close)
        return row

    # ------------------------------------------------------------------
    # Printers
    # ------------------------------------------------------------------

    @staticmethod
    def _printer_number(printer) -> str:
        """Return the printer's ``prn_no`` as a stripped string ("" when missing)."""
        return str(getattr(printer, "prn_no", "") or "").strip()

    def _load_printers(self):
        """Return the controller's printer list, or ``[]`` when loading fails."""
        try:
            return list(self.controller._load_clsrep_printers())
        except Exception:
            # Best effort: the dialog must still open without printers.
            Logger.exception("Usage printers load failed")
            return []

    def _printer_label(self, printer):
        """Return the spinner caption "<no> - <name>" without dangling separators."""
        no = self._printer_number(printer)
        name = str(getattr(printer, "prn_name", "") or "").strip()
        return f"{no} - {name}".strip(" -")

    def _default_printer_no(self):
        """Pick the controller's default printer if it is known, else the first one."""
        default_no = str(getattr(self.controller, "default_printer", "") or "").strip()
        numbers = {self._printer_number(printer) for printer in self.printers}
        if default_no and default_no in numbers:
            return default_no
        if self.printers:
            return self._printer_number(self.printers[0])
        return ""

    def _printer_label_by_no(self, printer_no: str):
        """Return the spinner caption of printer ``printer_no`` or "" if unknown."""
        printer_no = str(printer_no or "").strip()
        for printer in self.printers:
            if self._printer_number(printer) == printer_no:
                return self._printer_label(printer)
        return ""

    def _set_printer_from_label(self, value: str):
        """Store the printer number that belongs to the spinner caption ``value``.

        The caption is resolved against the known printers instead of being
        parsed blindly, so a placeholder or a name-only caption can never end
        up in ``printer_no``; unknown captions clear the selection.
        """
        label = str(value or "")
        for printer in self.printers:
            if self._printer_label(printer) == label:
                self.printer_no = self._printer_number(printer)
                return
        # Fallback: accept a bare/leading number only if such a printer exists.
        parsed = label.split(" - ", 1)[0].strip()
        known = {self._printer_number(printer) for printer in self.printers}
        self.printer_no = parsed if parsed in known else ""

    # ------------------------------------------------------------------
    # Range handling and loading
    # ------------------------------------------------------------------

    def _set_active_range(self, key: str):
        """Remember the active range key and recolour the quick-range buttons."""
        self.active_range = str(key or "period")
        for name, button in self.range_buttons.items():
            active = name == self.active_range
            # An empty background_normal makes background_color show unmodified.
            button.background_normal = ""
            button.background_color = self._COLOR_ACTIVE if active else self._COLOR_IDLE

    def _open_calendar(self, target_input: TextInput):
        """Open a date picker that writes the chosen ISO date into ``target_input``."""
        def select(value: str):
            target_input.text = value
            # A hand-picked date no longer matches any quick-range button.
            self._set_active_range("period")

        SimpleDatePicker(target_input.text, on_select=select).open()

    def load_days(self, days: int):
        """Fill the date inputs with the last ``days`` days (today included) and load."""
        today = datetime.now().date()
        self.date_from_input.text = (today - timedelta(days=max(days - 1, 0))).isoformat()
        self.date_to_input.text = today.isoformat()
        self.load_report("period", days_back=days, active_key=str(days))

    def load_report(self, mode: str, days_back: int = 0, active_key: str | None = None):
        """Fetch the report and refresh both lists.

        Args:
            mode: ``"current"`` or ``"period"``; passed through to the API.
            days_back: Passed through to the API.
            active_key: Quick-range button to highlight; derived from ``mode``
                when omitted.

        Failures are logged and shown in a popup; they are not re-raised.
        """
        self.mode = mode
        self._set_active_range(active_key or ("current" if mode == "current" else "period"))
        try:
            previous = set(self.selected_keys)
            report = api_call.load_usage_report_API(
                self.controller.ctx,
                mode=mode,
                date_from=self.date_from_input.text,
                date_to=self.date_to_input.text,
                days_back=days_back,
            )
            self.report = report
            keys = {category.key for category in report.categories}
            # Keep the earlier selection where it still applies; with no
            # earlier selection everything starts selected.
            self.selected_keys = (previous & keys) if previous else set(keys)
            self._refresh_lists()
        except Exception as exc:
            Logger.exception("Usage report load failed")
            self.controller._popup_info("Prezeranie spotreby", f"Spotrebu sa nepodarilo načítať.\n{exc}")

    # ------------------------------------------------------------------
    # Formatting
    # ------------------------------------------------------------------

    def _fmt_qty(self, value):
        """Format a quantity: integers without decimals, otherwise up to three
        decimals with a decimal comma. Unparsable or non-finite values show as 0."""
        try:
            num = float(value or 0)
        except Exception:
            num = 0.0
        if num != num or abs(num) == float("inf"):
            # round() raises on NaN/infinity; treat them like unparsable input.
            num = 0.0
        if abs(num - round(num)) < 0.000001:
            return str(int(round(num)))
        return f"{num:.3f}".rstrip("0").rstrip(".").replace(".", ",")

    def _fmt_money(self, value):
        """Format an amount with two decimals, decimal comma and the currency code."""
        try:
            num = float(value or 0)
        except Exception:
            num = 0.0
        try:
            currency = self.controller._currency()
        except Exception:
            currency = "EUR"
        return f"{num:.2f}".replace(".", ",") + f" {currency}"

    # ------------------------------------------------------------------
    # Selection
    # ------------------------------------------------------------------

    def _visible_categories(self):
        """Return the report categories whose name contains the filter text."""
        if not self.report:
            return []
        needle = (self.category_filter.text or "").strip().lower()
        if not needle:
            return list(self.report.categories)
        return [
            category
            for category in self.report.categories
            if needle in str(category.name or "").lower()
        ]

    def _selected_categories(self):
        """Return the selected report categories (independent of the filter)."""
        if not self.report:
            return []
        return [category for category in self.report.categories if category.key in self.selected_keys]

    def _toggle_category(self, key: str):
        """Flip the selection state of category ``key`` and redraw."""
        if key in self.selected_keys:
            self.selected_keys.remove(key)
        else:
            self.selected_keys.add(key)
        self._refresh_lists()

    def select_all(self):
        """Select every category of the loaded report."""
        if self.report:
            self.selected_keys = {category.key for category in self.report.categories}
        self._refresh_lists()

    def select_none(self):
        """Clear the selection."""
        self.selected_keys = set()
        self._refresh_lists()

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------

    def _refresh_lists(self):
        """Rebuild the category buttons, the item list and the summary line."""
        self.categories_grid.clear_widgets()
        self.items_grid.clear_widgets()
        if not self.report:
            self.summary_label.text = ""
            return
        for category in self._visible_categories():
            selected = category.key in self.selected_keys
            text = f"{'✓ ' if selected else ''}{category.name}\n{self._fmt_qty(category.quantity)} ks {self._fmt_money(category.amount)}"
            btn = Button(
                text=text,
                size_hint=(1, None),
                height=dp(56),
                halign="left",
                valign="middle",
                background_color=(0.22, 0.55, 0.75, 1) if selected else (0.32, 0.32, 0.32, 1),
            )
            btn.bind(size=lambda inst, *_: setattr(inst, "text_size", (max(inst.width - dp(12), dp(10)), inst.height)))
            # key=... freezes the current key; a plain closure would see the last one.
            btn.bind(on_release=lambda _btn, key=category.key: self._toggle_category(key))
            self.categories_grid.add_widget(btn)
        selected_categories = self._selected_categories()
        self._fill_items(selected_categories)
        qty = sum(float(category.quantity or 0) for category in selected_categories)
        amount = sum(float(category.amount or 0) for category in selected_categories)
        interval = "aktuálne účty" if self.report.mode == "current" else f"{self.report.date_from} - {self.report.date_to}"
        self.summary_label.text = f"{interval}\nVybrané: {len(selected_categories)} druhov, {self._fmt_qty(qty)} ks, {self._fmt_money(amount)}"

    def _fill_items(self, categories):
        """Append one header row per category plus one row per item to the item grid."""
        if not categories:
            self.items_grid.add_widget(
                self._label("Vyberte aspoň jeden druh tovaru.", height=dp(42), color=(1, 0.75, 0.35, 1), shorten=False)
            )
            return
        for category in categories:
            header = BoxLayout(size_hint=(1, None), height=dp(38), spacing=dp(6), padding=(dp(6), 0))
            self._paint(header, (0.18, 0.19, 0.20, 1))
            header.add_widget(self._label(category.name, size_hint_x=0.64, height=dp(38), bold=True, shorten=False))
            header.add_widget(self._label(self._fmt_qty(category.quantity), size_hint_x=0.14, height=dp(38), bold=True, halign="right"))
            header.add_widget(self._label(self._fmt_money(category.amount), size_hint_x=0.22, height=dp(38), bold=True, halign="right"))
            self.items_grid.add_widget(header)
            for item in category.items:
                row = BoxLayout(size_hint=(1, None), height=dp(34), spacing=dp(6), padding=(dp(6), 0))
                row.add_widget(self._label(self._fmt_qty(item.quantity), size_hint_x=0.12, height=dp(34), halign="right", color=(0.60, 0.82, 1, 1)))
                row.add_widget(self._label(item.name, size_hint_x=0.62, height=dp(34), shorten=True))
                row.add_widget(self._label(self._fmt_money(item.amount), size_hint_x=0.26, height=dp(34), halign="right"))
                self.items_grid.add_widget(row)

    def _render_print_text(self):
        """Return the plain-text report for the selected categories ("" without a report)."""
        if not self.report:
            return ""
        cats = self._selected_categories()
        lines = [
            "PREZERANIE SPOTREBY",
            f"Pokladna: {self.report.id_kas}",
        ]
        if self.report.mode == "current":
            lines.append("Obdobie: aktualne ucty od poslednej uzavierky")
        else:
            lines.append(f"Obdobie: {self.report.date_from} - {self.report.date_to}")
        if self.report.ucislo_from or self.report.ucislo_to:
            lines.append(f"Ucty: {self.report.ucislo_from} - {self.report.ucislo_to}")
        lines.append("-" * 42)
        for category in cats:
            lines.append(f"{category.name}")
            lines.append(f" spolu: {self._fmt_qty(category.quantity)} ks {self._fmt_money(category.amount)}")
            for item in category.items:
                # Item names are cut to the width of their print column.
                name = str(item.name or "")[:26]
                lines.append(f" {self._fmt_qty(item.quantity):>7} {name:<26} {self._fmt_money(item.amount):>12}")
            lines.append("")
        qty = sum(float(category.quantity or 0) for category in cats)
        amount = sum(float(category.amount or 0) for category in cats)
        lines.append("-" * 42)
        lines.append(f"SPOLU: {self._fmt_qty(qty)} ks {self._fmt_money(amount)}")
        return "\n".join(lines).rstrip() + "\n"

    def print_report(self):
        """Queue the rendered report on the selected printer.

        Shows an info popup when there is no report, no selected category or
        no printer, and when queuing succeeds or fails; nothing is raised.
        """
        if not self.report:
            self.controller._popup_info("Prezeranie spotreby", "Najprv načítajte report.")
            return
        if not self.selected_keys:
            self.controller._popup_info("Prezeranie spotreby", "Nie je vybraný žiadny druh tovaru.")
            return
        if not self.printer_no:
            self.controller._popup_info("Prezeranie spotreby", "Nie je vybraná tlačiareň.")
            return
        try:
            jobs = api_call.create_closure_print_jobs_API(
                self.controller.ctx,
                text=self._render_print_text(),
                printer_no=self.printer_no,
                clsrep_no=None,
                kind="usage",
                title="Prezeranie spotreby",
                required=False,
                priority=35,
                copies=1,
            )
            # Guard against an API that returns None instead of a list, so a
            # queued job is not reported as a failure.
            job_count = len(jobs or [])
            self.controller._popup_info(
                "Prezeranie spotreby",
                f"Report bol zaradený do tlačovej fronty ({job_count} job).",
            )
        except Exception as exc:
            Logger.exception("Usage report print failed")
            self.controller._popup_info("Prezeranie spotreby", f"Report sa nepodarilo vytlačiť.\n{exc}")
|
||
|
||
|
||
class ClosedPaymentHandlerHost:
    """Lightweight host object that borrows ``POSDialog``'s payment-handler methods.

    The borrowed functions are bound as ordinary methods of this class;
    ``update`` copies from the controller the attributes this host carries
    (presumably the ones those methods read - verify against ``POSDialog``).
    """

    # Short alias used only while the class body executes (removed below).
    _pos = posdialog.POSDialog

    # Handler entry point, parsing and text helpers.
    _run_payment_handler = _pos._run_payment_handler
    _parse_payment_handler = _pos._parse_payment_handler
    _parse_handler_discount = _pos._parse_handler_discount
    _handler_price_level_id = _pos._handler_price_level_id
    _normalize_text = _pos._normalize_text

    # "dotaz_re" / "dotaz_st" handlers.
    _handler_dotaz_re = _pos._handler_dotaz_re
    _handler_dotaz_st = _pos._handler_dotaz_st
    _finish_dotaz_st = _pos._finish_dotaz_st

    # "dotaz_ho" handler and its steps.
    _handler_dotaz_ho = _pos._handler_dotaz_ho
    _dotaz_ho_open_targets = _pos._dotaz_ho_open_targets
    _dotaz_ho_room_selected = _pos._dotaz_ho_room_selected
    _dotaz_ho_manual_room = _pos._dotaz_ho_manual_room
    _dotaz_ho_read_card = _pos._dotaz_ho_read_card
    _dotaz_ho_card_loaded = _pos._dotaz_ho_card_loaded
    _dotaz_ho_load_guests = _pos._dotaz_ho_load_guests
    _dotaz_ho_guest_selected = _pos._dotaz_ho_guest_selected
    _finish_dotaz_ho = _pos._finish_dotaz_ho

    del _pos

    def __init__(self, controller):
        """Create the host and take its state from ``controller``."""
        self.update(controller)

    def update(self, controller):
        """Re-read setup, price list, modal manager and level lists from ``controller``."""
        self.controller = controller
        self.setup = controller.setup
        self.cenik = controller.cenik
        self.modal_manager = controller.modal_manager
        # The controller may not define these attributes or may hold empty
        # collections; both cases end up as an empty list.
        levels = getattr(controller, "_levels", [])
        self.levels = list(levels) if levels else []
        price_levels = getattr(controller, "_price_levels", [])
        self.alllevels = list(price_levels) if price_levels else []

    def _apply_payment_price_level(self, ucet, price_level, allowed_line_ids=None):
        """Do nothing: this host never applies a price level."""
        return None
|
||
|
||
|
||
class GuestMappingPopup(BoxLayout):
    """Form that maps each source guest to a guest of the target account.

    One row per source guest: its name and a spinner with the target guests.
    "OK" calls ``on_done({"guest_map": {src_id: target_id}, "guests": [...]})``.

    Args:
        target_ucet: Target account; its ``guests`` list (dicts with ``id``
            and optionally ``name``) is copied, and guests added here are
            also appended to ``target_ucet.guests`` immediately.
        source_guest_ids: Ids of the guests that need a target.
        on_done: Callback receiving the result dict.
        source_guests: Optional guest dicts used to look up source names.
        on_cancel: Optional callback for the cancel button.
        request_guest_name: Optional callable that asks for a name and then
            calls the callback it was given with that name.
        **kwargs: Forwarded to ``BoxLayout``.
    """

    def __init__(
        self,
        target_ucet,
        source_guest_ids,
        on_done,
        source_guests=None,
        on_cancel=None,
        request_guest_name=None,
        **kwargs
    ):
        super().__init__(orientation="vertical", spacing=dp(6), padding=dp(10), **kwargs)

        self.target_ucet = target_ucet
        self.source_guest_ids = list(source_guest_ids)
        self.source_guests = list(source_guests or [])
        self.on_done = on_done
        self.on_cancel = on_cancel
        self.request_guest_name = request_guest_name

        self.mapping_widgets = {}  # src_id -> spinner

        # Existing guests of the target account (copied list, shared dicts).
        self.target_guests = target_ucet.guests[:] if target_ucet.guests else []

        # Header
        self.add_widget(Label(text="Priradenie hostí", size_hint=(1, None), height=dp(30)))

        # One row per source guest
        for src_id in self.source_guest_ids:
            row = BoxLayout(size_hint=(1, None), height=dp(40), spacing=dp(6))
            # Label.text must be a string, ids may be of any type.
            src_name = str(self._get_guest_name(src_id, source=True))
            row.add_widget(Label(text=src_name, size_hint=(0.4, 1)))
            spinner = Spinner(
                text=self._default_target_name(src_name),
                values=self._target_guest_names(),
                size_hint=(0.6, 1)
            )
            row.add_widget(spinner)
            self.mapping_widgets[src_id] = spinner
            self.add_widget(row)

        # Buttons
        btns = BoxLayout(
            size_hint=(1, None),
            height=dp(50),
            spacing=dp(8),
        )
        btn_add = Button(text="+ nový hosť")
        btn_cancel = Button(
            text="Zrušiť",
            background_color=(0.6, 0.2, 0.2, 1),
        )
        btn_ok = Button(
            text="OK",
            background_color=(0.2, 0.6, 0.2, 1),
        )
        btn_add.bind(on_press=self._add_new_guest)
        btn_cancel.bind(on_press=lambda *_: self.on_cancel() if self.on_cancel else None)
        btn_ok.bind(on_press=self._confirm)
        btns.add_widget(btn_cancel)
        btns.add_widget(btn_add)
        btns.add_widget(btn_ok)
        self.add_widget(btns)

    # --------------------------------------------------

    @staticmethod
    def _display_name(guest):
        """Return the caption of a guest dict: its name, or its id when unnamed.

        Spinner values, default selection and confirmation all use this one
        function, so an unnamed guest is shown and matched consistently.
        """
        return str(guest.get("name") or guest["id"])

    def _get_guest_name(self, guest_id, source=False):
        """Return the name of guest ``guest_id``, or the id itself when the
        guest is unknown or has no name.

        Args:
            guest_id: Id to look up.
            source: Search the source guests instead of the target guests.
        """
        guests = self.source_guests if source else self.target_guests
        for guest in guests:
            if guest["id"] == guest_id:
                return guest.get("name") or guest_id
        return guest_id

    def _target_guest_names(self):
        """Return the captions of all target guests, in list order."""
        return [self._display_name(guest) for guest in self.target_guests]

    def _default_target_name(self, src_name):
        """Pick the initial spinner text for a source guest called ``src_name``.

        Prefers a target guest with the same caption, then the first target
        guest, then the placeholder "Hosť 1" when the target has no guests.
        """
        names = self._target_guest_names()
        if src_name in names:
            return src_name
        if names:
            return names[0]
        return "Hosť 1"

    # --------------------------------------------------

    def _add_new_guest(self, *_):
        """Add a guest, asking for the name when a name dialog was supplied."""
        if self.request_guest_name:
            self.request_guest_name(self._add_guest_with_name)
            return
        self._add_guest_with_name(f"Hosť {len(self.target_guests) + 1}")

    def _add_guest_with_name(self, new_name):
        """Create a target guest called ``new_name`` and offer it in every spinner.

        An empty name is ignored. NOTE(review): the guest is appended to
        ``target_ucet.guests`` right away, so it stays there even if the form
        is cancelled afterwards - confirm this is intended.
        """
        if not new_name:
            return
        new_guest = {
            "id": self._new_guest_id(),
            "name": new_name
        }
        self.target_guests.append(new_guest)
        if not self.target_ucet.guests:
            self.target_ucet.guests = []
        self.target_ucet.guests.append(new_guest)
        names = self._target_guest_names()
        caption = self._display_name(new_guest)
        # Refresh all spinners; ones showing a caption that does not exist
        # (e.g. the placeholder) switch to the new guest.
        for spinner in self.mapping_widgets.values():
            spinner.values = names
            if spinner.text not in names:
                spinner.text = caption

    # --------------------------------------------------

    def _new_guest_id(self):
        """Return the first id of the form ``g<N>`` (N >= 1) not used by a target guest."""
        existing = {guest["id"] for guest in self.target_guests}
        i = 1
        while f"g{i}" in existing:
            i += 1
        return f"g{i}"

    # --------------------------------------------------

    def _confirm(self, *_):
        """Collect the spinner choices and pass them to ``on_done``.

        Source guests whose spinner caption matches no target guest are left
        out of ``guest_map``; with duplicate captions the first guest wins.
        """
        guest_map = {}
        for src_id, spinner in self.mapping_widgets.items():
            selected_name = spinner.text
            target = next(
                (guest for guest in self.target_guests if self._display_name(guest) == selected_name),
                None
            )
            if target:
                guest_map[src_id] = target["id"]
        self.on_done({
            "guest_map": guest_map,
            "guests": self.target_guests
        })
|
||
|
||
|
||
|
||
class LimitTargetMappingPopup(BoxLayout):
    """Form that maps source courses and guests to those of a limit table.

    Two sections (courses, guests) each hold one row per source id with a
    spinner of target options. "OK" calls
    ``on_done({"course_map": {...}, "guest_map": {...}})`` with string source
    ids mapped to target ids.

    Args:
        source_course_ids: Course ids that need a target.
        source_guest_ids: Guest ids that need a target.
        source_courses: Dict rows (``id``, ``name``) used to name source courses.
        source_guests: Dict rows (``id``, ``name``) used to name source guests.
        target_courses: Dict rows offered as course targets.
        target_guests: Dict rows offered as guest targets.
        on_done: Callback receiving the result dict.
        on_cancel: Callback for the cancel button; ``None`` is tolerated.
        labels: Optional caption overrides (``title``, ``courses``,
            ``guests``, ``cancel``, ``ok``).
        **kwargs: Forwarded to ``BoxLayout``.
    """

    def __init__(
        self,
        source_course_ids,
        source_guest_ids,
        source_courses,
        source_guests,
        target_courses,
        target_guests,
        on_done,
        on_cancel,
        labels=None,
        **kwargs,
    ):
        super().__init__(orientation="vertical", spacing=dp(8), padding=dp(10), **kwargs)
        labels = labels or {}
        self.source_courses = list(source_courses or [])
        self.source_guests = list(source_guests or [])
        self.on_done = on_done
        self.on_cancel = on_cancel
        self.course_widgets = {}
        self.guest_widgets = {}
        self.course_options = self._build_options(target_courses, "id", "name", "Chod")
        self.guest_options = self._build_options(target_guests, "id", "name", "Hladina")

        self.add_widget(Label(
            text=labels.get("title", "Priradenie na limitovy stol"),
            size_hint=(1, None),
            height=dp(32),
            bold=True,
        ))

        scroll = ScrollView(size_hint=(1, 1))
        body = BoxLayout(orientation="vertical", size_hint_y=None, spacing=dp(8))
        body.bind(minimum_height=body.setter("height"))
        self._add_section(
            body,
            labels.get("courses", "Chody"),
            source_course_ids,
            self.source_courses,
            self.course_options,
            self.course_widgets,
            "Chod",
        )
        self._add_section(
            body,
            labels.get("guests", "Hostia / hladiny"),
            source_guest_ids,
            self.source_guests,
            self.guest_options,
            self.guest_widgets,
            "Host",
        )
        scroll.add_widget(body)
        self.add_widget(scroll)

        btns = BoxLayout(size_hint=(1, None), height=dp(50), spacing=dp(8))
        btn_cancel = Button(text=labels.get("cancel", "Zrusit"), background_color=(0.6, 0.2, 0.2, 1))
        btn_ok = Button(text=labels.get("ok", "OK"), background_color=(0.2, 0.6, 0.2, 1))
        # Guarded like GuestMappingPopup: a missing callback must not crash the UI.
        btn_cancel.bind(on_press=lambda *_: self.on_cancel() if self.on_cancel else None)
        btn_ok.bind(on_press=self._confirm)
        btns.add_widget(btn_cancel)
        btns.add_widget(btn_ok)
        self.add_widget(btns)

    def _build_options(self, rows, id_key, name_key, fallback):
        """Turn target rows into spinner options.

        Non-dict rows are skipped. Each option is
        ``{"id": str, "name": str, "label": str}``; the name falls back to the
        id and then to ``fallback``. The first option with a given name keeps
        the plain name as label, later ones get ``"<name> (<id>)"``.
        """
        options = []
        seen_names = {}
        for row in rows or []:
            if not isinstance(row, dict):
                continue
            value = str(row.get(id_key, "") or "")
            name = str(row.get(name_key, "") or "").strip() or value or fallback
            seen_names[name] = seen_names.get(name, 0) + 1
            label = name if seen_names[name] == 1 else f"{name} ({value})"
            options.append({"id": value, "name": name, "label": label})
        return options

    def _source_name(self, source_id, rows, fallback):
        """Return the display name of ``source_id`` from ``rows``.

        Falls back to the id string and then to ``fallback`` when the row is
        missing or has no name.
        """
        sid = str(source_id or "")
        for row in rows or []:
            if isinstance(row, dict) and str(row.get("id", "") or "") == sid:
                return str(row.get("name", "") or sid or fallback).strip()
        return sid or fallback

    def _default_option_label(self, source_name, options):
        """Return the label of the option named like the source, else the
        first option's label, else ""."""
        source_name = str(source_name or "").strip()
        for option in options:
            if option["name"] == source_name:
                return option["label"]
        return options[0]["label"] if options else ""

    def _add_section(self, body, title, source_ids, source_rows, options, widgets, fallback):
        """Append a titled section with one source/spinner row per source id.

        Created spinners are stored in ``widgets`` under the string source id.
        When there is nothing to map or no target option exists, a single
        explanatory label is added instead and ``widgets`` stays untouched.
        """
        body.add_widget(Label(
            text=title,
            size_hint=(1, None),
            height=dp(28),
            bold=True,
        ))
        values = [option["label"] for option in options]
        if not source_ids:
            body.add_widget(Label(
                text="Nie je co priradit",
                size_hint=(1, None),
                height=dp(34),
            ))
            return
        if not values:
            body.add_widget(Label(
                text="Cielovy limit nema ziadne hodnoty",
                size_hint=(1, None),
                height=dp(34),
            ))
            return
        for source_id in source_ids:
            src_name = self._source_name(source_id, source_rows, fallback)
            row = BoxLayout(size_hint=(1, None), height=dp(44), spacing=dp(8))
            src_label = Label(
                text=src_name,
                size_hint=(0.44, 1),
                halign="left",
                valign="middle",
            )
            # halign/valign only take effect once text_size is set, so let the
            # text box follow the widget size.
            src_label.bind(size=src_label.setter("text_size"))
            row.add_widget(src_label)
            spinner = Spinner(
                text=self._default_option_label(src_name, options),
                values=values,
                size_hint=(0.56, 1),
            )
            row.add_widget(spinner)
            widgets[str(source_id or "")] = spinner
            body.add_widget(row)

    def _selected_id(self, spinner, options):
        """Return the id of the option shown in ``spinner``.

        Falls back to the first option's id (or "" without options) when the
        spinner text matches no label.
        """
        for option in options:
            if option["label"] == spinner.text:
                return option["id"]
        return options[0]["id"] if options else ""

    def _confirm(self, *_):
        """Pass the chosen course and guest mappings to ``on_done``."""
        self.on_done({
            "course_map": {
                src_id: self._selected_id(spinner, self.course_options)
                for src_id, spinner in self.course_widgets.items()
            },
            "guest_map": {
                src_id: self._selected_id(spinner, self.guest_options)
                for src_id, spinner in self.guest_widgets.items()
            },
        })
|
||
|
||
|
||
class ConfigManager:
    """JSON-backed application configuration.

    The file is ``config.json`` in the current working directory when it
    exists there, otherwise ``config.json`` in the running Kivy app's
    ``user_data_dir``; a missing file is created from ``DEFAULT_CONFIG``.
    Every config key is also mirrored as an instance attribute
    (``cfg.base_url``), except keys that would overwrite the manager's own
    attributes or methods.

    NOTE(review): ApiController reads ``password`` from this file, i.e. it is
    stored in plain text.
    """

    def __init__(self):
        self.config = None
        self.pending_operation = None
        self.path = self._get_config_path()
        self._load()
        for key, value in self.config.items():
            self._mirror(key, value)

    def _mirror(self, key, value):
        """Expose ``key`` as an instance attribute unless that would shadow
        one of the manager's own attributes or methods (e.g. ``path``, ``get``)."""
        reserved = {"config", "pending_operation", "path"}
        if key in reserved or hasattr(type(self), key):
            return
        self.__dict__[key] = value

    def _get_config_path(self):
        """Return the path of the config file to use (see class docstring)."""
        # First choice: a config next to the application (holds the login,
        # id_kas, client_id, ...).
        local_path = os.path.join(os.getcwd(), "config.json")
        if os.path.exists(local_path):
            return local_path
        # Fallback: the per-user data directory of the running app.
        app = App.get_running_app()
        if app is None:
            # No running app to ask for user_data_dir; stay with the local path
            # instead of failing with an AttributeError.
            return local_path
        return os.path.join(app.user_data_dir, "config.json")

    def _load(self):
        """Load the config file, creating it from ``DEFAULT_CONFIG`` when missing."""
        if not os.path.exists(self.path):
            self.config = DEFAULT_CONFIG.copy()
            self._save()
        else:
            print("====================================\nCONFIG PATH:", self.path)
            with open(self.path, "r", encoding="utf-8") as f:
                self.config = json.load(f)

    def _save(self):
        """Write the current config to ``self.path`` as indented JSON."""
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(self.config, f, indent=4)

    def get(self, key):
        """Return the value stored under ``key`` or ``None``."""
        return self.config.get(key)

    def set(self, key, value):
        """Store ``value`` under ``key``, refresh the attribute mirror and save."""
        self.config[key] = value
        # Keep cfg.<key> in step with cfg.get(key); previously it went stale.
        self._mirror(key, value)
        self._save()

    def all(self):
        """Return the underlying config dict (not a copy)."""
        return self.config
|
||
|
||
def read_start_param(name, cfg, vAPI:str = "Unknow", vDBT:str = "Unknown", vRQ:str = "Unknown") -> str:
    """Return a multi-line, human-readable summary of the start-up parameters.

    Args:
        name: User name shown on the first line.
        cfg: Object with the attributes ``base_url``, ``refresh_url``,
            ``id_kas``, ``client_id``, ``bill_printer``, ``bon_printer1`` and
            ``bon_printer2``.
        vAPI: API version text.
        vDBT: Database name text.
        vRQ: Application version text.

    Returns:
        One "label: value" line per parameter, each ending in a newline; the
        last line carries the result of ``get_local_ip()``.
    """
    summary_lines = [
        f" User: {name}\n",
        f" Base_url: {cfg.base_url}\n",
        f" Refr_url: {cfg.refresh_url}\n",
        f" Id_kas: {cfg.id_kas}\n",
        f" Client_id: {cfg.client_id}\n",
        f" API ver.: {vAPI}\n",
        f" DTB name: {vDBT}\n",
        f" App ver.: {vRQ}\n",
        f" Bill_prn: {cfg.bill_printer}\n",
        f" Bon_prn1: {cfg.bon_printer1}\n",
        f" Bon_prn2: {cfg.bon_printer2}\n",
        f" App IP: {get_local_ip()}\n",
    ]
    return "".join(summary_lines)
|
||
|
||
def get_local_ip():
    """Return the local IPv4 address used for outbound traffic, or "Offline".

    A UDP socket is connected to 8.8.8.8:80 and the local address the system
    chose for it is read back; connecting a datagram socket only selects a
    route and sends nothing. The socket is managed by ``with`` so it is
    closed even when ``connect`` or ``getsockname`` fails.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
    except Exception:
        # Deliberate best effort: any failure is reported as "Offline".
        return "Offline"
|
||
|
||
class ApiController:
|
||
def __init__(self, app):
    """Set up logging, read the configuration and initialise controller state.

    Args:
        app: The application object; stored as ``self.app``.

    Side effects: configures the root logger, attaches (once) a file handler
    for ``server_sqlite.log`` next to this module, creates a ``ConfigManager``
    and prints the start-up parameters.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )
    root_logger = logging.getLogger()

    # FileHandler stores os.path.abspath(filename) as baseFilename, so build
    # the path the same way; a relative __file__ would otherwise never compare
    # equal and every new controller would attach one more handler.
    log_file = Path(os.path.abspath(Path(__file__).with_name("server_sqlite.log")))
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and Path(getattr(handler, "baseFilename", "")) == log_file
        for handler in root_logger.handlers
    )
    if not already_attached:
        file_handler = logging.FileHandler(log_file, encoding="utf-8")
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        root_logger.addHandler(file_handler)
        # NOTE(review): assumed to belong to the handler set-up; confirm it
        # was not meant to run on every construction.
        root_logger.setLevel(logging.INFO)

    Logger.info("Inicializace ApiController")
    self.app = app
    cfg = ConfigManager()
    print(read_start_param("", cfg) )
    self.ctx = ApiContext(
        user=cfg.user,
        base_url=cfg.base_url,
        refresh_url=cfg.refresh_url,
        client_id=cfg.client_id,
        id_kas=cfg.id_kas,
        username=cfg.username,
        password=SecretStr(cfg.password),
    )
    self.mapa_stolu = None  # data.MapaStolu | None
    self.version_frontend = FrontEndVersion
    self.version_API = None
    self.database_name = None
    self._popup_info = _popup_info
    self.modal_manager = ModalManager()
    self.setup: data.PosSetup | None = None
    self.cenik: data.Cenik | None = None
    self.cenik_ui: data.Cenik | None = None
    self.cenik_texts: dict[int, data.CenikText] = {}
    self.translator = Translator("sk")
    self.zlavy: data.Zlavy | None = None
    self.fstmenu: data.FstMenuKasa | None = None
    self.ready = False
    self.last_error = ""
    self.user_login = None  # data.UserLoginOut | None
    self._editing_stul = None
    self._opened_dummy = False
    self._pos_mode = "normal"
    self._bar_stul = None
    self._limit_stul = None
    self._limit_id = None
    self._limit_den_id = None
    self._limit_tables_cache = []
    self._allow_account_refresh = False
    self._split_u_main = None
    self._split_u_sec = None
    self._split_target_stul = None
    self._split_source_is_limit = False
    self._split_target_is_limit = False
    self._permits = set()
    # Milan 15.04.26 - added the lists of allowed payments and discounts
    self._payments = set()
    self._discounts = set()
    self._price_levels = set()
    self._levels = set()
    self._printers = set()
    self._bankterms = []
    self.client_settings: data.ClientSettings | None = None
    self.default_room = None
    self.default_printer = None
    self.pos_static_maps = {}
    self._closed_payment_handler_host = None
    self.kasutxt: data.KasUtxt | None = None
    # ``self.setup`` is still None here, so the former
    # ``getattr(self.setup, ...) if self.setup else True`` always gave True.
    self.mamechody = True
    self.mamehosti = True
    self.mametretiny = True
    self.mamestvrtiny = True
|
||
|
||
# ------------------------------------------------------------------------
|
||
def tr(self, key: str, default: str | None = None, **kwargs) -> str:
|
||
return self.translator.tr(key, default, **kwargs)
|
||
|
||
def _current_language(self) -> str:
|
||
return normalize_lang(getattr(getattr(self, "user_login", None), "jazyk", "sk"))
|
||
|
||
def _currency(self) -> str:
|
||
return getattr(self.setup, "zkr_mena", "Kč") if self.setup else "Kč"
|
||
|
||
def _setup_bool(self, name: str, default: bool = False) -> bool:
|
||
if not self.setup:
|
||
return default
|
||
value = getattr(self.setup, name, default)
|
||
if isinstance(value, bool):
|
||
return value
|
||
if isinstance(value, (int, float)):
|
||
return bool(value)
|
||
text = str(value or "").strip().lower()
|
||
if text in {".t.", "t", "true", "1", "yes", "ano", "áno"}:
|
||
return True
|
||
if text in {".f.", "f", "false", "0", "no", "nie", "ne"}:
|
||
return False
|
||
return default
|
||
|
||
def _setup_int(self, name: str, default: int = 0) -> int:
|
||
if not self.setup:
|
||
return default
|
||
value = getattr(self.setup, name, default)
|
||
try:
|
||
return int(float(str(value).strip().replace(",", ".")))
|
||
except Exception:
|
||
return default
|
||
|
||
def _setup_text(self, name: str, default: str = "") -> str:
|
||
if not self.setup:
|
||
return default
|
||
return str(getattr(self.setup, name, default) or "").strip()
|
||
|
||
def _client_setting_value(self, name: str, default: str = "") -> str:
|
||
settings = self.client_settings or {}
|
||
if isinstance(settings, dict):
|
||
return str(settings.get(name, default) or "").strip()
|
||
return str(getattr(settings, name, default) or "").strip()
|
||
|
||
def _set_client_setting_value(self, name: str, value: str) -> None:
|
||
value = str(value or "").strip()
|
||
if self.client_settings is None:
|
||
self.client_settings = {"prn_no": "", "room_name": ""}
|
||
if isinstance(self.client_settings, dict):
|
||
self.client_settings[name] = value
|
||
else:
|
||
setattr(self.client_settings, name, value)
|
||
|
||
def _allowed_printer_numbers(self) -> list[str]:
|
||
return [
|
||
str(getattr(printer, "prn_no", "") or "").strip()
|
||
for printer in (self._printers or [])
|
||
if str(getattr(printer, "prn_no", "") or "").strip()
|
||
]
|
||
|
||
def _resolve_default_printer(self, saved_printer: str = "") -> str:
|
||
allowed = self._allowed_printer_numbers()
|
||
saved_printer = str(saved_printer or "").strip()
|
||
if saved_printer and saved_printer in allowed:
|
||
return saved_printer
|
||
return allowed[0] if allowed else ""
|
||
|
||
def _load_client_defaults(self):
|
||
self.client_settings = api_call.load_clientsettings_API(self.ctx)
|
||
default_printer = self._resolve_default_printer(
|
||
self._client_setting_value("prn_no")
|
||
)
|
||
self.default_printer = default_printer
|
||
self._set_client_setting_value("prn_no", default_printer)
|
||
self.default_room = self._client_setting_value("room_name") or None
|
||
Logger.info(
|
||
"CTRL: client defaults loaded "
|
||
f"default_printer={self.default_printer or '-'} "
|
||
f"default_room={self.default_room or '-'}"
|
||
)
|
||
return self.client_settings
|
||
|
||
def _payment_type_by_setting(self, setting_name: str) -> data.PaymentType | None:
|
||
wanted = self._setup_text(setting_name)
|
||
if not wanted:
|
||
return None
|
||
wanted_l = wanted.casefold()
|
||
payments = list(getattr(self.setup, "platby", []) or [])
|
||
payments.extend(
|
||
p for p in (getattr(self, "_payments", []) or [])
|
||
if hasattr(p, "code")
|
||
)
|
||
seen = set()
|
||
for ptype in payments:
|
||
key = str(getattr(ptype, "code", "") or "").casefold()
|
||
if key in seen:
|
||
continue
|
||
seen.add(key)
|
||
values = {
|
||
str(getattr(ptype, "code", "") or "").casefold(),
|
||
str(getattr(ptype, "name", "") or "").casefold(),
|
||
}
|
||
if wanted_l in values:
|
||
return ptype
|
||
return None
|
||
|
||
def _is_terminal_payment(self, payment_row) -> bool:
|
||
amount = abs(float(getattr(payment_row, "suma_czk", 0) or getattr(payment_row, "suma", 0) or 0))
|
||
return bool(getattr(payment_row, "is_bankterm", False)) and amount >= 0.005
|
||
|
||
def _ucet_has_terminal_payment(self, ucet: data.Ucet) -> bool:
|
||
return any(self._is_terminal_payment(pay) for pay in (getattr(ucet, "platby", []) or []))
|
||
|
||
def _terminal_storno_mode(self) -> int:
|
||
mode = self._setup_int("terminal_storno", 0)
|
||
return mode if mode in (0, 1, 2, 3) else 0
|
||
|
||
def _terminal_storno_replacement_type(self, mode: int) -> data.PaymentType | None:
|
||
if mode == 0:
|
||
return self._payment_type_by_setting("bar_pay_in_cash")
|
||
if mode == 3:
|
||
return self._payment_type_by_setting("card_pay_no_term")
|
||
return None
|
||
|
||
def _storno_payment_from_type(
|
||
self,
|
||
ptype: data.PaymentType,
|
||
source_pay: data.Platba,
|
||
amount_czk: float,
|
||
tip: float = 0.0,
|
||
force_no_terminal: bool = False,
|
||
) -> data.Platba:
|
||
rate = float(getattr(ptype, "rate", 1.0) or 1.0)
|
||
amount = round(abs(float(amount_czk or 0)) / rate, 2) if rate else abs(float(amount_czk or 0))
|
||
note = str(getattr(source_pay, "poznamka", "") or "").strip()
|
||
source_name = str(getattr(source_pay, "nazev", "") or getattr(source_pay, "code", "") or "").strip()
|
||
if source_name:
|
||
note = f"{note}; " if note else ""
|
||
note += f"Storno povodnej platby: {source_name}"
|
||
return data.Platba(
|
||
code=str(getattr(ptype, "code", "") or ""),
|
||
nazev=str(getattr(ptype, "name", "") or getattr(ptype, "code", "") or ""),
|
||
suma=-abs(amount),
|
||
suma_czk=-abs(round(float(amount_czk or 0), 2)),
|
||
unit=str(getattr(ptype, "unit", "") or getattr(source_pay, "unit", "") or ""),
|
||
rate=rate,
|
||
fiscal=bool(getattr(ptype, "fiscal", False)),
|
||
is_bankterm=False if force_no_terminal else bool(getattr(ptype, "is_bankterm", False)),
|
||
p_kopii=max(int(getattr(ptype, "p_kopii", getattr(source_pay, "p_kopii", 1)) or 0), 0),
|
||
tip=-abs(round(float(tip or 0), 2)),
|
||
poznamka=note or None,
|
||
hotel_charge=getattr(source_pay, "hotel_charge", None),
|
||
)
|
||
|
||
def _bankterm_for_ucet(self, ucet: data.Ucet):
|
||
printer_no = str(getattr(ucet, "bill_printer", "") or self.default_printer or "").strip()
|
||
if not printer_no:
|
||
return None
|
||
printer = next(
|
||
(
|
||
prn for prn in (getattr(self, "_printers", []) or [])
|
||
if str(getattr(prn, "prn_no", "") or "") == printer_no
|
||
),
|
||
None,
|
||
)
|
||
id_term = str(getattr(printer, "id_term", "") or "").strip() if printer else ""
|
||
if not id_term:
|
||
return None
|
||
return next(
|
||
(
|
||
term for term in (getattr(self, "_bankterms", []) or [])
|
||
if str(getattr(term, "id_term", "") or "") == id_term
|
||
),
|
||
None,
|
||
)
|
||
|
||
def _bankterm_url(self, term_data) -> str:
|
||
if not term_data:
|
||
return ""
|
||
reqadr = str(getattr(term_data, "eft_reqadr", "") or "").strip()
|
||
if reqadr.startswith(("http://", "https://")):
|
||
return reqadr
|
||
protocol = str(getattr(term_data, "protokol", "") or "http").strip().lower()
|
||
if protocol not in {"http", "https"}:
|
||
protocol = "http"
|
||
host = str(getattr(term_data, "eft_ipadr", "") or "").strip()
|
||
port = str(getattr(term_data, "eft_rempor", "") or getattr(term_data, "eft_lclpor", "") or "").strip()
|
||
if not host:
|
||
return ""
|
||
url = f"{protocol}://{host}"
|
||
if port:
|
||
url = f"{url}:{port}"
|
||
if reqadr:
|
||
url = f"{url}/{reqadr.lstrip('/')}"
|
||
return url
|
||
|
||
def _refund_direct_terminal_if_needed(
|
||
self,
|
||
source_ucet: data.Ucet,
|
||
source_pay: data.Platba,
|
||
storno_pay: data.Platba,
|
||
) -> None:
|
||
fiscal_result = getattr(source_ucet, "fiscal_result", {}) or {}
|
||
if isinstance(fiscal_result, dict) and fiscal_result.get("transaction_result"):
|
||
return
|
||
term = self._bankterm_for_ucet(source_ucet)
|
||
term_data = getattr(term, "term_data", None) if term else None
|
||
if str(getattr(term_data, "typ", "") or "").strip().upper() != "BESTERON":
|
||
return
|
||
terminal_result = getattr(source_pay, "terminal_result", {}) or {}
|
||
service_id = str(terminal_result.get("service_id") or "").strip() if isinstance(terminal_result, dict) else ""
|
||
if not service_id:
|
||
raise RuntimeError("Povodna terminalova platba nema ulozeny service_id pre storno cez Besteron.")
|
||
config = bankterm_service.BankTerminalConfig(
|
||
terminal_type=str(getattr(term_data, "typ", "") or "BESTERON"),
|
||
url=self._bankterm_url(term_data),
|
||
terminal_id=str(getattr(term_data, "terminal_id", "") or ""),
|
||
sale_id=str(getattr(term_data, "sale_id", "") or "Alto/foodw32"),
|
||
user=str(getattr(term_data, "terminal_user", "") or ""),
|
||
password=str(getattr(term_data, "terminal_password", "") or ""),
|
||
currency=self._currency(),
|
||
)
|
||
result = bankterm_service.create_bank_terminal_client(config).refund(
|
||
service_id,
|
||
abs(float(getattr(storno_pay, "suma_czk", 0) or 0)),
|
||
)
|
||
storno_pay.terminal_result = {
|
||
"storno": True,
|
||
"original_service_id": service_id,
|
||
**result.legacy_dict(),
|
||
}
|
||
if not result.success:
|
||
raise RuntimeError(result.error or "Storno platby cez terminal bolo zamietnute.")
|
||
|
||
def _make_storno_payment(
|
||
self,
|
||
source_ucet: data.Ucet,
|
||
source_pay: data.Platba,
|
||
amount_czk: float | None = None,
|
||
tip: float | None = None,
|
||
origin: str = "Storno",
|
||
) -> data.Platba:
|
||
source_amount_czk = abs(float(getattr(source_pay, "suma_czk", 0) or getattr(source_pay, "suma", 0) or 0))
|
||
target_amount_czk = source_amount_czk if amount_czk is None else abs(float(amount_czk or 0))
|
||
if target_amount_czk <= 0:
|
||
target_amount_czk = 0.0
|
||
source_tip = abs(float(getattr(source_pay, "tip", 0) or 0))
|
||
target_tip = source_tip if tip is None else abs(float(tip or 0))
|
||
mode = self._terminal_storno_mode()
|
||
if self._is_terminal_payment(source_pay):
|
||
if mode == 2 and origin != "AutoStornoFiscal":
|
||
raise RuntimeError("Storno uctu s platbou cez terminal je zakazane parametrom terminal_storno.")
|
||
replacement = self._terminal_storno_replacement_type(mode)
|
||
if replacement is not None:
|
||
return self._storno_payment_from_type(
|
||
replacement,
|
||
source_pay,
|
||
target_amount_czk,
|
||
tip=target_tip,
|
||
force_no_terminal=True,
|
||
)
|
||
if mode in (0, 3):
|
||
setting_name = "bar_pay_in_cash" if mode == 0 else "card_pay_no_term"
|
||
raise RuntimeError(f"Pre terminal_storno={mode} nie je nastavena platba v parametri {setting_name}.")
|
||
new_pay = source_pay.model_copy(deep=True)
|
||
rate = float(getattr(new_pay, "rate", 1.0) or 1.0)
|
||
new_pay.suma_czk = -abs(round(target_amount_czk, 2))
|
||
new_pay.suma = -abs(round(target_amount_czk / rate, 2) if rate else target_amount_czk)
|
||
new_pay.tip = -abs(round(target_tip, 2))
|
||
new_pay.terminal_result = {}
|
||
if self._is_terminal_payment(source_pay) and mode == 1:
|
||
self._refund_direct_terminal_if_needed(source_ucet, source_pay, new_pay)
|
||
return new_pay
|
||
|
||
def _build_storno_payments(
|
||
self,
|
||
source_ucet: data.Ucet,
|
||
allocations: dict[int, tuple[float, float]] | None = None,
|
||
origin: str = "Storno",
|
||
) -> list[data.Platba]:
|
||
payments = []
|
||
for idx, pay in enumerate(getattr(source_ucet, "platby", []) or []):
|
||
amount_czk = None
|
||
tip = None
|
||
if allocations and idx in allocations:
|
||
amount_czk, tip = allocations[idx]
|
||
payments.append(
|
||
self._make_storno_payment(
|
||
source_ucet,
|
||
pay,
|
||
amount_czk=amount_czk,
|
||
tip=tip,
|
||
origin=origin,
|
||
)
|
||
)
|
||
return payments
|
||
|
||
def _validate_terminal_storno_settings(self, ucet: data.Ucet, origin: str = "Storno") -> None:
|
||
if not self._ucet_has_terminal_payment(ucet):
|
||
return
|
||
mode = self._terminal_storno_mode()
|
||
if mode == 2 and origin != "AutoStornoFiscal":
|
||
raise RuntimeError("Storno uctu s platbou cez terminal je zakazane parametrom terminal_storno.")
|
||
if mode in (0, 3) and self._terminal_storno_replacement_type(mode) is None:
|
||
setting_name = "bar_pay_in_cash" if mode == 0 else "card_pay_no_term"
|
||
raise RuntimeError(f"Pre terminal_storno={mode} nie je nastavena platba v parametri {setting_name}.")
|
||
|
||
def save_mapa_stolu(self):
|
||
from api_call import save_mapa_stolu_API
|
||
ok, resp = save_mapa_stolu_API(self.ctx, self.mapa_stolu)
|
||
return ok, resp
|
||
# ------------------------------------------------------------------------
|
||
def get_table_map_provider(self):
|
||
if not hasattr(self, "_table_map_provider"):
|
||
from mapa_stolu import RoomMapProvider
|
||
self._table_map_provider = RoomMapProvider(self)
|
||
return self._table_map_provider
|
||
# ------------------------------------------------------------------------
|
||
def limits_room_enabled(self) -> bool:
|
||
return self._setup_bool("postgres_enabled", False) and self._setup_bool("is_limspra", False)
|
||
# ------------------------------------------------------------------------
|
||
def cached_limit_tables(self):
|
||
return list(getattr(self, "_limit_tables_cache", []) or [])
|
||
# ------------------------------------------------------------------------
|
||
def load_limit_tables(self, *, force: bool = False):
|
||
if not self.limits_room_enabled():
|
||
self._limit_tables_cache = []
|
||
return []
|
||
if not force and self._limit_tables_cache:
|
||
return list(self._limit_tables_cache)
|
||
try:
|
||
self._limit_tables_cache = api_call.load_limity_API(self.ctx)
|
||
return list(self._limit_tables_cache)
|
||
except Exception as e:
|
||
Logger.warning(f"Limity se nepodařilo načíst: {e}")
|
||
return list(self._limit_tables_cache)
|
||
# ------------------------------------------------------------------------
|
||
def allow_account_refresh(self) -> bool:
|
||
return (
|
||
self._allow_account_refresh
|
||
and self._editing_stul is None
|
||
and self._split_u_main is None
|
||
)
|
||
# ------------------------------------------------------------------------
|
||
def fox_hash(retizek: str, dlzka: int | None = None, slovenskaverzia: bool = True) -> str:
|
||
import hashlib
|
||
# nastavenie dĺžky
|
||
if slovenskaverzia:
|
||
if not dlzka:
|
||
dlzka = 20
|
||
else:
|
||
dlzka = 12
|
||
|
||
# SHA1 hash (raw bytes, nie hex)
|
||
sha1_bytes = hashlib.sha1(retizek).digest()
|
||
|
||
# vlastné "hex" kódovanie
|
||
SEMINKO = "ABCDEFGHIJKLMNOP"
|
||
result = []
|
||
for b in sha1_bytes:
|
||
result.append(SEMINKO[b // 16])
|
||
result.append(SEMINKO[b % 16])
|
||
encoded = "".join(result)
|
||
|
||
# skrátenie na požadovanú dĺžku
|
||
return encoded[:dlzka]
|
||
|
||
def login_user(self, pin: str):
|
||
# Milan 10.04.26 - doplnena kontrola hashovaneho hesla
|
||
Logger.info("CTRL: login_user")
|
||
try:
|
||
user = api_call.login_user_API(self.ctx, pin) # žádný try
|
||
except:
|
||
import hashlib
|
||
# nastavenie dĺžky
|
||
dlzka = 20
|
||
# SHA1 hash (raw bytes, nie hex)
|
||
sha1_bytes = hashlib.sha1(pin.ljust(dlzka, " ").encode("utf-8")).digest()
|
||
# vlastné "hex" kódovanie
|
||
SEMINKO = "ABCDEFGHIJKLMNOP"
|
||
result = []
|
||
for b in sha1_bytes:
|
||
result.append(SEMINKO[b // 16])
|
||
result.append(SEMINKO[b % 16])
|
||
encoded = "".join(result)
|
||
# skrátenie na požadovanú dĺžku
|
||
pincoded = encoded[:dlzka]
|
||
try:
|
||
user = api_call.login_user_API(self.ctx, pincoded) # žádný try
|
||
except:
|
||
# nastavenie dĺžky
|
||
dlzka = 12
|
||
# SHA1 hash (raw bytes, nie hex)
|
||
sha1_bytes = hashlib.sha1(pin.ljust(dlzka, " ").encode("utf-8")).digest()
|
||
# vlastné "hex" kódovanie
|
||
SEMINKO = "ABCDEFGHIJKLMNOP"
|
||
result = []
|
||
for b in sha1_bytes:
|
||
result.append(SEMINKO[b // 16])
|
||
result.append(SEMINKO[b % 16])
|
||
encoded = "".join(result)
|
||
# skrátenie na požadovanú dĺžku
|
||
pincoded = encoded[:dlzka]
|
||
user = api_call.login_user_API(self.ctx, pincoded) # žádný try
|
||
|
||
self.user_login = user
|
||
self.translator.set_lang(getattr(user, "jazyk", "sk"))
|
||
self._load_user_cenik_texts()
|
||
self._build_pos_static_maps()
|
||
self._permits=user.permits
|
||
#Milan 15.04.26 - doplneny zoznam povolenych platieb a zliav
|
||
if user.is_admin:
|
||
self._payments = [
|
||
pl for pl in self.setup.platby
|
||
]
|
||
else:
|
||
self._payments=user.payments
|
||
if user.is_admin:
|
||
self._discounts = [
|
||
pl for pl in self.zlavy
|
||
]
|
||
else:
|
||
self._discounts=user.discounts
|
||
if user.is_admin:
|
||
self._levels = [
|
||
pl for pl in self._price_levels
|
||
]
|
||
else:
|
||
self._levels = [
|
||
pl for pl in self._price_levels
|
||
if pl.ch in user.levels
|
||
]
|
||
self._load_client_defaults()
|
||
|
||
Logger.info(
|
||
f"CTRL: user logged in: {self.ctx.user}, "
|
||
f"permits={self._permits}"
|
||
)
|
||
return user
|
||
|
||
def _load_user_cenik_texts(self):
|
||
lang = self._current_language()
|
||
if lang == "sk":
|
||
self.cenik_texts = {}
|
||
self.cenik_ui = self.cenik
|
||
return
|
||
try:
|
||
texty = api_call.load_cenik_texty_API(self.ctx, lang)
|
||
self.cenik_texts = {
|
||
int(item.id_card): item
|
||
for item in (texty or [])
|
||
if int(getattr(item, "id_card", 0) or 0)
|
||
}
|
||
except Exception as e:
|
||
Logger.warning(f"Cenik texty sa nepodarilo nacitat pre jazyk {lang}: {e}")
|
||
self.cenik_texts = {}
|
||
self.cenik_ui = self._localized_cenik()
|
||
|
||
def _localized_cenik(self) -> data.Cenik | None:
|
||
if not self.cenik:
|
||
return None
|
||
if not self.cenik_texts:
|
||
return self.cenik
|
||
items = []
|
||
for item in self.cenik.cenpol:
|
||
clone = item.model_copy(deep=True)
|
||
text = self.cenik_texts.get(int(getattr(clone, "id_card", 0) or 0))
|
||
if text:
|
||
if text.d_name:
|
||
clone.d_name = text.d_name
|
||
items.append(clone)
|
||
return data.Cenik(cenpol=items)
|
||
|
||
# ------------------------------------------------------------------------
|
||
def dispatch_key(self, key):
|
||
if not self.modal_manager or not self.modal_manager.active_modal:
|
||
return False
|
||
if key == "ESC":
|
||
return self.modal_manager.close_top()
|
||
modal = self.modal_manager.active_modal
|
||
if hasattr(modal, "handle_key"):
|
||
return modal.handle_key(key)
|
||
return False
|
||
# ------------------------------------------------------------------------
|
||
def logout_user(self):
|
||
Logger.info("CTRL: logout_user")
|
||
self.user_login = None
|
||
self.ctx.user = ""
|
||
# návrat na login screen
|
||
self.app.show_login()
|
||
# ------------------------------------------------------------------------
|
||
def has_perm(self, perm: str) -> bool:
|
||
if not self.user_login:
|
||
return False
|
||
return perm in self._permits
|
||
# ------------------------------------------------------------------------
|
||
"""
|
||
def start_app(self) -> bool:
|
||
try:
|
||
Logger.debug("Login zakázky")
|
||
self.version_API, self.database_name = api_call.login_API(self.ctx)
|
||
Logger.debug("Načítám setup pokladny")
|
||
self.setup = api_call.load_setup_API(self.ctx)
|
||
Logger.debug("Načítám ceník")
|
||
self.cenik = api_call.load_cenik_API(self.ctx)
|
||
self.cenik_ui = self.cenik
|
||
Logger.debug("Načítám FST menu")
|
||
self.fstmenu = api_call.load_fstmenu_API(self.ctx)
|
||
Logger.debug("Načítám mapu stolu")
|
||
try:
|
||
self.mapa_stolu = api_call.load_mapa_stolu_API(self.ctx)
|
||
except Exception as e:
|
||
Logger.error(f"MAPA STOLU se nepodařilo načíst: {e}")
|
||
from kivy.clock import Clock
|
||
Clock.schedule_once(lambda *_: self._popup_info(
|
||
f"MAPA STOLU se nepodařilo načíst:\n{e}"
|
||
))
|
||
self.mapa_stolu = data.MapaStolu(
|
||
rooms=[],
|
||
pokladny=[self.ctx.id_kas]
|
||
)
|
||
api_call.start_heartbeat(ctx=self.ctx)
|
||
self.ready = True
|
||
Logger.info(f"Verze API {self.version_API}, verze frontend {self.version_frontend}")
|
||
Logger.info("Aplikace připravena")
|
||
return True
|
||
except Exception as e:
|
||
Logger.error(f"Start aplikace SELHAL {str(e)}")
|
||
api_call.stop_heartbeat(ctx=self.ctx)
|
||
self.ready = False
|
||
self.last_error = str(e)
|
||
return False
|
||
"""
|
||
|
||
def start_app(self) -> bool:
|
||
try:
|
||
Logger.debug("Login zakázky")
|
||
self.version_API, self.database_name = api_call.login_API(self.ctx)
|
||
Logger.debug("Načítám setup pokladny")
|
||
self.setup = api_call.load_setup_API(self.ctx)
|
||
self.mamechody = getattr(self.setup, "is_chod", True) if self.setup else True
|
||
self.mamehosti = getattr(self.setup, "is_host", True) if self.setup else True
|
||
self.mametretiny = getattr(self.setup, "is_tretiny", True) if self.setup else True
|
||
self.mamestvrtiny = getattr(self.setup, "is_stvrtiny", True) if self.setup else True
|
||
Logger.debug("Načítám ceník")
|
||
self.cenik = api_call.load_cenik_API(self.ctx)
|
||
Logger.debug("Načítám zľavy")
|
||
try:
|
||
self.zlavy = api_call.load_zlavy_API(self.ctx)
|
||
except Exception as e:
|
||
Logger.warning(f"Zľavy se nepodařilo načíst: {e}")
|
||
self.zlavy = data.Zlavy(id_kas=self.ctx.id_kas, zlavy=[])
|
||
Logger.debug("Načítám hlavicky uctov")
|
||
try:
|
||
self.kasutxt = api_call.load_kasutxt_API(self.ctx)
|
||
Logger.warning(f"Hlavicky uctov sa podařilo načíst: {self.kasutxt}")
|
||
except Exception as e:
|
||
Logger.warning(f"Hlavicky uctov se nepodařilo načíst: {e}")
|
||
self.kasutxt = data.KasUtxtRiadky(id_kas=self.ctx.id_kas, kasutxt=[])
|
||
|
||
Logger.debug("Načítám FST menu")
|
||
self.fstmenu = api_call.load_fstmenu_API(self.ctx)
|
||
Logger.debug("Načítám mapu stolu")
|
||
self._printers = api_call.load_printers_for_kasa_API(self.ctx)
|
||
Logger.debug("Načítám tlačiarne")
|
||
try:
|
||
self._bankterms = api_call.load_bankterms_API(self.ctx)
|
||
except Exception as e:
|
||
Logger.warning(f"Bankove terminaly se nepodařilo načíst: {e}")
|
||
self._bankterms = []
|
||
Logger.debug("Načítám bankové terminály")
|
||
self._price_levels = api_call.load_pricelevels_API(self.ctx)
|
||
Logger.debug("Načítám cenové hladiny")
|
||
self._build_pos_static_maps()
|
||
self._load_client_defaults()
|
||
try:
|
||
self.mapa_stolu = api_call.load_mapa_stolu_API(self.ctx)
|
||
except Exception as e:
|
||
Logger.error(f"MAPA STOLU se nepodařilo načíst: {e}")
|
||
Clock.schedule_once(lambda *_: self._popup_info(
|
||
f"MAPA STOLU se nepodařilo načíst:\n{e}"
|
||
))
|
||
self.mapa_stolu = data.MapaStolu(
|
||
rooms=[],
|
||
pokladny=[self.ctx.id_kas]
|
||
)
|
||
# ===== START OK =====
|
||
api_call.start_heartbeat(ctx=self.ctx)
|
||
self.ready = True
|
||
self.last_error = ""
|
||
Logger.info(f"Verze API {self.version_API}, verze frontend {self.version_frontend}")
|
||
Logger.info("Aplikace připravena")
|
||
# UI REFRESH (MUSÍ BÝT PŘED RETURN)
|
||
#Clock.schedule_once(lambda *_: self._on_app_ready(), 0)
|
||
return True
|
||
except Exception as e:
|
||
Logger.error(f"Start aplikace SELHAL {str(e)}")
|
||
api_call.stop_heartbeat(ctx=self.ctx)
|
||
self.ready = False
|
||
self.last_error = str(e)
|
||
# otevři widget na UI threadu
|
||
#Clock.schedule_once(lambda *_: self._ask_for_api_address())
|
||
return False
|
||
|
||
def _iter_cenik_items(self):
|
||
cenik = self.cenik_ui or self.cenik
|
||
if not cenik:
|
||
return []
|
||
return getattr(cenik, "cenpol", []) or []
|
||
|
||
def _normalize_search_text(self, text) -> str:
|
||
text = str(text or "").strip().lower()
|
||
text = unicodedata.normalize("NFD", text)
|
||
return "".join(
|
||
c for c in text
|
||
if unicodedata.category(c) != "Mn"
|
||
)
|
||
|
||
def _build_pos_search_index(self, cenik_map):
|
||
search_index = []
|
||
for item in cenik_map.values():
|
||
if getattr(item, "id_card", 0) <= 0:
|
||
continue
|
||
|
||
kod_val = getattr(item, "kod", None)
|
||
eans = []
|
||
for e in getattr(item, "eany", []) or []:
|
||
ean_val = getattr(e, "ean", None) if not isinstance(e, dict) else e.get("ean")
|
||
if ean_val:
|
||
eans.append(self._normalize_search_text(ean_val))
|
||
|
||
search_index.append({
|
||
"item": item,
|
||
"name": self._normalize_search_text(getattr(item, "d_name", "")),
|
||
"kod": "" if kod_val is None else self._normalize_search_text(kod_val),
|
||
"eans": eans,
|
||
})
|
||
return search_index
|
||
|
||
def _build_pos_code_index(self, cenik_map):
|
||
code_index = {}
|
||
for item in cenik_map.values():
|
||
kod = getattr(item, "kod", None)
|
||
if kod:
|
||
code_index[str(kod)] = (item, 1)
|
||
|
||
for e in getattr(item, "eany", []) or []:
|
||
ean = getattr(e, "ean", None) if not isinstance(e, dict) else e.get("ean")
|
||
if ean:
|
||
coef = getattr(e, "koeficient", None) if not isinstance(e, dict) else e.get("koeficient")
|
||
code_index[str(ean)] = (item, coef or 1)
|
||
return code_index
|
||
|
||
def _build_pos_menu_pages(self):
|
||
pages = {
|
||
pos.page
|
||
for cp in self._iter_cenik_items()
|
||
for pos in (getattr(cp, "positions", []) or [])
|
||
if getattr(pos, "page", None) is not None
|
||
}
|
||
return sorted(pages)
|
||
|
||
def _build_pos_static_maps(self):
|
||
cenik_map = {
|
||
cp.id_card: cp
|
||
for cp in self._iter_cenik_items()
|
||
}
|
||
fstmenu_map = {
|
||
cp.c_karty: cp
|
||
for cp in (self.fstmenu or [])
|
||
}
|
||
price_level_map = {
|
||
pl.ch: pl.ch_name
|
||
for pl in (self._price_levels or [])
|
||
}
|
||
printer_map = {
|
||
p.prn_no: p.prn_name
|
||
for p in (self._printers or [])
|
||
}
|
||
self.pos_static_maps = {
|
||
"cenik_map": cenik_map,
|
||
"fstmenu_map": fstmenu_map,
|
||
"price_level_map": price_level_map,
|
||
"printer_map": printer_map,
|
||
"search_index": self._build_pos_search_index(cenik_map),
|
||
"code_index": self._build_pos_code_index(cenik_map),
|
||
"menu_pages": self._build_pos_menu_pages(),
|
||
}
|
||
Logger.info(
|
||
"CTRL: POS static maps built "
|
||
f"items={len(cenik_map)}, "
|
||
f"codes={len(self.pos_static_maps['code_index'])}, "
|
||
f"search={len(self.pos_static_maps['search_index'])}"
|
||
)
|
||
|
||
def _on_app_ready(self):
|
||
Logger.info("UI refresh po startu")
|
||
try:
|
||
app = App.get_running_app()
|
||
for child in list(Window.children):
|
||
if child not in [app.root]:
|
||
Logger.warning(f"Removing orphan widget: {child}")
|
||
Window.remove_widget(child)
|
||
Clock.schedule_once(lambda *_: app._show_login(), 0)
|
||
Logger.info("UI ready → čekám na login")
|
||
except Exception as e:
|
||
Logger.error(f"UI refresh chyba: {e}")
|
||
|
||
def _ask_for_api_address(self):
|
||
modal = ModalView(
|
||
size_hint=(None, None),
|
||
size=(dp(400), dp(220)),
|
||
auto_dismiss=False
|
||
)
|
||
root = BoxLayout(orientation="vertical", padding=dp(10), spacing=dp(10))
|
||
# ===== OK =====
|
||
def on_done(val):
|
||
ip, port = val
|
||
new_url = f"http://{ip}:{port}"
|
||
self.ctx.base_url = new_url
|
||
try:
|
||
cfg = ConfigManager()
|
||
cfg.set("base_url", new_url)
|
||
except Exception as e:
|
||
Logger.warning(f"Config save failed: {e}")
|
||
modal.dismiss()
|
||
# STOP starý stav
|
||
self.ready = False
|
||
api_call.stop_heartbeat(ctx=self.ctx)
|
||
# START V THREADU
|
||
threading.Thread(target=self._start_app_thread, daemon=True).start()
|
||
# ===== CANCEL =====
|
||
def on_cancel():
|
||
Logger.info("Uživatel zrušil → ukončuji app")
|
||
modal.dismiss()
|
||
App.get_running_app().stop()
|
||
cfg = ConfigManager()
|
||
widget = IpPortInput(
|
||
mode="both",
|
||
ip=cfg.base_url.split("//")[1].split(":")[0],
|
||
port=int(cfg.base_url.split(":")[-1]),
|
||
on_done=on_done,
|
||
)
|
||
btn_cancel = Button(
|
||
text="ZRUŠIT",
|
||
size_hint_y=None,
|
||
height=dp(40),
|
||
background_color=(0.7, 0.2, 0.2, 1)
|
||
)
|
||
btn_cancel.bind(on_press=lambda *_: on_cancel())
|
||
root.add_widget(widget)
|
||
root.add_widget(btn_cancel)
|
||
modal.add_widget(root)
|
||
modal.open()
|
||
|
||
def _start_app_thread(self):
|
||
from kivy.clock import Clock
|
||
Logger.info("THREAD: start_app běží")
|
||
ok = self.start_app()
|
||
Logger.info(f"THREAD RESULT = {ok}")
|
||
if ok:
|
||
Clock.schedule_once(lambda *_: self._on_app_ready(), 0)
|
||
else:
|
||
Clock.schedule_once(lambda *_: self._ask_for_api_address(), 0)
|
||
|
||
# ------------------------------------------------------------------------
|
||
def _bar_table_id(self) -> str:
|
||
client_id = str(getattr(self.ctx, "client_id", "") or "").strip() or "00"
|
||
return f"BAR-{client_id}"
|
||
|
||
def open_bar(self, *_):
|
||
stul = self._bar_table_id()
|
||
Logger.info(f"Opening bar table {stul}")
|
||
self.open_table(stul, bar_mode=True)
|
||
|
||
def perform_fiscal_cash_operation(self, operation, amount, payment, printer):
|
||
title = "Vklad" if str(operation or "") == "manual_deposit" else "Výber"
|
||
printer_no = str(getattr(printer, "prn_no", "") or "").strip()
|
||
try:
|
||
result = api_call.print_fiscal_cash_operation_API(
|
||
self.ctx,
|
||
operation=operation,
|
||
amount=float(amount or 0),
|
||
payment=payment,
|
||
printer_no=printer_no,
|
||
author=getattr(self.user_login, "name", "") if self.user_login else "",
|
||
pos_name=self._receipt_pos_name(),
|
||
)
|
||
except Exception as e:
|
||
Logger.exception("Fiscal cash operation failed")
|
||
self._popup_info(title, f"{title} sa nepodarilo vykonať.\n{e}")
|
||
return False
|
||
ucet = getattr(result, "ucet", None)
|
||
ucislo = getattr(ucet, "ucislo", "") if ucet else ""
|
||
self._popup_info(title, f"{title} bol vykonaný.\nDoklad: {ucislo or '-'}")
|
||
try:
|
||
if getattr(self.app, "account_screen", None):
|
||
Clock.schedule_once(lambda *_: self.app.account_screen.refresh(), 0)
|
||
except Exception:
|
||
Logger.debug("Account screen refresh after cash operation skipped")
|
||
return True
|
||
|
||
def _is_limit_table_id(self, stul: str) -> bool:
|
||
return str(stul or "").strip().upper().startswith("LIM:")
|
||
|
||
def _parse_limit_table_id(self, stul: str) -> tuple[int, int]:
|
||
parts = str(stul or "").strip().split(":")
|
||
if len(parts) != 3:
|
||
raise ValueError("Neplatny identifikator limitu.")
|
||
return int(parts[1]), int(parts[2])
|
||
|
||
def open_limit_table(self, stul: str):
|
||
try:
|
||
id_limit, id_den = self._parse_limit_table_id(stul)
|
||
ucet = api_call.load_limit_ucet_API(self.ctx, id_limit=id_limit, id_den=id_den)
|
||
self._opened_dummy = False
|
||
try:
|
||
self.open_posdialog(ucet, limit_mode=True)
|
||
except Exception:
|
||
try:
|
||
api_call.release_limit_API(self.ctx, id_limit=id_limit)
|
||
except Exception as release_error:
|
||
Logger.warning(f"Limit release after open failure failed: {release_error}")
|
||
raise
|
||
except Exception as e:
|
||
Logger.exception("Limit table open failed")
|
||
_popup_info("Limity", f"Limit sa nepodarilo otvorit.\n{e}")
|
||
|
||
def open_table(self, stul: str, bar_mode: bool = False):
|
||
if self._is_limit_table_id(stul):
|
||
self.open_limit_table(stul)
|
||
return
|
||
Logger.info(f"Opening table {stul} bar_mode={bar_mode}")
|
||
try:
|
||
# pokus o normální otevření
|
||
ucet = self.load_ucet_z_api(stul, block=True)
|
||
self._opened_dummy = False
|
||
try:
|
||
self.open_posdialog(ucet, bar_mode=bar_mode)
|
||
except Exception:
|
||
Logger.exception("POSDialog crash")
|
||
raise
|
||
return
|
||
self.open_posdialog(ucet, bar_mode=bar_mode)
|
||
return
|
||
except RuntimeError as e:
|
||
err = e.args[0]
|
||
if isinstance(err, dict) and "detail" in err:
|
||
detail = err["detail"]
|
||
# blokovaný účet
|
||
if "blokován" in detail:
|
||
blocked = detail.split("blokován:", 1)[1].strip()
|
||
client_id, *_ = blocked.split("|")
|
||
_popup_info(
|
||
"Stůl obsazen",
|
||
f"Stůl {stul} je otevřen na terminálu {client_id}"
|
||
)
|
||
return
|
||
# účet neexistuje → open + znovu load
|
||
if "nenalezen" in detail:
|
||
Logger.info(f"Opening dummy account for table {stul}")
|
||
api_call.open_block_ucet_API(self.ctx, stul)
|
||
# KLÍČOVÉ: vždy načíst plný objekt
|
||
ucet = self.load_ucet_z_api(stul, block=True)
|
||
self._opened_dummy = True
|
||
self.open_posdialog(ucet, bar_mode=bar_mode)
|
||
return
|
||
# jiná doménová chyba
|
||
_popup_info("Chyba", detail)
|
||
return
|
||
# fallback
|
||
Logger.exception(e)
|
||
_popup_info( "Chyba", str(e))
|
||
|
||
def open_closure_select_dialog(self, closures, title, on_select):
    """Build a ``ClosureSelectDialog`` from the given arguments and open it."""
    dialog_options = {
        "closures": closures,
        "title": title,
        "on_select": on_select,
    }
    clsrep_select.ClosureSelectDialog(**dialog_options).open()
|
||
|
||
def _load_clsrep_printers(self) -> list[data.PrnDefShort]:
    """Return the printers offered for closure printing.

    The full printer list is requested from the API; when that call fails
    (logged) or returns nothing, a copy of ``self._printers`` is used instead.
    """
    loaded = None
    try:
        loaded = api_call.load_all_printers_API(self.ctx)
    except Exception:
        Logger.exception("CTRL: all prndef load for closure failed")
    if loaded:
        return loaded
    return list(self._printers or [])
|
||
|
||
def _clsrep_no_from_report(self, clsrep: dict | None) -> str:
|
||
if not isinstance(clsrep, dict):
|
||
return ""
|
||
return str(clsrep.get("clsrep_no") or "").strip()
|
||
|
||
def _enqueue_clsrep_print_jobs(
    self,
    clsrep: dict,
    printer_no: str,
    text: str,
    kind: str = "closure",
    title: str = "Uzavierka",
):
    """Queue print jobs for a closure report and tell the user the outcome.

    Returns whatever ``create_closure_print_jobs_API`` returned on success,
    or an empty list when anything in the process raised (the error is
    logged and shown in a popup).
    """
    popup_title = "Tlač uzávierky"
    try:
        clsrep_no = self._clsrep_no_from_report(clsrep)
        job_request = dict(
            text=text,
            printer_no=printer_no,
            clsrep_no=clsrep_no,
            kind=kind,
            title=title,
            required=False,
            priority=35,
            copies=1,
        )
        jobs = api_call.create_closure_print_jobs_API(self.ctx, **job_request)
        job_count = len(jobs)
        Logger.info(
            "CTRL: closure print jobs created kind=%s clsrep=%s printer=%s count=%s",
            kind,
            clsrep_no or "-",
            printer_no,
            job_count,
        )
        self._popup_info(popup_title, f"Uzávierka bola zaradená do tlačovej fronty ({job_count} job).")
    except Exception as e:
        Logger.exception("CTRL: closure print job creation failed")
        self._popup_info(
            popup_title,
            "Uzávierku sa nepodarilo zaradiť do tlačovej fronty.\n"
            f"{e}",
        )
        return []
    return jobs
|
||
|
||
def _show_clsrep_preview(
    self,
    clsrep: dict,
    *,
    kind: str = "closure",
    title: str = "Nahlad uzavierky",
    extra_actions=None,
):
    """Open the closure preview; printing from it enqueues print jobs.

    ``kind`` and ``title`` are forwarded to ``_enqueue_clsrep_print_jobs``
    when the preview invokes its ``on_print`` callback.
    """

    def _print_from_preview(printer_no, text):
        # Invoked by the preview with the chosen printer and rendered text.
        return self._enqueue_clsrep_print_jobs(
            clsrep,
            printer_no,
            text,
            kind=kind,
            title=title,
        )

    kivy_printer.show_clsrep_preview(
        clsrep,
        printers=self._load_clsrep_printers(),
        default_printer=self.default_printer or "",
        title=title,
        extra_actions=extra_actions,
        on_print=_print_from_preview,
    )
|
||
|
||
def _closure_report_dict(self, report) -> dict:
    """Return ``report`` as a plain dict.

    Objects exposing ``model_dump`` are dumped through it; everything else is
    passed to ``dict()`` (a falsy report becomes an empty dict).
    """
    if not hasattr(report, "model_dump"):
        return dict(report or {})
    return report.model_dump()
|
||
|
||
def _closure_carry_rows(self, report) -> list[dict]:
    """Return the ``cash_state`` rows whose ``payment_odvod`` equals 1.

    Non-dict rows are ignored; a ``payment_odvod`` value that cannot be
    converted to ``int`` counts as 0.
    """

    def _odvod_flag(row):
        try:
            return int(row.get("payment_odvod") or 0)
        except Exception:
            return 0

    cash_state = self._closure_report_dict(report).get("cash_state") or []
    return [
        row
        for row in cash_state
        if isinstance(row, dict) and _odvod_flag(row) == 1
    ]
|
||
|
||
def _closure_needs_carry_dialog(self, report) -> bool:
    """Tell whether the carry dialog has to be shown before saving a closure.

    True only when ``closure_settings`` has a truthy ``is_fiskal``, its
    ``uzav_odvod`` (stripped) equals ``"2"`` and the report contains at
    least one carry row.
    """
    settings = self._closure_report_dict(report).get("closure_settings") or {}
    if not bool(settings.get("is_fiskal")):
        return False
    if str(settings.get("uzav_odvod") or "").strip() != "2":
        return False
    return bool(self._closure_carry_rows(report))
|
||
|
||
def _finish_closure_save(self, ucislo: str, cash_carry: list[dict] | None = None):
    """Save the closure up to ``ucislo`` and show the resulting report.

    On an API error the message is shown and nothing else happens.
    Otherwise the closed-account screens are closed, a refresh of the
    account screen is scheduled and the closure preview is opened.
    """
    report, err = api_call.clsrep_API(
        self.ctx,
        None,
        ucislo,
        save=True,
        cash_carry=cash_carry,
    )
    if err:
        _popup_info("Uzávěrka", err)
        return

    self.close_closed_select()

    def _refresh_accounts(*_):
        # The refresh runs later from the Clock, so the guard has to live
        # inside the callback; a try/except around schedule_once() would
        # never see an error raised by refresh().
        try:
            self.app.account_screen.refresh()
        except Exception:
            Logger.exception("CTRL: account refresh after closure failed")

    Clock.schedule_once(_refresh_accounts, 0)

    # Same conversion the other closure helpers use (model or plain dict).
    self._show_clsrep_preview(
        self._closure_report_dict(report),
        kind="closure",
        title="Uzavierka",
    )
|
||
|
||
def _show_closure_carry_dialog(self, report, ucislo: str):
    """Open ``ClosureCarryDialog``; confirming it saves the closure.

    The dialog receives the report as a dict and a ``prn_no -> prn_name``
    mapping built from the available printers.
    """
    printer_names = {}
    for printer in self._load_clsrep_printers():
        number = str(getattr(printer, "prn_no", "") or "").strip()
        printer_names[number] = str(getattr(printer, "prn_name", "") or "").strip()

    def confirm(cash_carry: list[dict]):
        self._finish_closure_save(ucislo, cash_carry=cash_carry)

    carry_dialog = ClosureCarryDialog(
        self.modal_manager,
        self._closure_report_dict(report),
        printer_names,
        on_confirm=confirm,
    )
    self.modal_manager.open(carry_dialog)
|
||
|
||
# ---------------------------------------------------------
|
||
def handle_kopie_uzaverky(self):
    """Load the list of finished closures and open the selection dialog.

    API errors and an empty list are reported in a popup.  Closures are
    offered sorted by ``_closure_sort_key`` in descending order; the chosen
    one is passed to ``handle_selected_closure_copy``.
    """
    try:
        closures, err = api_call.load_closures_API(self.ctx)
        if err:
            self._popup_info("Kopie uzávěrky", str(err))
            return
        if not closures:
            self._popup_info("Kopie uzávěrky", "Nebyly nalezeny žádné uzávěrky.")
            return
        closures_sorted = sorted(
            closures,
            key=self._closure_sort_key,
            reverse=True,
        )
        # Reuse the shared helper instead of building the dialog a second time.
        self.open_closure_select_dialog(
            closures_sorted,
            "Vyber uzávěrku pro tisk kopie",
            self.handle_selected_closure_copy,
        )
    except Exception as e:
        # Keep the traceback in the log; the popup only carries the message.
        Logger.exception("CTRL: closure list load failed")
        self._popup_info("Kopie uzávěrky", f"Chyba při načítání uzávěrek:\n{e}")
|
||
|
||
def handle_selected_closure_copy(self, clsrep_no: str):
    """After a closure is picked, load its detail and open preview/print.

    When the detail carries a ``clsrep_id`` and failed transfers exist for
    it, the preview gets an extra action that retries those transfers.
    """
    popup_title = "Kopie uzávěrky"
    try:
        detail, err = api_call.closure_detail_API(self.ctx, clsrep_no)
        if err:
            self._popup_info(popup_title, f"Nelze načíst detail uzávěrky:\n{err}")
            return
        if not detail:
            self._popup_info(popup_title, "Detail uzávěrky je prázdný.")
            return

        clsrep = detail.get("data")
        if not clsrep:
            self._popup_info(popup_title, "V detailu uzávěrky chybí data pro tisk.")
            return

        extra_actions = []
        clsrep_id = (detail.get("clsrep") or {}).get("clsrep_id")
        if clsrep_id:
            transfers, transfer_err = api_call.load_closure_transfers_API(
                self.ctx,
                clsrep_id=clsrep_id,
                status="failed",
            )
            if transfer_err:
                Logger.warning("CTRL: closure transfer list failed: %s", transfer_err)
            elif transfers:

                def _resend(transfers=transfers, clsrep_id=clsrep_id):
                    # Defaults capture the current values for the later call.
                    return self._retry_closure_transfers(
                        transfers,
                        clsrep_id=clsrep_id,
                    )

                extra_actions.append({
                    "text": f"ODOSLAT RECEPCIU ({len(transfers)})",
                    "callback": _resend,
                })

        self._show_clsrep_preview(
            clsrep,
            kind="closure_copy",
            title="Kopia uzavierky",
            extra_actions=extra_actions,
        )
    except Exception as e:
        self._popup_info(popup_title, f"Chyba při otevření kopie uzávěrky:\n{e}")
|
||
|
||
def _retry_closure_transfers(self, transfers: list[dict], clsrep_id=None):
    """Retry each given transfer and show a summary popup.

    Rows without an ``id`` are skipped.  The summary lists the number of
    successful retries out of ``len(transfers)`` and at most five error
    lines.  ``clsrep_id`` is accepted but not used here.
    """
    popup_title = "Prenos uzávierky"
    if not transfers:
        self._popup_info(popup_title, "K tejto uzávierke nie je žiadny neúspešný prenos.")
        return

    sent = 0
    failures = []
    for row in list(transfers):
        transfer_id = row.get("id")
        if not transfer_id:
            continue
        _, err = api_call.retry_closure_transfer_API(self.ctx, int(transfer_id))
        if not err:
            sent += 1
            continue
        reception = row.get("reception_name") or row.get("reception_id") or transfer_id
        failures.append(f"{reception}: {err}")

    summary = [f"Odoslané: {sent}/{len(transfers)}"]
    if failures:
        summary.append("")
        summary.extend(failures[:5])
        if len(failures) > 5:
            summary.append(f"... ďalších chýb: {len(failures) - 5}")
    self._popup_info(popup_title, "\n".join(summary))
|
||
|
||
def _closure_sort_key(self, c):
    """Return a sort key for a closure record (used with ``reverse=True``).

    The first truthy attribute among ``closed_at_do``, ``closed_at_od`` and
    ``clsrep_no`` decides the key.  The key is always a tuple
    ``(rank, datetime, text)`` so that any two keys are comparable:

    * value parsed as a date -> ``(1, parsed, "")``
    * value not parseable    -> ``(0, datetime.min, str(value))``
    * no value at all        -> ``(0, datetime.min, "")``

    Returning a bare ``datetime`` for some records and a ``str`` for others
    made ``sorted`` raise ``TypeError`` as soon as both kinds were present.
    With a descending sort, dated records come first (newest first),
    followed by the text-keyed ones.
    """
    for attr in ("closed_at_do", "closed_at_od", "clsrep_no"):
        value = getattr(c, attr, None)
        if not value:
            continue
        dt = self._safe_parse_datetime(value)
        if dt:
            return (1, dt, "")
        return (0, datetime.min, str(value))
    return (0, datetime.min, "")
|
||
|
||
def _safe_parse_datetime(self, value):
    """Parse ``value`` into a ``datetime`` or return ``None``.

    ``datetime`` instances are returned unchanged and falsy values give
    ``None``.  Anything else is converted to a stripped string and tried
    against the supported formats in order; the first match wins.
    """
    if isinstance(value, datetime):
        return value
    if not value:
        return None

    text = str(value).strip()
    supported_formats = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%d.%m.%Y %H:%M:%S",
        "%d.%m.%Y %H:%M",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
    )
    for pattern in supported_formats:
        try:
            parsed = datetime.strptime(text, pattern)
        except Exception:
            continue
        return parsed
    return None
|
||
# ------------------------------------------------------------------------
|
||
def handle_closedaccountselect(self, operation: str, ucislo: str):
    """Run the operation chosen for a closed account.

    ``operation`` is the (Czech) text of the chosen action and ``ucislo``
    the account number it applies to.  Unknown operations are only logged.
    """
    Logger.info(f"CTRL: handle_closedaccountselect op={operation} ucislo={ucislo}")

    # Mapping of operation text -> action.
    if operation == "Tisk kopie":
        try:
            ucet = api_call.load_ucet_by_ucislo_API(self.ctx, ucislo)
            try:
                self._print_receipt_copy(ucet)
            finally:
                # Unblock even when printing fails, so the account is not
                # left blocked (presumably the load above blocked it —
                # confirm against the API).
                api_call.unblock_ucet_by_ucislo_API(self.ctx, ucislo)
            self.close_closed_select()
            return
        except Exception as e:
            print("EXCEPTION:", e)
            traceback.print_exc()
            _popup_info("Chyba", f"Nelze vytisknout kopii účtu {ucislo} ze serveru\n{e}")
            return

    if operation in ("Storno účtu", "Změna druhu platby"):
        # handle_storno_or_payment_change() takes only (ucislo, operation);
        # the former extra ``kasutxt=`` keyword raised TypeError.
        self.handle_storno_or_payment_change(ucislo, operation)
        return

    if operation == "Storno položek":
        _popup_info("Chyba", "Operace neimplementovana")
        return

    if operation == "Storno, vrácení na stůl":
        try:
            ucet = api_call.load_ucet_by_ucislo_API(self.ctx, ucislo)
            self.closed_receipt_storno_return_to_table(ucet)
        except Exception as e:
            print("EXCEPTION:", e)
            traceback.print_exc()
            _popup_info("Chyba", f"Nelze vratit ucet {ucislo} na stol\n{e}")
        return

    if operation in ("Uzávěrka", "Meziuzávěrka"):
        save_flag = (operation == "Uzávěrka")
        # Report from the previous closure up to the given account.
        report, err = api_call.clsrep_API(
            self.ctx, None, ucislo, save=False)
        if err:
            _popup_info("Uzávěrka", err)
            return
        if save_flag:
            if self._closure_needs_carry_dialog(report):
                self._show_closure_carry_dialog(report, ucislo)
            else:
                self._finish_closure_save(ucislo)
            return
        self._show_clsrep_preview(
            report.model_dump(),
            kind="closure_preview",
            title="Medziuzavierka",
        )
        return

    Logger.warning(f"CTRL: unknown closed operation: {operation}")
|
||
# ------------------------------------------------------------------------
|
||
def close_closed_select(self):
    """Remove the closed-account screens (if present) and show the account screen."""
    sm = self.app.sm
    for screen_name in ("closed_select", "closed_receipts"):
        if sm.has_screen(screen_name):
            sm.remove_widget(sm.get_screen(screen_name))
    sm.current = SCREEN_ACCOUNT
|
||
# ------------------------------------------------------------------------
|
||
def handle_storno_or_payment_change(self, ucislo: int | str, operation: str) -> None:
    """Load a closed account by ``ucislo`` and run the requested operation.

    "Storno účtu" runs the storno flow; "Změna druhu platby" opens the
    payment dialog and, once it finishes, performs (new account with the
    new payments) + (storno) + (update of the original account's storno
    link).  Accounts that were already cancelled, or that are themselves a
    storno, are rejected with a popup.
    """
    try:
        ucet = api_call.load_ucet_by_ucislo_API(self.ctx, ucislo)
    except Exception as e:
        print("EXCEPTION:", e)
        traceback.print_exc()
        _popup_info("Chyba", f"Nelze stornovat účet nebo změnit druh platby {ucislo}\n{e}")
        return

    # A cancelled account, and a storno account itself, cannot be cancelled.
    if ucet.storno:
        _popup_info("Chyba",f"Ucet {ucet.ucislo}, castka {ucet.total_base_currency} jiz byl stornovan\n uctem {ucet.storno}")
        return
    if ucet.is_storno:
        _popup_info("Chyba",f"Tento ucet {ucet.ucislo}, castka {ucet.total_base_currency} je storno\n uctu {ucet.is_storno}")
        return

    ucet_puvodni = ucet.model_copy(deep=True)
    try:
        self._validate_terminal_storno_settings(ucet_puvodni, origin="Storno")
    except Exception as e:
        _popup_info("Chyba", str(e))
        return

    if operation == "Storno účtu":
        Logger.info(f"storno uctu {ucet_puvodni.ucislo}")
        self._do_storno_flow(ucet_puvodni, reason_origin="Storno")
    elif operation == "Změna druhu platby":
        Logger.info(f"zmena druhu platby {ucet_puvodni.ucislo}")
        self._do_payment_change_flow(ucet_puvodni)
    else:
        _popup_info("Chyba", f"Neznámá operace: {operation}")
|
||
# ------------------------------------------------------------------------
|
||
def _do_payment_change_flow(self, ucet_puvodni):
    """Change the payment type of a closed account.

    1) prepare a copy of the account as a 'new account' without payments,
    2) open ``PaymentDialog``,
    3) when the dialog is done, hand over to ``_finalize_payment_change``,
       which stores the new account, creates the storno account and
       updates the original one.
    """
    ucet_novy = ucet_puvodni.model_copy(deep=True)
    self._restore_ucet_default_prices(ucet_novy)
    ucet_novy.platby = []
    ucet_novy.storno = ""
    ucet_novy.is_storno = f"Z{ucet_puvodni.ucislo}"  # number of the account being changed
    ucet_novy.ucislo = ""  # the server assigns a new one
    ucet_novy.origin = "Zmena_Platby"
    ucet_novy.blocked_by = ""  # when sent by the client; ideally handled by the server

    def _payment_confirmed(*args, **kwargs):
        # The account with the chosen payments must arrive here.  Depending
        # on how PaymentDialog invokes the callback it is taken from
        # kwargs['ucet'], else from args[0], else ucet_novy is used (the
        # dialog may have modified it by reference).
        try:
            chosen = kwargs.get("ucet") or (args[0] if args else None) or ucet_novy

            # Safety net: the user entered no payment at all.
            if not getattr(chosen, "platby", None):
                _popup_info("Zrušeno", "Nebyla vybrána žádná platba – operace zrušena.")
                return
            self._finalize_payment_change(ucet_puvodni, chosen)
        except Exception as e:
            print("EXCEPTION:", e)
            traceback.print_exc()
            _popup_info("Chyba", f"Změna druhu platby selhala\n{e}")

    def _payment_cancelled(*_):
        _popup_info("Zrušeno", "Změna druhu platby zrušena.")

    dialog = payment.PaymentDialog(
        ucet=ucet_novy,
        payment_types=self._closed_payment_types(),
        setup=self.setup,
        on_done=_payment_confirmed,
        on_cancel=_payment_cancelled,
        discounts=self._closed_discounts(),
        discount_permissions=self._closed_discount_permissions(),
        discounts_all_allowed=bool(getattr(self.user_login, "is_admin", False)),
        printers=self._printers,
        bankterms=self._bankterms,
        default_printer=self.default_printer,
        on_printer_change=lambda prn: setattr(self, "default_printer", prn),
        handler_runner=self._closed_payment_handler_runner,
        cenik_map=self.pos_static_maps.get("cenik_map", {}),
        kasutxt=self.kasutxt,
        controller=self
    )
    dialog.open()
|
||
# ------------------------------------------------------------------------
|
||
def _finalize_payment_change(self, ucet_puvodni, ucet_novy):
    """Finish a payment change once the payment dialog is done.

    Saves the new account with the new payments, creates and saves the
    storno account, then updates the original account (``storno`` link,
    cleared ``blocked_by``, origin ``StorPaymChg`` where applicable).

    Raises ``RuntimeError`` when saving the new account returns no number.
    """
    ctx = self.ctx

    # Save the new account (with the payments from the dialog).
    ucet_novy.sumdph()
    ucet_novy.origin = "Zmena_Platby"
    saved_new = api_call.save_ucet_API(ctx, ucet_novy)
    if not (saved_new and getattr(saved_new, "ucislo", None)):
        raise RuntimeError("Uložení nového účtu pro změnu platby selhalo (bez ucisla).")
    self._enqueue_receipt_print_jobs(saved_new, kind="payment_change", title="Zmena druhu platby")

    # Create and save the storno account.
    saved_storno = self._create_and_save_storno(ucet_puvodni, ctx)

    # Update the original account.
    original_update = ucet_puvodni.model_copy(deep=True)
    original_update.storno = saved_storno.ucislo
    original_update.blocked_by = ""
    # Mark an original that itself came from a payment change.
    if original_update.origin == "Zmena_Platby":
        original_update.origin = "StorPaymChg"
    saved_update = api_call.save_ucet_API(ctx, original_update)
    if not (saved_update and getattr(saved_update, "ucislo", None)):
        # Reported only; the flow still returns to the receipts list.
        _popup_info("Chyba", f"Update původního účtu {ucet_puvodni.ucislo} nedokončen.")

    if not self._return_to_closed_receipts():
        self.close_closed_select()
|
||
# ------------------------------------------------------------------------
|
||
def _sum_items(self, ucet):
    """Merge identical items of ``ucet.poloz`` in place by summing ``pocet``.

    Items are identical when card id, name, price, VAT, currency, price
    level, divisor, stock, guest, course and (sorted) messages match; for
    items with ``typ_menu != 0`` the ``group_id`` has to match as well.
    The first item of each group is kept (by reference) and receives the
    summed quantity.  Nothing happens for a missing account or empty list.
    """
    if not ucet or not ucet.poloz:
        return

    grouped = {}
    for item in ucet.poloz:
        # Messages become a sorted tuple: hashable and order-independent.
        signature = (
            item.id_card,
            item.nazev,
            item.cena,
            item.dph,
            item.mena,
            item.cenhlad,
            item.delitel,
            item.sklad,
            item.guest_id,
            item.course_id,
            tuple(sorted(item.zpravy or [])),
        )
        if item.typ_menu != 0:
            signature += (item.group_id,)

        if signature in grouped:
            grouped[signature].pocet += item.pocet
        else:
            # Do not copy - keep the reference to the first occurrence.
            grouped[signature] = item

    ucet.poloz = list(grouped.values())
|
||
|
||
# ------------------------------------------------------------------------
|
||
def _do_storno_flow(self, ucet_puvodni, reason_origin: str = "Storno") -> None:
    """Cancel a closed account.

    Creates and saves the storno account, then updates the original account
    (``storno`` = number of the storno account, ``blocked_by`` cleared).
    For accounts with a ``limit_id`` the limit is cleared as well; a
    failure there is only reported.  Finally returns to the closed-receipts
    screen, or closes the closed-account screens when it is not available.
    """
    ctx = self.ctx
    try:
        saved_storno = self._create_and_save_storno(ucet_puvodni, ctx, origin=reason_origin)
    except Exception as e:
        traceback.print_exc()
        _popup_info("Chyba", f"Storno operace účtu {ucet_puvodni.ucislo} zlyhala\n{e}")
        return

    original_update = ucet_puvodni.model_copy(deep=True)
    original_update.storno = saved_storno.ucislo
    original_update.blocked_by = ""
    if original_update.origin == "Zmena_Platby":
        original_update.origin = "StorPaymChg"

    saved_update = api_call.save_ucet_API(ctx, original_update)
    if not (saved_update and getattr(saved_update, "ucislo", None)):
        # Reported only; the rest of the flow still runs.
        _popup_info("Chyba", f"Storno operace účtu {ucet_puvodni.ucislo} nedokončena")

    if getattr(ucet_puvodni, "limit_id", None):
        try:
            api_call.clear_limit_ucet_API(ctx, ucet_puvodni)
        except Exception as e:
            Logger.exception("CTRL: limit clear after storno failed")
            _popup_info(
                "Limity",
                "Storno uctu prebehlo, ale limit sa nepodarilo odznacit v PostgreSQL.\n"
                f"{e}",
            )

    if not self._return_to_closed_receipts():
        self.close_closed_select()
|
||
# ------------------------------------------------------------------------
|
||
def _create_and_save_storno(
    self,
    ucet_puvodni,
    apictx,
    print_docs: bool = True,
    origin: str = "Storno",
    fiscal_result: dict | None = None,
):
    """Build the storno counterpart of ``ucet_puvodni``, save it and print it.

    The storno account is a deep copy with negated total, rounding, tax
    bases, discount and item quantities, storno payments and fresh
    timestamps.  Returns the response of ``save_ucet_API`` (replaced by the
    result of the receipt printing when that returns something).

    Raises ``RuntimeError`` when the save returns no account number.
    """
    storno = ucet_puvodni.model_copy(deep=True)
    storno.autor = self.user_login.name
    storno.is_storno = ucet_puvodni.ucislo
    storno.origin = origin or "Storno"
    if fiscal_result:
        storno.fiscal_result = dict(fiscal_result)
    self._ensure_storno_bill_printer(storno, ucet_puvodni)

    # New storno account: no number yet, not blocked, fresh timestamps.
    storno.ucislo = ""
    storno.blocked_by = ""
    storno.open_at = data.now_clk_str()
    storno.closed_at = data.stime_str()
    storno.datetime = data.stime_str()

    storno.total_base_currency = -storno.total_base_currency
    storno.round50 = -float(getattr(storno, "round50", 0) or 0)

    # Tax bases become negative.
    for tax_row in getattr(storno, "dane", []) or []:
        tax_row.zaklad = -tax_row.zaklad

    # The discount becomes negative.
    if getattr(storno, "discount_abs", 0):
        storno.discount_abs = -storno.discount_abs

    storno.platby = self._build_storno_payments(ucet_puvodni, origin=storno.origin)

    # Item quantities become negative and kstornu is reset.
    for item in getattr(storno, "poloz", []) or []:
        item.pocet = -item.pocet
        item.kstornu = 0

    resp = api_call.save_ucet_API(apictx, storno)
    if not (resp and getattr(resp, "ucislo", None)):
        raise RuntimeError(f"Uložení storno účtu k {ucet_puvodni.ucislo} selhalo.")

    if print_docs:
        printed_resp = self._print_storno_receipt_documents(resp, kind="storno", title="Storno")
        if printed_resp:
            resp = printed_resp
    # NOTE(review): the source this was recovered from had lost its
    # indentation; the kitchen print is assumed to run regardless of
    # ``print_docs`` — confirm against the original file.
    self._print_closed_storno_kitchen(resp)
    return resp
|
||
# ------------------------------------------------------------------------
|
||
def _closed_payment_types(self):
    """Return the payment types offered for closed-account operations.

    Uses ``self._payments`` when it holds anything, otherwise falls back to
    ``self.setup.platby`` (when a setup is present).  Always a new list.
    """
    own_payments = list(getattr(self, "_payments", []) or [])
    if own_payments or not self.setup:
        return own_payments
    return list(getattr(self.setup, "platby", []) or [])
|
||
|
||
def _closed_payment_handler_runner(self, ptype=None, ucet=None, dialog=None, *args, **kwargs):
    """Run the payment handler for a closed-account payment dialog.

    The handler host is created on first use and refreshed from this
    controller on later calls.  Extra positional and keyword arguments are
    accepted and ignored.

    The former ``kwargs.get("ptype"/"ucet"/"dialog")`` fallbacks were
    removed: a keyword with one of those names always binds to the named
    parameter, so ``kwargs`` can never contain it.
    """
    if self._closed_payment_handler_host is None:
        self._closed_payment_handler_host = ClosedPaymentHandlerHost(self)
    else:
        self._closed_payment_handler_host.update(self)
    return self._closed_payment_handler_host._run_payment_handler(ptype, ucet, dialog)
|
||
|
||
def _return_to_closed_receipts(self) -> bool:
    """Switch to the ``closed_receipts`` screen when it exists.

    Returns True after switching (a ``refresh`` of the screen is scheduled
    when the screen offers one), False when there is no screen manager or
    no such screen.
    """
    sm = self.app.sm
    if sm and sm.has_screen("closed_receipts"):
        receipts_screen = sm.get_screen("closed_receipts")
        sm.current = "closed_receipts"
        if hasattr(receipts_screen, "refresh"):
            Clock.schedule_once(lambda *_: receipts_screen.refresh(), 0)
        return True
    return False
|
||
|
||
def _closed_discounts(self):
    """Return a new list with the entries of ``self.zlavy.zlavy`` (empty when missing)."""
    discount_holder = getattr(self, "zlavy", None)
    entries = getattr(discount_holder, "zlavy", [])
    return list(entries or [])
|
||
|
||
def _closed_discount_permissions(self):
    """Return the discount permissions passed to the payment dialog.

    An admin user gets an empty list; everyone else gets a copy of
    ``self._discounts`` (empty when missing).
    """
    user = getattr(self, "user_login", None)
    if getattr(user, "is_admin", False):
        return []
    return list(getattr(self, "_discounts", []) or [])
|
||
|
||
def _restore_ucet_default_prices(self, ucet):
    """Reset item prices to their defaults and drop all discounts.

    For every item, ``def_cena`` (when not None) is written to ``cena`` and
    ``cena_puv``; truthy ``def_dph`` / ``def_hlad`` overwrite ``dph`` /
    ``cenhlad``.  The account's discount fields are cleared and ``sumdph``
    is called (a failure there is only logged).  Returns ``ucet``.
    """
    for item in getattr(ucet, "poloz", []) or []:
        default_price = getattr(item, "def_cena", None)
        if default_price is not None:
            restored_price = float(default_price or 0)
            item.cena = restored_price
            item.cena_puv = restored_price

        default_vat = getattr(item, "def_dph", None)
        if default_vat:
            item.dph = default_vat

        default_level = getattr(item, "def_hlad", None)
        if default_level:
            item.cenhlad = default_level

    ucet.discount_abs = 0
    ucet.discount_id = None
    ucet.discount_name = ""
    ucet.discounts_applied = []
    ucet.discounts_prorated = False

    try:
        ucet.sumdph()
    except Exception:
        Logger.exception("CTRL: default price restore DPH failed")
    return ucet
|
||
|
||
def load_closed_ucet_detail(self, ucislo: str) -> data.Ucet:
    """Return the account loaded from the API by its number ``ucislo``."""
    loaded = api_call.load_ucet_by_ucislo_API(self.ctx, ucislo)
    return loaded
|
||
|
||
def closed_receipt_print_copy(self, ucet: data.Ucet):
    """Print a copy of a closed receipt and unblock the account afterwards.

    Any failure is printed to the console and reported in a popup.
    """
    try:
        try:
            self._print_receipt_copy(ucet)
        finally:
            # Unblock even when printing fails, so the account is not left
            # blocked (presumably loading the detail blocked it — confirm
            # against the API).
            api_call.unblock_ucet_by_ucislo_API(self.ctx, ucet.ucislo)
    except Exception as e:
        traceback.print_exc()
        self._popup_info("Chyba", f"Kopiu uctu {ucet.ucislo} sa nepodarilo vytlacit:\n{e}")
|
||
|
||
def closed_receipt_storno_full(self, ucet: data.Ucet):
    """Cancel a whole closed receipt.

    When any item carries a ``kstornu`` value, the receipt is cancelled
    through ``closed_receipt_storno_items`` with, for every item that has a
    ``line_id``, the quantity still available for storno.  Otherwise the
    regular account storno is run.
    """
    items = getattr(ucet, "poloz", []) or []
    if any(getattr(pol, "kstornu", None) is not None for pol in items):
        quantities = {}
        for pol in items:
            line_id = str(getattr(pol, "line_id", "") or "")
            if not line_id:
                continue
            # Ask for the available quantity once per line (it used to be
            # evaluated twice: in the filter and again for the value).
            available = self._closed_line_storno_available(pol)
            if available > 0:
                quantities[line_id] = available
        if not quantities:
            self._popup_info("Storno", "Na tomto ucte uz nie je ziadne mnozstvo na storno.")
            return
        self.closed_receipt_storno_items(ucet, quantities)
        self._return_to_closed_receipts()
        return
    self.handle_storno_or_payment_change(ucet.ucislo, "Storno účtu")
|
||
|
||
def closed_receipt_change_payment(self, ucet: data.Ucet):
    """Start the payment-type change for the given closed receipt."""
    account_number = ucet.ucislo
    self.handle_storno_or_payment_change(account_number, "Změna druhu platby")
|
||
|
||
def closed_receipt_edit_tip(self, ucet: data.Ucet):
    """Let the user edit the TIP of a closed receipt.

    Storno receipts and receipts without payments are rejected with a
    popup.  With a single payment the amount prompt opens directly; with
    several payments a popup with one button per payment is shown first.
    """
    if getattr(ucet, "is_storno", None):
        self._popup_info("TIP", "Na storno ucte nie je mozne menit TIP.")
        return

    payments = list(getattr(ucet, "platby", []) or [])
    if not payments:
        self._popup_info("TIP", "Ucet nema ziadnu platbu.")
        return
    if len(payments) == 1:
        self._ask_closed_tip_amount(ucet, 0)
        return

    popup = Popup(title="Vyber platbu pre TIP", size_hint=(None, None), size=(dp(520), dp(420)))
    box = BoxLayout(orientation="vertical", spacing=dp(6), padding=dp(8))
    scroll = ScrollView(do_scroll_y=True)
    rows = BoxLayout(orientation="vertical", spacing=dp(4), size_hint_y=None)
    # The row container grows with its children so the ScrollView can scroll.
    rows.bind(minimum_height=rows.setter("height"))

    def _fit_text(inst, *_):
        # Keep the label text wrapped to the button width minus padding.
        return setattr(inst, "text_size", (inst.width - dp(16), None))

    for idx, pay in enumerate(payments):
        caption = (
            f"{getattr(pay, 'nazev', '') or getattr(pay, 'code', '')} "
            f"{float(getattr(pay, 'suma_czk', 0) or getattr(pay, 'suma', 0) or 0):.2f} "
            f"TIP {float(getattr(pay, 'tip', 0) or 0):.2f}"
        )
        choice = Button(text=caption, size_hint_y=None, height=dp(58), halign="left", valign="middle")
        choice.bind(size=_fit_text)
        # ``i=idx`` freezes the index for this button's callback.
        choice.bind(on_press=lambda _btn, i=idx: self._select_closed_tip_payment(popup, ucet, i))
        rows.add_widget(choice)

    scroll.add_widget(rows)
    box.add_widget(scroll)

    cancel_button = Button(text="Zrusit", size_hint_y=None, height=dp(48))
    cancel_button.bind(on_press=lambda *_: popup.dismiss())
    box.add_widget(cancel_button)

    popup.content = box
    popup.open()
|
||
|
||
def _select_closed_tip_payment(self, popup, ucet: data.Ucet, payment_index: int):
    """Dismiss the payment picker, then ask for the TIP of the chosen payment."""
    picker = popup
    picker.dismiss()
    self._ask_closed_tip_amount(ucet, payment_index)
|
||
|
||
def _ask_closed_tip_amount(self, ucet: data.Ucet, payment_index: int):
    """Open a popup asking for the new TIP of one payment.

    Does nothing for an index outside the payment list.  The input is
    pre-filled with the current TIP; OK passes the entered text to
    ``_confirm_closed_tip``.
    """
    payments = list(getattr(ucet, "platby", []) or [])
    if not 0 <= payment_index < len(payments):
        return

    pay = payments[payment_index]
    old_tip = float(getattr(pay, "tip", 0) or 0)

    popup = Popup(title="Zadanie TIPu", size_hint=(None, None), size=(dp(420), dp(260)))
    box = BoxLayout(orientation="vertical", spacing=dp(8), padding=dp(10))
    header = Label(
        text=f"{getattr(pay, 'nazev', '') or getattr(pay, 'code', '')}\nPovodny TIP: {old_tip:.2f}",
        size_hint_y=None,
        height=dp(58),
    )
    box.add_widget(header)

    inp = TextInput(text=f"{old_tip:.2f}", multiline=False, size_hint_y=None, height=dp(48))
    box.add_widget(inp)

    def _cancel(*_):
        return popup.dismiss()

    def _submit(*_):
        return self._confirm_closed_tip(popup, ucet, payment_index, inp.text)

    buttons = BoxLayout(size_hint_y=None, height=dp(50), spacing=dp(8))
    btn_cancel = Button(text="Zrusit")
    btn_ok = Button(text="OK", background_color=(0.2, 0.6, 0.2, 1))
    btn_cancel.bind(on_press=_cancel)
    btn_ok.bind(on_press=_submit)
    buttons.add_widget(btn_cancel)
    buttons.add_widget(btn_ok)
    box.add_widget(buttons)

    popup.content = box
    popup.open()
    # Focus the input shortly after the popup has been opened.
    Clock.schedule_once(lambda *_: setattr(inp, "focus", True), 0.1)
|
||
|
||
def _confirm_closed_tip(self, popup, ucet: data.Ucet, payment_index: int, value: str):
    """Validate the entered TIP, save it and return to the receipts list.

    ``value`` may use a decimal comma; an empty value counts as 0.  Values
    that are not numbers, not finite, or negative are rejected with a
    popup and the input popup stays open.  A failure while saving is
    printed to the console and reported in a popup.
    """
    import math

    try:
        entered = float(str(value or "0").replace(",", "."))
    except Exception:
        self._popup_info("TIP", "Neplatna hodnota TIPu.")
        return
    # float() accepts "nan" and "inf"; neither is a usable amount and both
    # would slip through the ``< 0`` check below.
    if not math.isfinite(entered):
        self._popup_info("TIP", "Neplatna hodnota TIPu.")
        return

    new_tip = round(entered, 2)
    if new_tip < 0:
        self._popup_info("TIP", "TIP nemoze byt zaporny.")
        return

    try:
        self._save_closed_tip(ucet, payment_index, new_tip)
        popup.dismiss()
        self._return_to_closed_receipts()
    except Exception as e:
        traceback.print_exc()
        self._popup_info("TIP", f"TIP sa nepodarilo ulozit:\n{e}")
|
||
|
||
def _save_closed_tip(self, ucet: data.Ucet, payment_index: int, new_tip: float):
    """Store a new TIP on one payment of a closed account.

    Works on a deep copy of ``ucet``.  Nothing happens when the TIP changes
    by less than 0.005.  When the payment (or the account) has a
    ``hotel_charge`` target, the difference is sent there first; then the
    copy is saved with the new TIP and a cleared ``blocked_by``.
    """
    updated = ucet.model_copy(deep=True)
    payment_row = updated.platby[payment_index]
    previous_tip = float(getattr(payment_row, "tip", 0) or 0)
    delta = round(float(new_tip) - previous_tip, 2)
    if abs(delta) < 0.005:
        return

    hotel_target = getattr(payment_row, "hotel_charge", None)
    if not hotel_target:
        hotel_target = getattr(updated, "hotel_charge", None)
    if hotel_target:
        self._send_closed_tip_hotel_delta(updated, payment_row, delta, hotel_target)

    payment_row.tip = float(new_tip)
    updated.blocked_by = ""
    api_call.save_ucet_API(self.ctx, updated)
|
||
|
||
def _send_closed_tip_hotel_delta(self, ucet: data.Ucet, payment_row: data.Platba, diff: float, hotel_target):
    """Send a TIP difference as a hotel charge.

    Builds a copy of the account without items, holding a single copy of
    the payment with zero amounts and ``tip = diff`` aimed at
    ``hotel_target``, and sends it through ``send_hotel_charge_API``.

    Raises ``RuntimeError`` when the result's ``ok`` is not truthy.
    """
    tip_payment = payment_row.model_copy(deep=True)
    tip_payment.suma = 0
    tip_payment.suma_czk = 0
    tip_payment.tip = float(diff)
    tip_payment.hotel_charge = hotel_target

    tip_ucet = ucet.model_copy(deep=True)
    tip_ucet.poloz = []
    tip_ucet.platby = [tip_payment]
    # Account-level hotel charge data is cleared on the copy.
    for cleared_field in ("hotel_charge", "hotel_charge_preparation", "hotel_charge_send_result"):
        setattr(tip_ucet, cleared_field, None)
    tip_ucet.closed_at = data.stime_str()
    tip_ucet.datetime = data.stime_str()
    tip_ucet.origin = "TIP"

    result = api_call.send_hotel_charge_API(self.ctx, tip_ucet)
    if getattr(result, "ok", False):
        return
    raise RuntimeError(getattr(result, "message", "") or "TIP sa nepodarilo odoslat na recepciu.")
|
||
|
||
def closed_receipt_storno_return_to_table(self, ucet_puvodni: data.Ucet):
    """Start a storno that returns the receipt's items to a table.

    Rejected with a popup for limit accounts, for accounts that were
    already cancelled and for storno accounts; otherwise the target-table
    selection is opened.
    """
    if getattr(ucet_puvodni, "limit_id", None):
        self._popup_info("Storno", "Limitovy ucet nie je mozne stornovat s vratenim na stol.")
    elif getattr(ucet_puvodni, "storno", None):
        self._popup_info("Chyba", f"Ucet {ucet_puvodni.ucislo} uz bol stornovany.")
    elif getattr(ucet_puvodni, "is_storno", None):
        self._popup_info("Chyba", "Storno ucet nie je mozne vratit na stol.")
    else:
        self._open_storno_return_table_select(ucet_puvodni)
|
||
|
||
def _open_storno_return_table_select(self, ucet_puvodni: data.Ucet):
    """Show the table-selection screen for a storno with return to a table.

    A deep copy of the account is remembered in ``_storno_return_ucet`` for
    the selection callback.  A previous ``account_storno_return`` screen is
    replaced by the new one.
    """
    screen_name = "account_storno_return"
    self._storno_return_ucet = ucet_puvodni.model_copy(deep=True)

    select_screen = accountselect.AccountSelectScreen(
        name=screen_name,
        controller=self,
        get_ucty=lambda: self.load_stoly(closed=False),
        on_select=self._on_storno_return_table_selected,
        on_cancel=self._on_storno_return_cancelled,
        on_logout=self.logout_user,
        on_info=None,
        mode="split",
        only_opened=False,
        allow_manual=True,
    )

    sm = self.app.sm
    if sm.has_screen(screen_name):
        stale_screen = sm.get_screen(screen_name)
        sm.remove_widget(stale_screen)
    sm.add_widget(select_screen)
    sm.current = screen_name
|
||
|
||
def _close_storno_return_table_select(self):
    """Leave the storno-return table selection and remove its screen.

    Switches to the first existing screen among ``closed_receipts``,
    ``closed_select`` and the account screen; when none exists the current
    screen is left as it is.
    """
    sm = self.app.sm
    if sm.has_screen("closed_receipts"):
        sm.current = "closed_receipts"
    elif sm.has_screen("closed_select"):
        sm.current = "closed_select"
    elif sm.has_screen(SCREEN_ACCOUNT):
        sm.current = SCREEN_ACCOUNT

    selection_name = "account_storno_return"
    if sm.has_screen(selection_name):
        selection_screen = sm.get_screen(selection_name)
        sm.remove_widget(selection_screen)
|
||
|
||
def _on_storno_return_cancelled(self):
    """Forget the pending storno-return account and close the table selection."""
    setattr(self, "_storno_return_ucet", None)
    self._close_storno_return_table_select()
|
||
|
||
def _on_storno_return_table_selected(self, target_stul: str):
    """Cancel the remembered receipt and put its items on ``target_stul``.

    Steps: create and save the storno account, merge a reopened copy of
    the original (default prices, no payments/taxes, origin
    ``Vratene_Storno``) into the target table, then link the original to
    its storno.  Afterwards the selection screen is closed.  Any failure is
    printed to the console and reported in a popup.
    """
    ucet_puvodni = getattr(self, "_storno_return_ucet", None)
    if not ucet_puvodni:
        self._popup_info("Chyba", "Nie je nacitany ucet pre vratenie na stol.")
        self._close_storno_return_table_select()
        return

    target_stul = str(target_stul or "").strip()
    if not target_stul:
        self._popup_info("Chyba", "Nie je vybrany cielovy stol.")
        return

    try:
        saved_storno = self._create_and_save_storno(ucet_puvodni, self.ctx)

        restored = ucet_puvodni.model_copy(deep=True)
        self._restore_ucet_default_prices(restored)
        # Turn the copy back into an open account on the target table.
        reopened_fields = {
            "ucislo": None,
            "closed_at": None,
            "datetime": "",
            "open_at": data.now_clk_str(),
            "blocked_by": "",
            "platby": [],
            "dane": [],
            "storno": None,
            "is_storno": None,
            "origin": "Vratene_Storno",
            "stul": target_stul,
            "bill_printer": "",
        }
        for field_name, field_value in reopened_fields.items():
            setattr(restored, field_name, field_value)
        restored.sumdph()
        api_call.merge_ucet_API(self.ctx, restored, target_stul)

        original_update = ucet_puvodni.model_copy(deep=True)
        original_update.storno = saved_storno.ucislo
        original_update.blocked_by = ""
        api_call.save_ucet_API(self.ctx, original_update)

        self._storno_return_ucet = None
        returned_to_closed = self._return_to_closed_receipts()
        self._close_storno_return_table_select()
        if not returned_to_closed:
            self.close_closed_select()
    except Exception as e:
        traceback.print_exc()
        self._popup_info("Chyba", f"Ucet sa nepodarilo vratit na stol:\n{e}")
|
||
|
||
def closed_receipt_storno_items(self, ucet_puvodni: data.Ucet, quantities: dict[str, float]):
    """Create a partial storno for selected lines of a closed receipt.

    Rejected up front (with a popup): limit receipts, receipts that already
    carry a ``storno`` reference, and receipts that are storno documents
    themselves. The requested quantities are validated against what
    ``_closed_line_storno_available`` reports for each line.

    On success a storno document with negative quantities is saved, its
    receipt/kitchen documents are printed, and the original receipt is saved
    again with each affected line's ``kstornu`` reduced.

    Args:
        ucet_puvodni: The closed receipt the items come from.
        quantities: Mapping ``line_id -> quantity``; entries with an empty
            id or a non-positive quantity are ignored.
    """
    if getattr(ucet_puvodni, "limit_id", None):
        self._popup_info("Storno poloziek", "Pri limitovom ucte je povolene iba storno celeho uctu.")
        return
    if getattr(ucet_puvodni, "storno", None):
        self._popup_info("Chyba", f"Ucet {ucet_puvodni.ucislo} uz bol stornovany.")
        return
    if getattr(ucet_puvodni, "is_storno", None):
        self._popup_info("Chyba", "Zo storno uctu nie je mozne robit storno poloziek.")
        return

    # Normalise the request: string keys, float quantities, positives only.
    wanted = {}
    for raw_id, raw_qty in (quantities or {}).items():
        if not str(raw_id):
            continue
        if float(raw_qty or 0) > 0:
            wanted[str(raw_id)] = float(raw_qty or 0)

    picked = []
    for line in (getattr(ucet_puvodni, "poloz", []) or []):
        requested = wanted.get(str(getattr(line, "line_id", "") or ""), 0)
        if requested <= 0:
            continue
        stornable = self._closed_line_storno_available(line)
        # Small tolerance so float noise does not reject an exact request.
        if requested > stornable + 0.0001:
            self._popup_info("Storno poloziek", f"Polozka {getattr(line, 'nazev', '')} nema dostatocne mnozstvo na storno.")
            return
        picked.append((line, requested, stornable))

    if not picked:
        self._popup_info("Storno poloziek", "Nie je vybrana ziadna polozka.")
        return

    try:
        self._validate_terminal_storno_settings(ucet_puvodni, origin="Storno_polozek")

        # Build the storno document from a copy of the original receipt.
        storno = ucet_puvodni.model_copy(deep=True)
        storno.poloz = []
        for line, requested, _stornable in picked:
            negated = line.model_copy(deep=True)
            negated.pocet = -abs(float(requested or 0))
            negated.kstornu = 0
            storno.poloz.append(negated)

        storno.autor = self.user_login.name
        storno.is_storno = ucet_puvodni.ucislo
        storno.origin = "Storno_polozek"
        storno.ucislo = ""
        storno.blocked_by = ""
        storno.open_at = data.now_clk_str()
        storno.closed_at = data.stime_str()
        storno.datetime = data.stime_str()
        storno.discount_abs = 0
        storno.discount_id = None
        storno.discount_name = ""
        storno.discounts_applied = []
        storno.discounts_prorated = True
        self._ensure_storno_bill_printer(storno, ucet_puvodni)
        storno.sumdph()
        self._set_partial_storno_payments(storno, ucet_puvodni)

        saved = api_call.save_ucet_API(self.ctx, storno)
        saved = self._print_storno_receipt_documents(saved, kind="storno_items", title="Storno poloziek")
        self._print_closed_storno_kitchen(saved)

        # Record on the original receipt how much is still stornable.
        updated = ucet_puvodni.model_copy(deep=True)
        for line in getattr(updated, "poloz", []) or []:
            key = str(getattr(line, "line_id", "") or "")
            if key in wanted:
                stornable = self._closed_line_storno_available(line)
                line.kstornu = max(round(stornable - wanted[key], 4), 0)
        updated.blocked_by = ""
        api_call.save_ucet_API(self.ctx, updated)
    except Exception as e:
        traceback.print_exc()
        self._popup_info("Chyba", f"Storno poloziek sa nepodarilo ulozit:\n{e}")
|
||
|
||
def _closed_line_storno_available(self, pol) -> float:
|
||
units = abs(float(getattr(pol, "pocet", 0) or 0))
|
||
raw = getattr(pol, "kstornu", None)
|
||
if raw is None:
|
||
return units
|
||
try:
|
||
return max(min(float(raw or 0), units), 0.0)
|
||
except Exception:
|
||
return units
|
||
|
||
def _print_closed_storno_kitchen(self, storno_ucet: data.Ucet):
|
||
try:
|
||
self._enqueue_kitchen_print_jobs(storno_ucet, kind="storno_ucet")
|
||
except Exception:
|
||
Logger.exception("CTRL: closed receipt kitchen storno print failed")
|
||
|
||
def _enqueue_kitchen_print_jobs(self, ucet_print, kind: str = "bon"):
|
||
if not ucet_print or not getattr(ucet_print, "poloz", None):
|
||
return []
|
||
if isinstance(ucet_print, data.UcetEdit):
|
||
payload = data.ucet_edit_to_ucet(ucet_print)
|
||
else:
|
||
payload = ucet_print.model_copy(deep=True)
|
||
payload.id_kas = payload.id_kas or self.ctx.id_kas
|
||
try:
|
||
jobs = api_call.create_kitchen_print_jobs_API(
|
||
self.ctx,
|
||
payload,
|
||
kind=kind,
|
||
room_name=(self.client_settings or {}).get("room_name", "") if isinstance(self.client_settings, dict) else "",
|
||
pos_name=getattr(self.setup, "pos_name", "") if self.setup else "",
|
||
required=True,
|
||
priority=50 if kind == "bon" else 45,
|
||
)
|
||
if not jobs:
|
||
Logger.warning(f"CTRL: no kitchen print jobs created kind={kind}")
|
||
else:
|
||
Logger.info(f"CTRL: kitchen print jobs created kind={kind} count={len(jobs)}")
|
||
return jobs
|
||
except Exception as e:
|
||
Logger.exception("CTRL: kitchen print job creation failed")
|
||
_popup_info(
|
||
"Tlač",
|
||
"Kuchynský bon sa nepodarilo zaradiť do tlačovej fronty.\n"
|
||
f"{e}",
|
||
)
|
||
return []
|
||
|
||
def _receipt_has_fiscal_payment(self, ucet: data.Ucet) -> bool:
|
||
return any(bool(getattr(payment, "fiscal", False)) for payment in (getattr(ucet, "platby", []) or []))
|
||
|
||
def _mark_pohladavka_ucet(self, ucet: data.Ucet | None) -> bool:
|
||
if not ucet:
|
||
return True
|
||
cenik_map = (self.pos_static_maps or {}).get("cenik_map") or {}
|
||
if not cenik_map:
|
||
return True
|
||
has_pohladavka = False
|
||
has_regular = False
|
||
for pol in (getattr(ucet, "poloz", []) or []):
|
||
cenpol = cenik_map.get(getattr(pol, "id_card", None))
|
||
is_pohladavka = bool(cenpol and "pohladavka" in (getattr(cenpol, "atributes", []) or []))
|
||
if is_pohladavka:
|
||
has_pohladavka = True
|
||
else:
|
||
has_regular = True
|
||
if has_pohladavka and has_regular:
|
||
self._popup_info(
|
||
"Pohladavka",
|
||
"Pohladavku nie je mozne kombinovat s beznymi polozkami na jednom ucte.",
|
||
)
|
||
return False
|
||
ucet.pohladavka = 1 if has_pohladavka else None
|
||
return True
|
||
|
||
def _receipt_printer_no(self, ucet: data.Ucet) -> str:
|
||
return str(getattr(ucet, "bill_printer", "") or self.default_printer or "").strip()
|
||
|
||
def _ensure_storno_bill_printer(self, storno_ucet: data.Ucet, source_ucet: data.Ucet | None = None) -> data.Ucet:
|
||
printer_no = str(getattr(storno_ucet, "bill_printer", "") or "").strip()
|
||
if not printer_no and source_ucet is not None:
|
||
printer_no = str(getattr(source_ucet, "bill_printer", "") or "").strip()
|
||
if not printer_no:
|
||
printer_no = str(self.default_printer or "").strip()
|
||
if printer_no:
|
||
storno_ucet.bill_printer = printer_no
|
||
return storno_ucet
|
||
|
||
def _print_storno_receipt_documents(
|
||
self,
|
||
storno_ucet: data.Ucet,
|
||
kind: str = "storno",
|
||
title: str = "Storno",
|
||
) -> data.Ucet:
|
||
self._ensure_storno_bill_printer(storno_ucet)
|
||
if self._receipt_has_fiscal_payment(storno_ucet):
|
||
try:
|
||
return self._print_fiscal_receipt(storno_ucet, title=title)
|
||
except Exception as e:
|
||
Logger.exception("CTRL: fiscal storno receipt print failed")
|
||
self._popup_info(
|
||
"Tlač storna",
|
||
"Fiskálny storno doklad sa nepodarilo vytlačiť.\n"
|
||
f"{e}",
|
||
)
|
||
return storno_ucet
|
||
self._enqueue_receipt_print_jobs(
|
||
storno_ucet,
|
||
kind=kind,
|
||
title=title,
|
||
required=True,
|
||
)
|
||
return storno_ucet
|
||
|
||
def _receipt_copies(self, ucet: data.Ucet) -> int:
|
||
copies = 1
|
||
for payment_item in getattr(ucet, "platby", []) or []:
|
||
try:
|
||
copies = max(copies, int(getattr(payment_item, "p_kopii", 1) or 1))
|
||
except Exception:
|
||
pass
|
||
return max(1, copies)
|
||
|
||
def _receipt_pos_name(self) -> str:
|
||
return str(getattr(self.setup, "pos_name", "") if self.setup else "")
|
||
|
||
def _allow_nonfiscal_receipt_without_printing(self) -> bool:
|
||
return self._setup_bool("payment_allow_without_bill_printing_", False)
|
||
|
||
def _kasutxt_value(self, key: str) -> str:
|
||
source = self.kasutxt
|
||
if not source:
|
||
return ""
|
||
if isinstance(source, dict):
|
||
return str(source.get(key) or "").strip()
|
||
return str(getattr(source, key, "") or "").strip()
|
||
|
||
def _receipt_header_footer_lines(self) -> tuple[list[str], list[str]]:
|
||
headers = [
|
||
self._kasutxt_value(f"userhead{idx}")
|
||
for idx in range(1, 10)
|
||
]
|
||
footers = [
|
||
self._kasutxt_value(f"usertail{idx}")
|
||
for idx in range(1, 7)
|
||
]
|
||
return (
|
||
[line for line in headers if line],
|
||
[line for line in footers if line],
|
||
)
|
||
|
||
def _enqueue_receipt_print_jobs(
|
||
self,
|
||
ucet: data.Ucet,
|
||
kind: str = "receipt",
|
||
title: str = "",
|
||
copies: int | None = None,
|
||
required: bool = False,
|
||
):
|
||
if not ucet:
|
||
return []
|
||
kind_l = str(kind or "receipt").strip().lower()
|
||
if self._receipt_has_fiscal_payment(ucet) and kind_l not in {"copy", "kopia", "reprint"}:
|
||
Logger.info("CTRL: fiscal receipt is not queued as text job kind=%s ucet=%s", kind_l, getattr(ucet, "ucislo", ""))
|
||
return []
|
||
printer_no = self._receipt_printer_no(ucet)
|
||
if not printer_no:
|
||
Logger.warning("CTRL: receipt printer is not set ucet=%s", getattr(ucet, "ucislo", ""))
|
||
return []
|
||
try:
|
||
headers, footers = self._receipt_header_footer_lines()
|
||
jobs = api_call.create_receipt_print_jobs_API(
|
||
self.ctx,
|
||
ucet,
|
||
kind=kind_l,
|
||
printer_no=printer_no,
|
||
title=title,
|
||
pos_name=self._receipt_pos_name(),
|
||
headers=headers,
|
||
footers=footers,
|
||
required=required,
|
||
priority=40,
|
||
copies=copies if copies is not None else self._receipt_copies(ucet),
|
||
)
|
||
Logger.info(
|
||
"CTRL: receipt print jobs created kind=%s ucet=%s count=%s",
|
||
kind_l,
|
||
getattr(ucet, "ucislo", ""),
|
||
len(jobs),
|
||
)
|
||
return jobs
|
||
except Exception as e:
|
||
Logger.exception("CTRL: receipt print job creation failed")
|
||
self._popup_info(
|
||
"Tlač účtu",
|
||
"Účet sa nepodarilo zaradiť do tlačovej fronty.\n"
|
||
f"{e}",
|
||
)
|
||
return []
|
||
|
||
def _show_nonfiscal_receipt_preview(
|
||
self,
|
||
ucet: data.Ucet,
|
||
kind: str = "receipt",
|
||
title: str = "Ucet",
|
||
):
|
||
if not ucet:
|
||
return
|
||
try:
|
||
printer_no = self._receipt_printer_no(ucet)
|
||
receipt_text = self._render_receipt_preview_text(
|
||
ucet,
|
||
kind=kind,
|
||
title=title,
|
||
printer_no=printer_no,
|
||
)
|
||
except Exception as e:
|
||
Logger.exception("CTRL: receipt preview render failed")
|
||
self._popup_info(
|
||
"Nahlad uctu",
|
||
"Nahlad uctu sa nepodarilo zobrazit.\n"
|
||
"Ucet je uzavrety bez automatickej tlace.\n"
|
||
f"{e}",
|
||
)
|
||
return
|
||
|
||
def print_from_preview():
|
||
return self._enqueue_receipt_print_jobs(
|
||
ucet,
|
||
kind=kind,
|
||
title=title,
|
||
required=False,
|
||
)
|
||
|
||
kivy_printer.show_receipt_preview(
|
||
ucet,
|
||
txt=title,
|
||
currencytxt=self._currency(),
|
||
kasutxt=self.kasutxt,
|
||
receipt_text=receipt_text,
|
||
on_print=print_from_preview,
|
||
print_label="TLAC",
|
||
)
|
||
|
||
def _handle_nonfiscal_receipt_after_close(
|
||
self,
|
||
ucet: data.Ucet,
|
||
kind: str = "receipt",
|
||
title: str = "Ucet",
|
||
):
|
||
if self._allow_nonfiscal_receipt_without_printing():
|
||
self._show_nonfiscal_receipt_preview(ucet, kind=kind, title=title)
|
||
return []
|
||
return self._enqueue_receipt_print_jobs(ucet, kind=kind, title=title)
|
||
|
||
def _print_fiscal_receipt(
|
||
self,
|
||
ucet: data.Ucet,
|
||
title: str = "Ucet",
|
||
) -> data.Ucet:
|
||
printer_no = self._receipt_printer_no(ucet)
|
||
if not printer_no:
|
||
raise RuntimeError("Nie je nastavena tlaciaren uctu.")
|
||
result = api_call.print_fiscal_receipt_API(
|
||
self.ctx,
|
||
ucet,
|
||
printer_no=printer_no,
|
||
title=title,
|
||
pos_name=self._receipt_pos_name(),
|
||
headers=[],
|
||
footers=[],
|
||
)
|
||
Logger.info(
|
||
"CTRL: fiscal receipt printed ucet=%s printer=%s result=%s",
|
||
getattr(ucet, "ucislo", ""),
|
||
printer_no,
|
||
getattr(result, "fiscal_result", {}),
|
||
)
|
||
return result.ucet
|
||
|
||
def _storno_saved_receipt_after_failed_fiscal(self, ucet: data.Ucet, error_text: str) -> data.Ucet | None:
|
||
if not ucet or not getattr(ucet, "ucislo", None):
|
||
return None
|
||
try:
|
||
resp_storno = self._create_and_save_storno(
|
||
ucet,
|
||
self.ctx,
|
||
print_docs=False,
|
||
origin="AutoStornoFiscal",
|
||
fiscal_result={
|
||
"status": "auto_storno_after_fiscal_failure",
|
||
"original_ucislo": getattr(ucet, "ucislo", ""),
|
||
"error": str(error_text or ""),
|
||
},
|
||
)
|
||
ucet_upd = ucet.model_copy(deep=True)
|
||
ucet_upd.storno = resp_storno.ucislo
|
||
ucet_upd.blocked_by = ""
|
||
ucet_upd.fiscal_result = {
|
||
"status": "failed",
|
||
"error": str(error_text or ""),
|
||
"storno_ucislo": resp_storno.ucislo,
|
||
}
|
||
api_call.save_ucet_API(self.ctx, ucet_upd)
|
||
Logger.info(
|
||
"CTRL: fiscal failure storno created ucet=%s storno=%s",
|
||
getattr(ucet, "ucislo", ""),
|
||
getattr(resp_storno, "ucislo", ""),
|
||
)
|
||
return resp_storno
|
||
except Exception:
|
||
Logger.exception("CTRL: fiscal failure storno creation failed")
|
||
return None
|
||
|
||
def _render_receipt_preview_text(
|
||
self,
|
||
ucet: data.Ucet,
|
||
kind: str = "receipt",
|
||
title: str = "",
|
||
printer_no: str = "",
|
||
) -> str:
|
||
if not ucet:
|
||
return ""
|
||
payload = ucet.model_copy(deep=True)
|
||
if printer_no:
|
||
payload.bill_printer = printer_no
|
||
headers, footers = self._receipt_header_footer_lines()
|
||
preview = api_call.render_receipt_preview_API(
|
||
self.ctx,
|
||
payload,
|
||
kind=kind,
|
||
printer_no=printer_no or self._receipt_printer_no(payload),
|
||
title=title,
|
||
pos_name=self._receipt_pos_name(),
|
||
headers=headers,
|
||
footers=footers,
|
||
)
|
||
return preview.text
|
||
|
||
def _receipt_copy_via(self, ucet: data.Ucet) -> str:
|
||
param_name = (
|
||
"reprint_past_closure_bill_via"
|
||
if getattr(ucet, "c_uzaverka", None)
|
||
else "reprint_current_closure_bill_via"
|
||
)
|
||
return str(getattr(self.setup, param_name, "kpk") if self.setup else "kpk").strip().lower()
|
||
|
||
def _receipt_fiscal_bill_id(self, ucet: data.Ucet) -> str:
|
||
fiscal_result = getattr(ucet, "fiscal_result", {}) or {}
|
||
if not isinstance(fiscal_result, dict):
|
||
return ""
|
||
candidates = [
|
||
fiscal_result.get("bill_id"),
|
||
fiscal_result.get("BILL_ID"),
|
||
]
|
||
ret = fiscal_result.get("return")
|
||
if isinstance(ret, dict):
|
||
candidates.extend([ret.get("bill_id"), ret.get("BILL_ID")])
|
||
response = fiscal_result.get("response")
|
||
if isinstance(response, dict):
|
||
response_ret = response.get("return")
|
||
if isinstance(response_ret, dict):
|
||
candidates.extend([response_ret.get("bill_id"), response_ret.get("BILL_ID")])
|
||
else:
|
||
candidates.append(response_ret)
|
||
for candidate in candidates:
|
||
value = str(candidate or "").strip()
|
||
if value:
|
||
return value
|
||
return ""
|
||
|
||
def _print_receipt_copy(self, ucet: data.Ucet):
|
||
via = self._receipt_copy_via(ucet)
|
||
if via in {"kpk", "text", "foodie"} or not self._receipt_has_fiscal_payment(ucet):
|
||
return self._enqueue_receipt_print_jobs(ucet, kind="copy", title="Kopie", copies=1)
|
||
if via not in {"fiskal", "fiscal"}:
|
||
raise RuntimeError(f"Neznamy sposob tlace kopie uctu: {via}")
|
||
bill_id = self._receipt_fiscal_bill_id(ucet)
|
||
if not bill_id:
|
||
raise RuntimeError(
|
||
"Ucet nema ulozeny bill_id z fiskalneho servera. "
|
||
"Fiskalnu kopiu nie je mozne vytlacit; pouzi textovu kopiu cez kpk."
|
||
)
|
||
result = api_call.print_fiscal_receipt_copy_API(
|
||
self.ctx,
|
||
ucet,
|
||
printer_no=self._receipt_printer_no(ucet),
|
||
bill_id=bill_id,
|
||
)
|
||
Logger.info(
|
||
"CTRL: fiscal receipt copy printed ucet=%s bill_id=%s result=%s",
|
||
getattr(ucet, "ucislo", ""),
|
||
bill_id,
|
||
getattr(result, "fiscal_result", {}),
|
||
)
|
||
return result
|
||
|
||
def _set_partial_storno_payments(self, storno: data.Ucet, original: data.Ucet):
|
||
target_abs = abs(float(storno.total_base_currency or storno.total_czk() or 0))
|
||
payments = list(getattr(original, "platby", []) or [])
|
||
total_paid = sum(abs(float(getattr(p, "suma_czk", 0) or 0)) for p in payments)
|
||
storno.platby = []
|
||
if target_abs <= 0 or total_paid <= 0:
|
||
return
|
||
original_total = abs(float(getattr(original, "total_base_currency", 0) or original.total_czk() or 0))
|
||
ratio_total = (target_abs / original_total) if original_total else 0.0
|
||
storno.round50 = round(-float(getattr(original, "round50", 0) or 0) * ratio_total, 2)
|
||
target_payment_abs = round(target_abs - storno.round50, 2)
|
||
remaining = round(target_payment_abs, 2)
|
||
allocations: dict[int, tuple[float, float]] = {}
|
||
for idx, pay in enumerate(payments):
|
||
if idx == len(payments) - 1:
|
||
amount_czk = remaining
|
||
else:
|
||
amount_czk = round(target_payment_abs * (abs(float(pay.suma_czk or 0)) / total_paid), 2)
|
||
remaining = round(remaining - amount_czk, 2)
|
||
source_amount = abs(float(getattr(pay, "suma_czk", 0) or getattr(pay, "suma", 0) or 0))
|
||
ratio = (abs(float(amount_czk or 0)) / source_amount) if source_amount else ratio_total
|
||
tip = round(abs(float(getattr(pay, "tip", 0) or 0)) * ratio, 2)
|
||
allocations[idx] = (amount_czk, tip)
|
||
storno.platby = self._build_storno_payments(
|
||
original,
|
||
allocations=allocations,
|
||
origin=getattr(storno, "origin", "") or "Storno_polozek",
|
||
)
|
||
|
||
def load_stoly(self, closed: bool = False):
    """Load table accounts from the API.

    Args:
        closed: Passed through to ``load_stoly_API`` as ``closed``.

    Returns:
        Whatever ``api_call.load_stoly_API`` returns.
    """
    Logger.info("CTRL: load_stoly")
    loaded = api_call.load_stoly_API(self.ctx, closed=closed)
    return loaded
|
||
# ------------------------------------------------------------------------
|
||
def load_closed_ucty(self, limit=150, onlynonclsrep=True):
    """Load closed receipts, newest first.

    Args:
        limit: Passed to the API as ``limit`` and also used to cut the
            sorted result.
        onlynonclsrep: Passed through to the API unchanged.

    Returns:
        At most ``limit`` receipts sorted by ``closed_at`` descending;
        receipts without ``closed_at`` sort as an empty string.
    """
    Logger.info("CTRL: load_closed_ucty")

    def _closed_key(ucet):
        return ucet.closed_at or ""

    fetched = api_call.load_stoly_API(
        self.ctx,
        closed=True,
        onlynonclsrep=onlynonclsrep,
        limit=limit,
    ) or []
    fetched.sort(key=_closed_key, reverse=True)
    return fetched[:limit]
|
||
# ------------------------------------------------------------------------
|
||
def _after_storno_dialog(self, dialog):
|
||
if not hasattr(dialog, "storno_result"):
|
||
return
|
||
if dialog.storno_result is None:
|
||
return
|
||
u_main, u_sec = dialog.storno_result
|
||
if u_sec:
|
||
self._enqueue_kitchen_print_jobs(u_sec, kind="storno")
|
||
self.update_ucet(u_main)
|
||
# tady už je dialog pryč
|
||
self.goto_account_select()
|
||
# ------------------------------------------------------------------------
|
||
@staticmethod
def has_main(u_main_edit):
    """Return True when ``u_main_edit`` exists and has at least one item.

    Declared as a static method: the function takes no ``self``, so without
    the decorator calling it on an instance would pass the instance as
    ``u_main_edit`` and fail.
    """
    return bool(u_main_edit and u_main_edit.poloz)
|
||
# ------------------------------------------------------------------------
|
||
def _collect_kitchen_items(self, u_main_edit):
|
||
if not u_main_edit or not u_main_edit.poloz:
|
||
return None
|
||
items = [p for p in u_main_edit.poloz if not p.sent]
|
||
if not items:
|
||
return None
|
||
return data.UcetEdit(
|
||
**u_main_edit.model_dump(exclude={"poloz"}),
|
||
poloz=[p.model_copy(deep=True) for p in items],
|
||
)
|
||
# ------------------------------------------------------------------------
|
||
@staticmethod
def has_server_items(u: data.UcetEdit) -> bool:
    """Return True when at least one item of ``u`` has a truthy ``sent`` flag.

    Declared as a static method: the function takes no ``self``, so without
    the decorator calling it on an instance would pass the instance as ``u``
    and fail.
    """
    return any(p.sent for p in u.poloz)
|
||
# ------------------------------------------------------------------------
|
||
def _exists_on_server(self, stul):
    """Check whether the account of table ``stul`` has items on the server.

    The loaded account is kept in ``self.res`` as a side effect.

    Returns:
        The ``poloz`` collection of the loaded account; callers use its
        truthiness as the "exists" answer.
    """
    loaded = api_call.load_ucet_API(self.ctx, stul=stul)
    self.res = loaded
    return self.res.poloz
|
||
# ------------------------------------------------------------------------
|
||
def _handle_split_result(self, u_main_edit, u_sec_edit):
    """Continue the split flow once the user has picked the items to move.

    Both account parts are remembered on the controller. When nothing was
    selected the split is ended right away; otherwise an account-selection
    screen named ``account_split`` is added and shown so the user can pick
    the target table.
    """
    Logger.info("CTRL: split result received")
    self._split_u_main = u_main_edit
    self._split_u_sec = u_sec_edit

    nothing_selected = not u_sec_edit or not u_sec_edit.poloz
    if nothing_selected:
        Logger.info("CTRL: split empty → unblock main only")
        #api_call.unblock_ucet_API(self.ctx, u_main_edit.stul)
        self._end_split()
        return

    # Open the account selection for choosing the target table.
    picker = accountselect.AccountSelectScreen(
        name="account_split",
        controller=self,
        get_ucty=lambda: self.load_stoly(closed=False),
        on_select=self._on_split_target_selected,
        on_cancel=self._on_split_cancelled,
        on_logout=self.logout_user,
        on_info=None,
        mode="split",
        only_opened=True,
        allow_manual=True,
    )
    self.app.sm.add_widget(picker)
    self.app.sm.current = "account_split"
|
||
# ------------------------------------------------------------------------
|
||
def _on_info(self):
    """Open the system status dialog.

    When the dialog cannot be opened the error is logged and a monospace
    popup is shown instead, containing the start parameters from
    ``read_start_param`` followed by the error text.
    """
    try:
        SystemStatusDialog(self).open()
    except Exception as e:
        Logger.exception("CTRL: system status dialog failed")
        summary = read_start_param(
            self.user_login.name if self.user_login else "",
            self.app.cfg,
            self.version_API,
            self.database_name,
            self.version_frontend,
        )
        _popup_info("Setting", summary + f"\n\nChyba servisného panelu:\n{e}", monospace=True)
|
||
# ------------------------------------------------------------------------
|
||
def handle_usage_report(self):
    """Open the usage report dialog; log and show a popup when that fails."""
    try:
        report_dialog = UsageReportDialog(self)
        report_dialog.open()
    except Exception as e:
        Logger.exception("CTRL: usage report dialog failed")
        self._popup_info("Prezeranie spotreby", f"Okno spotreby sa nepodarilo otvoriť.\n{e}")
|
||
# ------------------------------------------------------------------------
|
||
def _on_split_cancelled(self):
    """Handle cancellation of the split target selection.

    Ends the split via ``_end_split`` with ``return_to_source=True``.
    """
    Logger.info("CTRL: split cancelled")
    self._end_split(return_to_source=True)
|
||
# ------------------------------------------------------------------------
|
||
def _on_split_target_selected(self, target_stul: str):
    """Continue the split once a target table has been chosen.

    The target is remembered and written to the secondary account. A limit
    table is finalised through ``_finalize_split_to_limit_target``; any
    other target goes through ``_try_block_split_target``.
    """
    Logger.info(f"CTRL: split target selected stul={target_stul}")
    self._split_target_stul = target_stul
    self._split_u_sec.stul = target_stul

    if self._is_limit_table_id(target_stul):
        self._split_target_is_limit = True
        self._finalize_split_to_limit_target(target_stul)
    else:
        self._try_block_split_target(target_stul)
|
||
# ------------------------------------------------------------------------
|
||
def _course_id_to_limit_rov(self, course) -> int:
|
||
value = course.get("id_rov") if isinstance(course, dict) else getattr(course, "id_rov", None)
|
||
try:
|
||
if value:
|
||
return int(value)
|
||
except Exception:
|
||
pass
|
||
cid = str((course.get("id") if isinstance(course, dict) else getattr(course, "id", "")) or "")
|
||
if cid.startswith("rov:"):
|
||
try:
|
||
return int(cid.split(":", 1)[1])
|
||
except Exception:
|
||
return 0
|
||
return 0
|
||
|
||
def _guest_id_to_limit_hlad(self, guest) -> int:
|
||
value = guest.get("c_hlad") if isinstance(guest, dict) else getattr(guest, "c_hlad", None)
|
||
try:
|
||
if value:
|
||
return int(value)
|
||
except Exception:
|
||
pass
|
||
gid = str((guest.get("id") if isinstance(guest, dict) else getattr(guest, "id", "")) or "")
|
||
if gid.startswith("hlad:"):
|
||
try:
|
||
return int(gid.split(":", 1)[1])
|
||
except Exception:
|
||
return 0
|
||
return 0
|
||
|
||
def _ordered_split_ids(self, incoming, attr_name: str):
|
||
ids = []
|
||
seen = set()
|
||
for pol in getattr(incoming, "poloz", []) or []:
|
||
value = str(getattr(pol, attr_name, "") or "")
|
||
if value in seen:
|
||
continue
|
||
seen.add(value)
|
||
ids.append(value)
|
||
return ids
|
||
|
||
def _ask_limit_target_mapping(self, target_limit, incoming, on_done, on_cancel):
    """Open a modal asking how incoming courses/guests map onto a limit table.

    The modal is stored in ``self._limit_target_modal`` so that the finish
    and cancel handlers can dismiss it.

    Args:
        target_limit: Limit account that receives the items.
        incoming: Account part being moved.
        on_done: Called with the popup's result after the modal is closed.
        on_cancel: Called without arguments after the modal is closed.
    """
    modal = BaseModal(size_hint=(None, None), size=(dp(700), dp(560)))
    self._limit_target_modal = modal

    captions = {
        "title": self.tr("limit.transfer_mapping", "Priradenie na limitovy stol"),
        "courses": self.tr("limit.courses", "Chody"),
        "guests": self.tr("limit.guests", "Hostia / hladiny"),
        "cancel": self.tr("button.cancel", "Zrusit"),
        "ok": self.tr("button.ok", "OK"),
    }
    mapping_view = LimitTargetMappingPopup(
        source_course_ids=self._ordered_split_ids(incoming, "course_id"),
        source_guest_ids=self._ordered_split_ids(incoming, "guest_id"),
        source_courses=getattr(incoming, "courses", []) or [],
        source_guests=getattr(incoming, "guests", []) or [],
        target_courses=getattr(target_limit, "courses", []) or [],
        target_guests=getattr(target_limit, "guests", []) or [],
        on_done=lambda result: self._finish_limit_target_mapping(result, on_done),
        on_cancel=lambda: self._cancel_limit_target_mapping(on_cancel),
        labels=captions,
    )
    modal.add_widget(mapping_view)
    modal.open()
|
||
|
||
def _finish_limit_target_mapping(self, result, on_done):
|
||
if hasattr(self, "_limit_target_modal"):
|
||
self._limit_target_modal.dismiss()
|
||
del self._limit_target_modal
|
||
on_done(result)
|
||
|
||
def _cancel_limit_target_mapping(self, on_cancel):
|
||
if hasattr(self, "_limit_target_modal"):
|
||
self._limit_target_modal.dismiss()
|
||
del self._limit_target_modal
|
||
on_cancel()
|
||
|
||
def _prepare_items_for_limit_target(self, incoming, target_limit, mapping=None):
|
||
courses = list(getattr(target_limit, "courses", []) or [])
|
||
guests = list(getattr(target_limit, "guests", []) or [])
|
||
first_course = courses[0] if courses else {"id": "rov:0", "name": "", "id_rov": 0}
|
||
first_guest = guests[0] if guests else {"id": "hlad:0", "name": "", "c_hlad": 0}
|
||
course_by_id = {str(c.get("id", "") or ""): c for c in courses if isinstance(c, dict)}
|
||
guest_by_id = {str(g.get("id", "") or ""): g for g in guests if isinstance(g, dict)}
|
||
course_by_name = {str(c.get("name", "")).strip(): c for c in courses if isinstance(c, dict)}
|
||
guest_by_name = {str(g.get("name", "")).strip(): g for g in guests if isinstance(g, dict)}
|
||
source_courses = {
|
||
str(c.get("id", "")): str(c.get("name", "")).strip()
|
||
for c in (getattr(incoming, "courses", []) or [])
|
||
if isinstance(c, dict)
|
||
}
|
||
source_guests = {
|
||
str(g.get("id", "")): str(g.get("name", "")).strip()
|
||
for g in (getattr(incoming, "guests", []) or [])
|
||
if isinstance(g, dict)
|
||
}
|
||
course_map = (mapping or {}).get("course_map", {}) or {}
|
||
guest_map = (mapping or {}).get("guest_map", {}) or {}
|
||
prepared = []
|
||
for pol in getattr(incoming, "poloz", []) or []:
|
||
item = pol.model_copy(deep=True)
|
||
source_course_id = str(getattr(item, "course_id", "") or "")
|
||
source_guest_id = str(getattr(item, "guest_id", "") or "")
|
||
course = (
|
||
course_by_id.get(str(course_map.get(source_course_id, "") or ""))
|
||
or course_by_name.get(source_courses.get(source_course_id, ""))
|
||
or first_course
|
||
)
|
||
guest = (
|
||
guest_by_id.get(str(guest_map.get(source_guest_id, "") or ""))
|
||
or guest_by_name.get(source_guests.get(source_guest_id, ""))
|
||
or first_guest
|
||
)
|
||
id_rov = self._course_id_to_limit_rov(course)
|
||
c_hlad = self._guest_id_to_limit_hlad(guest)
|
||
item.course_id = course.get("id", f"rov:{id_rov}") if isinstance(course, dict) else f"rov:{id_rov}"
|
||
item.guest_id = guest.get("id", f"hlad:{c_hlad}") if isinstance(guest, dict) else f"hlad:{c_hlad}"
|
||
item.limit_rov_id = id_rov
|
||
item.limit_hlad_id = c_hlad
|
||
item.limit_item_id = None
|
||
item.limit_fmenu_id = ""
|
||
prepared.append(item)
|
||
return prepared
|
||
|
||
def _clear_limit_metadata_for_normal(self, ucet):
|
||
if not ucet:
|
||
return ucet
|
||
ucet.limit_mode = False
|
||
ucet.limit_id = None
|
||
ucet.limit_den_id = None
|
||
ucet.limit_rov_ids = []
|
||
ucet.limit_cenhlad = ""
|
||
for pol in getattr(ucet, "poloz", []) or []:
|
||
pol.limit_item_id = None
|
||
pol.limit_rov_id = None
|
||
pol.limit_hlad_id = None
|
||
pol.limit_fmenu_id = ""
|
||
return ucet
|
||
|
||
def _save_limit_split_source(self):
|
||
if not self._split_source_is_limit or not self._split_u_main:
|
||
return
|
||
limit_source = self._combine_limit_ucet_parts(self._split_u_main, None)
|
||
if limit_source is not None:
|
||
api_call.save_limit_ucet_API(self.ctx, limit_source)
|
||
|
||
def _finalize_split_to_limit_target(self, target_stul: str):
    """Start moving the split-off items onto a limit table.

    The target limit account is loaded and, when there are items to move,
    the mapping dialog is opened; its callbacks either execute or cancel the
    transfer. With nothing to move the limit is released and the split ends.

    On any exception the error is logged and shown, the limit is released
    again if the target had already been loaded, and the split ends with a
    return to the source account.
    """
    Logger.info(f"CTRL: finalize split to limit target stul={target_stul}")
    id_limit = None
    target_loaded = False
    try:
        id_limit, id_den = self._parse_limit_table_id(target_stul)
        target_limit = api_call.load_limit_ucet_API(self.ctx, id_limit=id_limit, id_den=id_den)
        target_loaded = True

        incoming = self._split_u_sec
        if not incoming or not incoming.poloz:
            api_call.release_limit_API(self.ctx, id_limit=id_limit)
            self._end_split()
            return

        def _run_transfer(mapping):
            return self._execute_split_to_limit_target(
                target_stul,
                target_limit,
                incoming,
                mapping,
            )

        def _abort_transfer():
            return self._cancel_split_to_limit_target(target_stul)

        self._ask_limit_target_mapping(
            target_limit,
            incoming,
            on_done=_run_transfer,
            on_cancel=_abort_transfer,
        )
    except Exception as e:
        Logger.exception("CTRL: split to limit failed")
        _popup_info("Prevod na limit", f"Polozky sa nepodarilo presunut na limitovy stol.\n{e}")
        if id_limit and target_loaded:
            try:
                api_call.release_limit_API(self.ctx, id_limit=id_limit)
            except Exception:
                Logger.warning("CTRL: target limit release after error failed")
        self._end_split(return_to_source=True)
|
||
|
||
def _cancel_split_to_limit_target(self, target_stul: str):
    """Cancel a transfer to a limit table.

    Tries to release the target limit (a failure is only logged) and then
    ends the split with a return to the source account.
    """
    try:
        limit_no, _unused_day = self._parse_limit_table_id(target_stul)
        api_call.release_limit_API(self.ctx, id_limit=limit_no)
    except Exception:
        Logger.warning("CTRL: target limit release after cancel failed")
    self._end_split(return_to_source=True)
|
||
|
||
def _execute_split_to_limit_target(self, target_stul: str, target_limit, incoming, mapping):
    """Move the split-off items onto the limit account and persist both sides.

    The prepared items are appended to ``target_limit`` and saved. The
    source side is then handled depending on its kind: a limit source is
    saved through ``_save_limit_split_source``; a normal source is saved
    when it still has items, or deleted when it is empty but has a table id.

    Failures are logged and shown, and the split ends with a return to the
    source. In every case the target limit is released afterwards (a failed
    release is only logged).
    """
    try:
        moved = self._prepare_items_for_limit_target(incoming, target_limit, mapping)
        target_limit.poloz.extend(moved)
        self._prepare_limit_ucet(target_limit)
        api_call.save_limit_ucet_API(self.ctx, target_limit)

        if self._split_source_is_limit:
            self._save_limit_split_source()
        elif self._split_u_main and self._split_u_main.poloz:
            api_call.save_ucet_API(self.ctx, data.ucet_edit_to_ucet(self._split_u_main))
        elif self._split_u_main and self._split_u_main.stul:
            api_call.delete_ucet_API(self.ctx, stul=self._split_u_main.stul)
        self._end_split()
    except Exception as e:
        Logger.exception("CTRL: split to limit execute failed")
        _popup_info("Prevod na limit", f"Polozky sa nepodarilo presunut na limitovy stol.\n{e}")
        self._end_split(return_to_source=True)
    finally:
        try:
            limit_no, _unused_day = self._parse_limit_table_id(target_stul)
            api_call.release_limit_API(self.ctx, id_limit=limit_no)
        except Exception:
            Logger.warning("CTRL: target limit release failed")
|
||
|
||
# ------------------------------------------------------------------------
|
||
def _try_block_split_target(self, stul: str):
    """Try to lock the split target account and continue the split.

    When the lock succeeds the split is finalised with
    ``blocked_target=True``. When the API raises a ``RuntimeError`` whose
    detail text contains ``"nenalezen"`` or ``"neexistuje"`` the target does
    not exist yet and the split is finalised with ``blocked_target=False``.
    Every other ``RuntimeError`` falls through to the same
    ``blocked_target=False`` path after a warning.

    The error payload is read defensively: an exception without arguments
    or with a non-string detail no longer raises inside the handler.

    NOTE(review): ``_finalize_split(blocked_target=True)`` runs inside the
    ``try``, so a ``RuntimeError`` raised there is handled like a failed
    lock and finalisation is attempted a second time. Confirm whether that
    is intended.
    """
    Logger.info(f"CTRL: try block split target stul={stul}")
    try:
        api_call.block_ucet_API(self.ctx, stul=stul)
        Logger.info("CTRL: target ucet exists and blocked")
        self._finalize_split(blocked_target=True)
        return
    except RuntimeError as e:
        # Same defensive read as in _finalize_split: args may be empty.
        err = e.args[0] if e.args else str(e)
        if isinstance(err, dict) and "detail" in err:
            detail = str(err["detail"] or "").lower()
            if "nenalezen" in detail or "neexistuje" in detail:
                Logger.info("CTRL: target ucet does not exist")
                self._finalize_split(blocked_target=False)
                return
            Logger.warning("CTRL: target ucet blocked by other terminal")

    # Fallback in case the API gave no clear answer.
    Logger.warning("CTRL: block_ucet returned no clear result, treating as NOT_FOUND")
    self._finalize_split(blocked_target=False)
|
||
# ------------------------------------------------------------------------
|
||
def _reopen_split_target_select(self):
    """Show a fresh split target selection screen.

    A new ``account_split`` screen is built first; an already registered
    screen of that name is then removed before the new one is added and
    made current.
    """
    Logger.info("CTRL: reopen split target select")
    picker = accountselect.AccountSelectScreen(
        name="account_split",
        controller=self,
        get_ucty=lambda: self.load_stoly(closed=False),
        on_select=self._on_split_target_selected,
        on_cancel=self._on_split_cancelled,
        on_logout=self.logout_user,
        on_info=None,
        mode="split",
        only_opened=True,
        allow_manual=True,
    )

    manager = self.app.sm
    # Remove the old split screen, if there is one.
    if manager.has_screen("account_split"):
        stale = manager.get_screen("account_split")
        manager.remove_widget(stale)

    manager.add_widget(picker)
    manager.current = "account_split"
|
||
# ------------------------------------------------------------------------
|
||
def _apply_guest_mapping(self, items, target_guest_id):
|
||
for p in items:
|
||
p.guest_id = target_guest_id
|
||
|
||
def _finalize_split(self, blocked_target: bool):
    """Persist the result of a split once the target table is known.

    ``blocked_target`` is True when the target account already exists and has
    been blocked by this terminal; in that case the items are merged through
    the guest-mapping flow. Otherwise the target is created and blocked here
    and the split-off part is saved into it.

    Reads the pending split from ``self._split_u_main`` (remaining source
    part) and ``self._split_u_sec`` (part being moved).
    """
    Logger.info(f"CTRL: finalize split blocked_target={blocked_target}")

    u_main = self._split_u_main
    u_sec = self._split_u_sec

    # The emptiness check below tolerates ``u_sec`` being None, so the
    # cleanup must tolerate it as well.
    if u_sec is not None:
        sec_items = u_sec.poloz or []

        # Keep only guests that are still referenced by a moved item.
        guest_ids = {p.guest_id for p in sec_items}
        u_sec.guests = [g for g in (u_sec.guests or []) if g["id"] in guest_ids]

        # Keep only courses that are still referenced by a moved item.
        course_ids = {p.course_id for p in sec_items}
        u_sec.courses = [c for c in (u_sec.courses or []) if c["id"] in course_ids]

    has_main = bool(u_main and u_main.poloz)
    has_sec = bool(u_sec and u_sec.poloz)

    # Nothing is being transferred.
    if not has_sec:
        Logger.info("CTRL: split empty → end split")
        self._end_split()
        return

    # Target does not exist: create it and save the moved part.
    if not blocked_target:
        Logger.info("CTRL: target does not exist → open+block target")

        try:
            api_call.open_block_ucet_API(
                self.ctx,
                self._split_target_stul
            )
        except Exception as e:
            err = e.args[0] if e.args else str(e)

            if isinstance(err, dict) and "detail" in err:
                # Compare without case and diacritics so that "blokován" and
                # "blokovan" style messages are both recognised.
                detail = unicodedata.normalize("NFKD", str(err["detail"]))
                detail = detail.encode("ascii", "ignore").decode("ascii").lower()
                if "blokovan" in detail:
                    _popup_info(
                        "Nelze převést položky",
                        f"Cílový stůl {self._split_target_stul} je otevřen jinde."
                    )
                    self._reopen_split_target_select()
                    return

            Logger.exception(e)
            _popup_info("Chyba", "Nelze vytvořit cílový účet.")
            self._end_split()
            return

        Logger.info("CTRL: saving split into new target")

        if self._split_source_is_limit:
            u_sec_db = data.ucet_edit_to_ucet(u_sec)
            self._clear_limit_metadata_for_normal(u_sec_db)
            api_call.save_ucet_API(self.ctx, u_sec_db)
            self._save_limit_split_source()
            self._end_split()
            return

        api_call.save_ucet_API(
            self.ctx,
            data.ucet_edit_to_ucet(u_sec)
        )

        if has_main:
            api_call.save_ucet_API(
                self.ctx,
                data.ucet_edit_to_ucet(u_main)
            )
        elif u_main is not None:
            # Source has no items left; without a source object there is
            # no table id to delete.
            api_call.delete_ucet_API(self.ctx, stul=u_main.stul)

        self._end_split()
        return

    # Target exists: merge flow.
    Logger.info("CTRL: merging into existing target")

    target_ucet = api_call.load_ucet_API(
        self.ctx,
        self._split_target_stul
    )

    self._start_merge_with_mapping(
        target_ucet,
        u_sec,
        u_main,
        has_main
    )
|
||
|
||
def _start_merge_with_mapping(self, target_ucet, u_sec, u_main, has_main):
|
||
source_guest_ids = {p.guest_id for p in u_sec.poloz}
|
||
|
||
def on_mapping_done(result):
|
||
self._execute_merge_with_mapping(
|
||
result,
|
||
target_ucet,
|
||
u_sec,
|
||
u_main,
|
||
has_main
|
||
)
|
||
|
||
# 🔹 jeden hosť → jednoduchý výber
|
||
if self.mamehosti:
|
||
if len(source_guest_ids) == 1:
|
||
src_id = next(iter(source_guest_ids))
|
||
|
||
def handle_single(tgt_id):
|
||
result = {
|
||
"guest_map": {src_id: tgt_id},
|
||
"guests": target_ucet.guests[:]
|
||
}
|
||
on_mapping_done(result)
|
||
|
||
self._ask_guest_mapping(target_ucet, handle_single)
|
||
|
||
# 🔹 viac hostí → mapping popup
|
||
else:
|
||
self._ask_guest_mapping_multi(
|
||
target_ucet,
|
||
source_guest_ids,
|
||
on_mapping_done,
|
||
source_guests=u_sec.guests,
|
||
)
|
||
else:
|
||
if len(source_guest_ids) == 1:
|
||
src_id = next(iter(source_guest_ids))
|
||
result = {
|
||
"guest_map": {src_id: "g1"},
|
||
"guests": target_ucet.guests[:]
|
||
}
|
||
on_mapping_done(result)
|
||
# 🔹 viac hostí → mapping popup
|
||
else:
|
||
guest_map = {}
|
||
for src_id in source_guest_ids:
|
||
guest_map[src_id] = "g1"
|
||
on_mapping_done({
|
||
"guest_map": guest_map,
|
||
"guests": target_ucet.guests[:]
|
||
})
|
||
|
||
|
||
def _execute_merge_with_mapping(
    self,
    result,
    target_ucet,
    u_sec,
    u_main,
    has_main
):
    """Apply the chosen guest mapping and merge ``u_sec`` into the target.

    ``result`` is a dict with ``"guest_map"`` (source guest id -> target guest
    id) and ``"guests"`` (the guest list to store on the target). Items of
    ``u_sec`` are remapped in place for guests and courses, the source
    account is saved or deleted, and the moved part is merged into
    ``self._split_target_stul`` through ``api_call.merge_ucet_API``.
    """
    guest_map = result["guest_map"]

    # Sync guests.
    target_ucet.guests = result["guests"]

    # Guest mapping; unmapped ids are kept as they are.
    for p in u_sec.poloz:
        p.guest_id = guest_map.get(p.guest_id, p.guest_id)

    # Course mapping (may append missing courses to the target).
    course_map = self._map_courses_by_name(target_ucet, u_sec)
    for p in u_sec.poloz:
        p.course_id = course_map.get(p.course_id, p.course_id)

    # Meta sync.
    u_sec.courses = target_ucet.courses
    u_sec.guests = target_ucet.guests

    source_is_limit = self._split_source_is_limit

    # API: persist the source side.
    if source_is_limit:
        pass  # a limit source is saved after the merge, see below
    elif has_main:
        api_call.save_ucet_API(
            self.ctx,
            data.ucet_edit_to_ucet(u_main)
        )
    elif u_main is not None:
        # ``has_main`` is also False when ``u_main`` itself is None; there is
        # then no table id to delete.
        api_call.delete_ucet_API(
            self.ctx,
            stul=u_main.stul
        )

    u_sec_db = data.ucet_edit_to_ucet(u_sec)
    if source_is_limit:
        self._clear_limit_metadata_for_normal(u_sec_db)

    api_call.merge_ucet_API(
        self.ctx,
        u_sec_db,
        target_stul=self._split_target_stul,
    )
    if source_is_limit:
        self._save_limit_split_source()

    self._end_split()
|
||
|
||
def _map_courses_by_name(self, target_ucet, source_ucet):
|
||
target_by_name = {c["name"]: c["id"] for c in target_ucet.courses}
|
||
course_map = {}
|
||
|
||
for c in source_ucet.courses:
|
||
if c["name"] in target_by_name:
|
||
course_map[c["id"]] = target_by_name[c["name"]]
|
||
else:
|
||
target_ucet.courses.append(c)
|
||
course_map[c["id"]] = c["id"]
|
||
|
||
return course_map
|
||
|
||
def _ask_guest_mapping(self, target_ucet, on_done):
    """Open a modal to pick the target guest for a single source guest.

    Shows one button per guest of ``target_ucet``, a button to create a new
    guest and a cancel button. ``on_done`` receives the chosen guest id;
    cancelling ends the split with ``return_to_source=True``.
    """
    root = BoxLayout(orientation="vertical", spacing=dp(6), padding=dp(10))
    modal = BaseModal(size_hint=(None, None), size=(dp(300), dp(400)))
    self._guest_modal = modal

    def close_modal():
        modal.dismiss()
        if hasattr(self, "_guest_modal"):
            del self._guest_modal

    def choose(gid):
        close_modal()
        on_done(gid)

    def cancel():
        close_modal()
        self._end_split(return_to_source=True)

    for guest in target_ucet.guests or []:
        guest_button = Button(text=guest["name"])
        # Bind the id as a default argument so each button keeps its own.
        guest_button.bind(on_press=lambda _, gid=guest["id"]: choose(gid))
        root.add_widget(guest_button)

    new_guest_button = Button(
        text="+ Nový hosť",
        background_color=(0.2, 0.6, 0.2, 1),
    )
    new_guest_button.bind(on_press=lambda *_: self._create_new_guest(target_ucet, choose, modal))
    root.add_widget(new_guest_button)

    cancel_button = Button(
        text="Zrušiť",
        background_color=(0.6, 0.2, 0.2, 1),
    )
    cancel_button.bind(on_press=lambda *_: cancel())
    root.add_widget(cancel_button)

    modal.add_widget(root)
    modal.open()
|
||
|
||
|
||
def _ask_guest_mapping_multi(
    self,
    target_ucet,
    source_guest_ids,
    on_done,
    source_guests=None,
):
    """Open the multi-guest mapping popup inside a modal.

    The popup reports its mapping through ``_close_guest_modal``, which closes
    the modal and forwards the mapping to ``on_done``. Cancelling is routed to
    ``_cancel_guest_mapping``. The modal is remembered in
    ``self._guest_modal``.
    """
    def forward_mapping(mapping):
        self._close_guest_modal(mapping, on_done)

    popup_body = GuestMappingPopup(
        target_ucet=target_ucet,
        source_guest_ids=source_guest_ids,
        on_done=forward_mapping,
        source_guests=source_guests,
        on_cancel=self._cancel_guest_mapping,
        request_guest_name=self._ask_new_guest_name,
    )

    dialog = BaseModal(size_hint=(None, None), size=(dp(400), dp(400)))
    dialog.add_widget(popup_body)
    dialog.open()

    self._guest_modal = dialog
|
||
|
||
def _close_guest_modal(self, mapping, on_done):
|
||
if hasattr(self, "_guest_modal"):
|
||
self._guest_modal.dismiss()
|
||
del self._guest_modal
|
||
on_done(mapping)
|
||
|
||
def _cancel_guest_mapping(self):
|
||
if hasattr(self, "_guest_modal"):
|
||
self._guest_modal.dismiss()
|
||
del self._guest_modal
|
||
self._end_split(return_to_source=True)
|
||
|
||
|
||
def _new_guest_id(self, target_ucet):
|
||
existing = {g["id"] for g in (target_ucet.guests or [])}
|
||
i = 1
|
||
while True:
|
||
gid = f"g{i}"
|
||
if gid not in existing:
|
||
return gid
|
||
i += 1
|
||
|
||
|
||
def _create_new_guest(self, target_ucet, choose, modal):
|
||
def done(name):
|
||
if not name:
|
||
return
|
||
|
||
new_id = self._new_guest_id(target_ucet)
|
||
|
||
if not target_ucet.guests:
|
||
target_ucet.guests = []
|
||
target_ucet.guests.append({
|
||
"id": new_id,
|
||
"name": name
|
||
})
|
||
|
||
modal.dismiss()
|
||
choose(new_id)
|
||
|
||
self._ask_new_guest_name(done)
|
||
|
||
def _ask_new_guest_name(self, on_done):
    """Open a text dialog with an on-screen keyboard to enter a guest name.

    ``on_done`` is handed to the dialog as its completion callback.
    """
    on_screen_keyboard = PosKeyboard(
        on_key=self.dispatch_key,
        bezokesc=True,
    )
    name_dialog = TextMessageDialog(
        title="Meno hosťa",
        on_done=on_done,
        modal_manager=self.modal_manager,
        keyboard=on_screen_keyboard,
    )
    self.modal_manager.open(name_dialog)
|
||
|
||
# ------------------------------------------------------------------------
|
||
def _end_split(self, return_to_source: bool = False):
    """Tear down the split workflow and return to the account overview.

    Closes leftover modals, releases the source (limit release or unblock)
    and the non-limit target, clears all split state and removes the split
    screen. With ``return_to_source`` the source items are first restored via
    ``_restore_split_source_on_cancel`` and the source table is reopened on
    the next clock tick.
    """
    Logger.info("CTRL: split finished")
    source_stul = None
    if return_to_source and self._split_u_main and self._split_u_main.stul:
        source_stul = self._split_u_main.stul
        self._restore_split_source_on_cancel()

    # Close modals that may still be open.
    for modal_attr in ("_guest_modal", "_limit_target_modal"):
        if hasattr(self, modal_attr):
            getattr(self, modal_attr).dismiss()
            delattr(self, modal_attr)

    # --- release / unblock the source ---
    if self._split_source_is_limit:
        self._release_current_limit()
    elif self._split_u_main and self._split_u_main.stul:
        try:
            api_call.unblock_ucet_API(self.ctx, self._split_u_main.stul)
        except Exception as e:
            Logger.warning(f"UNBLOCK main failed: {e}")

    # --- unblock the target ---
    if self._split_target_stul and not self._is_limit_table_id(self._split_target_stul):
        try:
            api_call.unblock_ucet_API(self.ctx, self._split_target_stul)
        except Exception as e:
            Logger.warning(f"UNBLOCK target failed: {e}")

    # Clear the split state.
    self._split_u_main = None
    self._split_u_sec = None

    # Close the split screen, if it exists.
    manager = self.app.sm
    if manager.has_screen("account_split"):
        manager.remove_widget(manager.get_screen("account_split"))

    # Back to the account overview.
    self.app.sm.current = SCREEN_ACCOUNT
    self._split_target_stul = None
    self._split_source_is_limit = False
    self._split_target_is_limit = False
    self._editing_stul = None
    self._opened_dummy = False

    if source_stul:
        Clock.schedule_once(
            lambda *_: self._return_to_split_source(source_stul),
            0,
        )
|
||
|
||
def _return_to_split_source(self, stul):
    """Reopen the split source: a limit table via ``open_limit_table``, else the POS dialog."""
    Logger.info(f"CTRL: return to split source stul={stul}")
    if not self._is_limit_table_id(stul):
        self.open_posdialog(data.Ucet(stul=stul))
    else:
        self.open_limit_table(stul)
|
||
|
||
def _restore_split_source_on_cancel(self):
    """Save the source account again with all split items put back.

    Does nothing when there is no source part or the source is a limit table.
    Failures are logged as a warning and not raised.
    """
    if not self._split_u_main or self._split_source_is_limit:
        return
    try:
        source_copy = self._split_u_main.model_copy(deep=True)
        source_copy.poloz = self._build_cancelled_split_items()
        source_copy.stul = self._split_u_main.stul
        # Saved as an open account: no closing time, no receipt number.
        source_copy.closed_at = None
        source_copy.ucislo = None
        payload = data.ucet_edit_to_ucet(source_copy)
        api_call.save_ucet_API(self.ctx, payload)
    except Exception as e:
        Logger.warning(f"CTRL: split cancel restore failed: {e}")
|
||
|
||
def _build_cancelled_split_items(self):
|
||
items = []
|
||
for ucet in (self._split_u_main, self._split_u_sec):
|
||
if not ucet:
|
||
continue
|
||
for pol in ucet.poloz:
|
||
item = pol.model_copy(deep=True)
|
||
item.selected = False
|
||
item.sel_pocet = None
|
||
item.sel_delitel = None
|
||
item.kstornu = None
|
||
items.append(item)
|
||
return self._merge_cancelled_split_items(items)
|
||
|
||
def _merge_cancelled_split_items(self, items):
|
||
normal = []
|
||
menu_groups = {}
|
||
for item in items:
|
||
if item.typ_menu in (0, 10, 11, 12):
|
||
normal.append(item)
|
||
else:
|
||
menu_groups.setdefault(item.group_id, []).append(item)
|
||
|
||
normal = self._merge_cancelled_normal_items(normal)
|
||
menus = self._merge_cancelled_menu_groups(menu_groups)
|
||
return menus + normal
|
||
|
||
def _merge_cancelled_normal_items(self, items):
|
||
merged = {}
|
||
result = []
|
||
for item in items:
|
||
key = getattr(item, "line_id", None)
|
||
if key and key in merged:
|
||
merged[key].pocet += item.pocet
|
||
continue
|
||
if key:
|
||
merged[key] = item
|
||
result.append(item)
|
||
return result
|
||
|
||
def _merge_cancelled_menu_groups(self, menu_groups):
|
||
merged = {}
|
||
orphans = []
|
||
for group_items in menu_groups.values():
|
||
parent = next((item for item in group_items if item.typ_menu == 1), None)
|
||
if not parent:
|
||
orphans.extend(group_items)
|
||
continue
|
||
children = [item for item in group_items if item.typ_menu == 2]
|
||
child_rows = [
|
||
(
|
||
child.id_card,
|
||
child.pol_pocet,
|
||
child.cena,
|
||
child.cenhlad,
|
||
child.sklad,
|
||
child.guest_id,
|
||
child.course_id,
|
||
)
|
||
for child in children
|
||
]
|
||
child_sig = tuple(sorted(child_rows, key=repr))
|
||
sig = (
|
||
parent.id_card,
|
||
parent.cena,
|
||
parent.cenhlad,
|
||
parent.sklad,
|
||
parent.guest_id,
|
||
parent.course_id,
|
||
child_sig,
|
||
)
|
||
existing = merged.get(sig)
|
||
if not existing:
|
||
merged[sig] = {"parent": parent, "children": children}
|
||
continue
|
||
existing["parent"].pocet += parent.pocet
|
||
for child in children:
|
||
target = next((
|
||
item for item in existing["children"]
|
||
if (
|
||
item.id_card,
|
||
item.pol_pocet,
|
||
item.cena,
|
||
item.cenhlad,
|
||
item.sklad,
|
||
item.guest_id,
|
||
item.course_id,
|
||
) == (
|
||
child.id_card,
|
||
child.pol_pocet,
|
||
child.cena,
|
||
child.cenhlad,
|
||
child.sklad,
|
||
child.guest_id,
|
||
child.course_id,
|
||
)
|
||
), None)
|
||
if target:
|
||
target.pocet += child.pocet
|
||
else:
|
||
existing["children"].append(child)
|
||
|
||
result = []
|
||
for menu in merged.values():
|
||
result.append(menu["parent"])
|
||
result.extend(menu["children"])
|
||
result.extend(orphans)
|
||
return result
|
||
|
||
def _remap_items(self, items, course_map, guest_map):
|
||
for p in items:
|
||
if p.course_id in course_map:
|
||
p.course_id = course_map[p.course_id]
|
||
|
||
if p.guest_id in guest_map:
|
||
p.guest_id = guest_map[p.guest_id]
|
||
|
||
def _merge_meta(self, target, source):
|
||
# --- COURSES ---
|
||
t_courses = {c["id"]: c for c in target.courses}
|
||
t_names = {c["name"]: c["id"] for c in target.courses}
|
||
|
||
course_map = {} # old_id -> new_id
|
||
|
||
for c in source.courses:
|
||
if c["name"] in t_names:
|
||
course_map[c["id"]] = t_names[c["name"]]
|
||
else:
|
||
new_id = c["id"]
|
||
t_courses[new_id] = c
|
||
course_map[c["id"]] = new_id
|
||
|
||
target.courses = list(t_courses.values())
|
||
|
||
# --- GUESTS ---
|
||
t_guests = {g["id"]: g for g in target.guests}
|
||
g_names = {g["name"]: g["id"] for g in target.guests}
|
||
|
||
guest_map = {}
|
||
|
||
for g in source.guests:
|
||
if g["name"] in g_names:
|
||
guest_map[g["id"]] = g_names[g["name"]]
|
||
else:
|
||
new_id = g["id"]
|
||
t_guests[new_id] = g
|
||
guest_map[g["id"]] = new_id
|
||
|
||
target.guests = list(t_guests.values())
|
||
|
||
return course_map, guest_map
|
||
|
||
# ------------------------------------------------------------------------
|
||
def load_ucet_z_api(self, stul: str, block: bool = False):
    """Load the account of table ``stul`` via ``api_call.load_ucet_API``.

    ``block`` is passed through to the API call unchanged.
    """
    Logger.info(f"CTRL: load_ucet stul={stul}")
    loaded = api_call.load_ucet_API(self.ctx, stul=stul, block=block)
    return loaded
|
||
# ------------------------------------------------------------------------
|
||
def _release_current_limit(self):
    """Release the currently held limit on the server, if there is one.

    Without a stored limit id nothing happens. Otherwise a failing release is
    only logged, and the stored limit table, id and day id are cleared either
    way.
    """
    held_limit_id = getattr(self, "_limit_id", None)
    if held_limit_id:
        try:
            api_call.release_limit_API(self.ctx, id_limit=held_limit_id)
        except Exception as e:
            Logger.warning(f"Limit release failed: {e}")
        finally:
            self._limit_stul = None
            self._limit_id = None
            self._limit_den_id = None
|
||
|
||
def handle_limit_pos_result(self, u_main_edit, u_sec_edit, operation):
    """Process the result of a POS dialog opened for a limit table.

    Supported operations: "noop" (nothing), "edit_only" (save the limit
    account), "pay_part" (rejected with a popup) and "pay_full" (save, finish
    the limit account and print the receipt). Any other operation shows an
    "unsupported" popup. Every exception is logged and reported in a popup
    instead of being raised.
    """
    Logger.info(f"CTRL: limit POS finalize operation={operation}")
    if operation == "noop":
        return
    try:
        if operation == "edit_only":
            ucet = self._combine_limit_ucet_parts(u_main_edit, None)
            if ucet and ucet.poloz:
                api_call.save_limit_ucet_API(self.ctx, ucet)
            return

        if operation == "pay_part":
            _popup_info("Limity", self.tr("limit.full_payment_only", "Limitovy stol sa da zaplatit iba cely naraz."))
            return

        # NOTE: "pay_part" has already returned above, so only "pay_full"
        # can reach this branch.
        if operation in ("pay_full", "pay_part"):
            if not u_sec_edit or not u_sec_edit.poloz:
                _popup_info("Limity", "Nie je co platit.")
                return
            # Check on a throwaway conversion before anything is printed or saved.
            if not self._mark_pohladavka_ucet(data.ucet_edit_to_ucet(u_sec_edit)):
                return

            self.prepare_operation_for_save(u_main_edit, u_sec_edit)
            combined = self._combine_limit_ucet_parts(u_main_edit, u_sec_edit)
            if combined and combined.poloz:
                api_call.save_limit_ucet_API(self.ctx, combined)

            u_sec_db = data.ucet_edit_to_ucet(u_sec_edit)
            if not self._mark_pohladavka_ucet(u_sec_db):
                return
            self._prepare_limit_ucet(u_sec_db)
            u_sec_db.autor = self.user_login.name
            u_sec_db.datetime = data.stime_str()
            u_sec_db.origin = "Limit"
            u_sec_db.closed_at = data.stime_str()
            u_sec_db.sumdph()
            u_sec_db = api_call.finish_limit_ucet_API(self.ctx, u_sec_db)

            if not self._receipt_has_fiscal_payment(u_sec_db):
                self._handle_nonfiscal_receipt_after_close(u_sec_db, kind="receipt", title="Limit")
                return

            try:
                u_sec_db = self._print_fiscal_receipt(u_sec_db, title="Limit")
            except Exception as e:
                Logger.exception("CTRL: fiscal limit receipt print failed")
                failure_storno = self._storno_saved_receipt_after_failed_fiscal(u_sec_db, str(e))
                try:
                    api_call.clear_limit_ucet_API(self.ctx, u_sec_db)
                except Exception:
                    Logger.exception("CTRL: limit clear after fiscal failure failed")
                if failure_storno and getattr(failure_storno, "ucislo", None):
                    storno_note = f"\nInterné storno účtu: {failure_storno.ucislo}"
                else:
                    storno_note = "\nInterné storno účtu sa nepodarilo vytvoriť automaticky."
                _popup_info(
                    "Fiskalny doklad",
                    "Fiskalny doklad sa nepodarilo vytlacit.\n"
                    "Limitovy chod ostal nezaplateny."
                    f"{storno_note}\n"
                    f"{e}",
                )
                return
            return

        _popup_info("Limity", f"Nepodporovana operacia limitu: {operation}")
    except Exception as e:
        Logger.exception("CTRL: limit POS finalize failed")
        _popup_info("Limity", f"Limitovy ucet sa nepodarilo spracovat.\n{e}")
|
||
|
||
def _prepare_limit_ucet(self, ucet: data.Ucet | None) -> data.Ucet | None:
    """Fill in the limit-related fields of ``ucet`` and return it.

    Missing limit id, limit day id and table are taken from the controller's
    current limit state; an empty room name becomes "Limity".
    ``limit_rov_ids`` is set to the sorted, non-zero ids found on the items.
    Returns None for a falsy ``ucet``.
    """
    if not ucet:
        return None
    ucet.limit_mode = True
    for field_name, fallback_attr in (("limit_id", "_limit_id"), ("limit_den_id", "_limit_den_id")):
        current = getattr(ucet, field_name, None)
        setattr(ucet, field_name, current or getattr(self, fallback_attr, None))
    ucet.stul = ucet.stul or getattr(self, "_limit_stul", None)
    ucet.room_name = ucet.room_name or "Limity"
    found_ids = {self._limit_pol_rov_id(pol) for pol in (getattr(ucet, "poloz", []) or [])}
    # 0 means "no id" and is dropped.
    ucet.limit_rov_ids = [rov_id for rov_id in sorted(found_ids) if rov_id]
    return ucet
|
||
|
||
def _limit_pol_rov_id(self, pol) -> int:
|
||
value = getattr(pol, "limit_rov_id", None)
|
||
try:
|
||
if value:
|
||
return int(value)
|
||
except Exception:
|
||
pass
|
||
course_id = str(getattr(pol, "course_id", "") or "")
|
||
if course_id.startswith("rov:"):
|
||
try:
|
||
return int(course_id.split(":", 1)[1])
|
||
except Exception:
|
||
return 0
|
||
return 0
|
||
|
||
def _combine_limit_ucet_parts(self, u_main_edit, u_sec_edit) -> data.Ucet | None:
    """Build one limit account holding the items of both edit parts.

    The header is deep-copied from the main part (or from the secondary part
    when there is no main part); items of both parts are deep-copied into it.
    The result is converted with ``data.ucet_edit_to_ucet`` and passed through
    ``_prepare_limit_ucet``. Returns None when both parts are missing.
    """
    header_source = u_main_edit or u_sec_edit
    if not header_source:
        return None
    combined = header_source.model_copy(deep=True)
    combined.poloz = []
    for part in (u_main_edit, u_sec_edit):
        if not part or not getattr(part, "poloz", None):
            continue
        combined.poloz.extend([pol.model_copy(deep=True) for pol in part.poloz])
    return self._prepare_limit_ucet(data.ucet_edit_to_ucet(combined))
|
||
|
||
def _on_pos_finish(self, u_main_edit, u_sec_edit, operation, bar_mode: bool = False, limit_mode: bool = False):
    """React to the POS dialog finishing: persist the result and navigate.

    The POS screen is removed first. In limit mode a split is handed to the
    split workflow and everything else to ``handle_limit_pos_result``, after
    which the limit is released. Otherwise ``handle_pos_result`` decides; it
    may request reopening a table. Bar-mode payments reopen the bar table,
    a split leaves navigation to the split workflow, and all other cases
    return to the account overview.
    """
    Logger.info(f"CTRL: _on_pos_finish operation={operation} bar_mode={bar_mode} limit_mode={limit_mode}")
    manager = self.app.sm
    # Close the POS screen.
    if manager.has_screen(SCREEN_POS):
        manager.remove_widget(manager.get_screen(SCREEN_POS))

    if limit_mode:
        if operation == "split":
            self.prepare_operation_for_save(u_main_edit, u_sec_edit)
            self._split_source_is_limit = True
            self._split_target_is_limit = False
            self._handle_split_result(u_main_edit, u_sec_edit)
            return
        try:
            self.handle_limit_pos_result(u_main_edit, u_sec_edit, operation)
        finally:
            self._release_current_limit()
        self.app.sm.current = SCREEN_ACCOUNT
        return

    # Remember the bar table before the result handler runs.
    bar_stul = self._bar_stul if bar_mode else None
    next_action = self.handle_pos_result(u_main_edit, u_sec_edit, operation)

    if isinstance(next_action, tuple) and next_action[0] == "reopen_table":
        reopen_stul = next_action[1]
        self.app.sm.current = SCREEN_ACCOUNT
        if reopen_stul:
            Clock.schedule_once(lambda *_: self.open_table(reopen_stul, bar_mode=bar_mode), 0)
        return

    # Navigation.
    if operation == "split":
        Logger.info("CTRL: split → navigace řízena zvlášť")
        return

    if bar_mode and operation in ("pay_full", "pay_part"):
        self.app.sm.current = SCREEN_ACCOUNT
        Clock.schedule_once(lambda *_: self.open_table(bar_stul or self._bar_table_id(), bar_mode=True), 0)
        return

    # Every other case (including noop): back to the account overview.
    self.app.sm.current = SCREEN_ACCOUNT
|
||
# ------------------------------------------------------------------------
|
||
def cleanup_open_ucet(self, u_edit: data.UcetEdit | None):
    """Clean up after editing an open account.

    - empty account (no items): delete it on the server
    - non-empty account: only unblock it

    Does nothing for a falsy ``u_edit``. API errors are logged, not raised.
    """
    if not u_edit:
        return
    stul = u_edit.stul
    try:
        # Items present: just unblock. No items: remove the empty account.
        action = api_call.unblock_ucet_API if u_edit.poloz else api_call.delete_ucet_API
        action(self.ctx, stul=stul)
    except Exception as e:
        Logger.exception(e)
|
||
# ------------------------------------------------------------------------
|
||
def _setup_for_pos_mode(self, bar_mode: bool = False):
|
||
if not self.setup:
|
||
return None
|
||
if not bar_mode:
|
||
return self.setup
|
||
setup = self.setup.model_copy(deep=True)
|
||
setup.is_chod = False
|
||
setup.is_host = False
|
||
return setup
|
||
|
||
def open_posdialog(self, ucet, bar_mode: bool = False, limit_mode: bool = False):
    """Open the POS screen for the account ``ucet``.

    Outside limit mode the account is opened and blocked on the server and
    then reloaded from it; if that fails a popup is shown and the POS screen
    is not opened. In limit mode ``ucet`` is used as passed in. The dialog's
    ``on_finish`` event is routed to ``_on_pos_finish`` with the same mode
    flags.
    """
    Logger.info(f"CTRL: open_posdialog bar_mode={bar_mode} limit_mode={limit_mode}")
    sm = self.app.sm
    if sm.has_screen(SCREEN_POS):
        sm.remove_widget(sm.get_screen(SCREEN_POS))
    stul = ucet.stul
    self._pos_mode = "limit" if limit_mode else ("bar" if bar_mode else "normal")
    self._bar_stul = stul if bar_mode else None
    self._limit_stul = stul if limit_mode else None
    self._limit_id = getattr(ucet, "limit_id", None) if limit_mode else None
    self._limit_den_id = getattr(ucet, "limit_den_id", None) if limit_mode else None

    if not limit_mode:
        # Atomic open + block on the server.
        try:
            api_call.open_block_ucet_API(self.ctx, stul)
        except Exception as e:
            # Compare without case and diacritics so that "blokován" and
            # "blokovan" style messages are both recognised.
            msg = unicodedata.normalize("NFKD", str(e))
            msg = msg.encode("ascii", "ignore").decode("ascii").lower()
            if "blokovan" in msg:  # code 409
                _popup_info(
                    "Upozornění",
                    f"Stůl {stul} je právě otevřen\nna jiném zařízení."
                )
                return
            if "neexistuje" in msg:  # 404, should not happen: it was just created
                Logger.error("Unexpected 404 from open_block_ucet")
                _popup_info(
                    "Chyba",
                    f"Stůl {stul} neexistuje"
                )
                return
            # Any other error.
            _popup_info( "Chyba", f"Server error {str(e)}")
            Logger.exception(e)
            return

        # The account is now ours.
        self._editing_stul = stul
        try:
            ucet = api_call.load_ucet_API(
                self.ctx,
                stul=stul,
                block=False
            )
        except Exception as e:
            # The account content is unknown here, so it must not be
            # deleted: only release the lock taken above.
            try:
                api_call.unblock_ucet_API(self.ctx, stul=stul)
            except Exception as unblock_error:
                Logger.warning(f"CTRL: unblock after failed load failed: {unblock_error}")
            # Nothing is being edited; do not leave a stale table id behind.
            self._editing_stul = None
            _popup_info("Chyba", "Nelze načíst účet ze serveru")
            Logger.exception(e)
            return
    else:
        self._editing_stul = None
        self._opened_dummy = False

    # DB model -> edit model.
    ucet_edit = data.ucet_to_edit(ucet)
    setup_for_pos = self._setup_for_pos_mode(bar_mode=bar_mode)

    # Open the POS dialog.
    dlg = posdialog.POSDialog(
        controller=self,
        default_price_level=setup_for_pos.default_price_level if setup_for_pos else "1",
        default_printer=self.default_printer,
        cenik=self.cenik_ui or self.cenik,
        setup=setup_for_pos,
        fstmenu=self.fstmenu,
        levels=self._levels,
        printers=self._printers,
        bankterms=self._bankterms,
        payments=self._payments,
        alllevels=self._price_levels,
        static_maps=self.pos_static_maps,
        kasutxt=self.kasutxt,
        limit_mode=limit_mode,
        name=SCREEN_POS,
    )
    dlg._refresh_price_button()
    dlg._refresh_printer_button()
    dlg.set_ucet(ucet_edit)
    dlg.bind(
        on_finish=lambda _, u_main, u_sec, op:
            self._on_pos_finish(u_main, u_sec, op, bar_mode=bar_mode, limit_mode=limit_mode)
    )
    sm.add_widget(dlg)
    sm.current = SCREEN_POS
|
||
# ------------------------------------------------------------------------
|
||
def handle_pos_result(self, u_main_edit, u_sec_edit, operation):
    """Persist the result of a regular (non-limit) POS dialog.

    ``operation`` is one of "noop", "split", "storno", "storno2",
    "edit_only", "pay_full" or "pay_part"; anything else is logged as
    unknown.

    Returns ``("reopen_table", stul)`` when a payment could not be completed
    and the table should be reopened, otherwise None.

    Unless the operation is "split" or "storno2", the edited table is
    unblocked at the end. For "split" and "storno2" an exception is logged
    and not raised; for all other operations it propagates.
    """
    Logger.info(f"CTRL: POS finalize operation={operation}")
    # Decision for the final cleanup.
    if operation == "split":
        self._opened_dummy = False  # required
    skip_final_unblock = (operation == "split" or operation == "storno2")

    def reopen_action():
        # Table to reopen after a failed payment.
        reopen_stul = (
            getattr(u_main_edit, "stul", None)
            or getattr(u_sec_edit, "stul", None)
            or getattr(self, "_editing_stul", None)
        )
        return ("reopen_table", reopen_stul)

    try:
        # NOOP (ESC, dialog closed).
        if operation == "noop":
            Logger.info("CTRL: noop")
            # Standard path.
            if u_main_edit is not None:
                if not u_main_edit.poloz:
                    self.cleanup_open_ucet(u_main_edit)
                return
            # Fallback: the POS dialog did not send u_main_edit (ESC).
            stul = getattr(self, "_editing_stul", None)
            if not stul:
                return
            try:
                ucet_db = api_call.load_ucet_API(self.ctx, stul=stul, block=False)
            except Exception as e:
                # If it cannot be loaded, rather do not delete
                # (it just gets unblocked in the final cleanup).
                Logger.exception(e)
                return
            # Empty account: delete it.
            ucet_edit = data.ucet_to_edit(ucet_db)
            if not ucet_edit.poloz:
                self.cleanup_open_ucet(ucet_edit)
            return

        if operation == "split":
            self._split_source_is_limit = False
            self._split_target_is_limit = False
            self.finalize_operation(u_main_edit, None)
            self.finalize_operation(u_sec_edit, None)
            self._opened_dummy = False
            self._handle_split_result(u_main_edit, u_sec_edit)
            return

        if operation == "storno" or operation == "storno2":
            Logger.info("CTRL: storno otevřeného účtu")
            if u_main_edit and u_main_edit.poloz:
                self.finalize_operation(u_main_edit, None)
                api_call.save_ucet_API(
                    self.ctx,
                    data.ucet_edit_to_ucet(u_main_edit)
                )
            else:  # always exists, at least as a dummy
                api_call.delete_ucet_API(self.ctx, stul=u_main_edit.stul)
            if u_sec_edit and u_sec_edit.poloz:
                self._enqueue_kitchen_print_jobs(u_sec_edit, kind="storno")
            return

        if operation == "edit_only":
            Logger.info("CTRL: edit_only")
            if u_main_edit and u_main_edit.poloz:
                self.finalize_operation(u_main_edit, None)
                api_call.save_ucet_API(
                    self.ctx,
                    data.ucet_edit_to_ucet(u_main_edit)
                )
            else:
                # --- empty account: clean up ---
                Logger.info("CTRL: edit_only + empty account → cleanup")
                if u_main_edit:
                    self.cleanup_open_ucet(u_main_edit)
            return

        if operation in ("pay_full", "pay_part"):
            Logger.info(f"CTRL: {operation}")
            if not u_sec_edit:
                Logger.error("CTRL: missing u_sec_edit for payment")
                return
            if not self._mark_pohladavka_ucet(data.ucet_edit_to_ucet(u_sec_edit)):
                self._restore_table_after_failed_payment(u_main_edit, u_sec_edit)
                return reopen_action()

            # Prints kitchen tickets.
            self.prepare_operation_for_save(u_main_edit, u_sec_edit)

            # --- closed account (ALWAYS u_sec; fill in all data) ---
            u_sec_db = data.ucet_edit_to_ucet(u_sec_edit)
            if not self._mark_pohladavka_ucet(u_sec_db):
                self._restore_table_after_failed_payment(u_main_edit, u_sec_edit)
                return reopen_action()
            u_sec_db.sumdph()
            u_sec_db.autor = self.user_login.name  # self.ctx.user
            u_sec_db.datetime = data.stime_str()
            u_sec_db.origin ="Ucet"
            u_sec_db.closed_at = data.stime_str()
            try:
                u_sec_db = api_call.save_ucet_API(self.ctx, u_sec_db)
            except Exception as e:
                Logger.exception(e)
                self._restore_table_after_failed_payment(u_main_edit, u_sec_edit)
                _popup_info(
                    "Platba",
                    "Ucet sa nepodarilo uzavriet alebo odoslat do recepcie.\n"
                    f"{e}",
                )
                return reopen_action()

            if self._receipt_has_fiscal_payment(u_sec_db):
                try:
                    u_sec_db = self._print_fiscal_receipt(u_sec_db, title="Ucet")
                except Exception as e:
                    Logger.exception("CTRL: fiscal receipt print failed")
                    failure_storno = self._storno_saved_receipt_after_failed_fiscal(u_sec_db, str(e))
                    self._restore_table_after_failed_payment(u_main_edit, u_sec_edit)
                    storno_note = (
                        f"\nInterné storno účtu: {failure_storno.ucislo}"
                        if failure_storno and getattr(failure_storno, "ucislo", None)
                        else "\nInterné storno účtu sa nepodarilo vytvoriť automaticky."
                    )
                    _popup_info(
                        "Fiskalny doklad",
                        "Fiskalny doklad sa nepodarilo vytlacit.\n"
                        "Polozky boli vratene na stol."
                        f"{storno_note}\n"
                        f"{e}",
                    )
                    return reopen_action()
            else:
                self._handle_nonfiscal_receipt_after_close(u_sec_db, kind="receipt", title="Ucet")

            # --- rest of the open account (partial payment only) ---
            if u_main_edit and u_main_edit.poloz:
                u_main_db = data.ucet_edit_to_ucet(u_main_edit)
                u_main_db.closed_at = None
                u_main_db.ucislo = None
                api_call.save_ucet_API(self.ctx, u_main_db)
            return

        # FALLBACK
        Logger.warning(f"CTRL: unknown operation {operation}")
    except Exception:
        if not skip_final_unblock:
            raise
        # For split/storno2 the former ``return`` inside ``finally``
        # discarded the exception silently; keep the no-raise behaviour
        # but leave a trace in the log.
        Logger.exception("CTRL: POS finalize failed")
        return None
    finally:
        # UNBLOCK: always only after the work with the account is done.
        if not skip_final_unblock and getattr(self, "_editing_stul", None):
            if not self._opened_dummy:
                try:
                    ucet = api_call.load_ucet_API(
                        self.ctx,
                        stul=self._editing_stul,
                        block=False
                    )
                    if not ucet.poloz:
                        Logger.info("CTRL: dummy account without changes → delete")
                        api_call.delete_ucet_API(self.ctx, stul=self._editing_stul)
                except Exception as e:
                    Logger.warning(f"Dummy cleanup failed: {e}")
            # Always unblock.
            try:
                api_call.unblock_ucet_API(self.ctx, stul=self._editing_stul)
            except RuntimeError as e:
                if "neexistuje" in str(e):
                    Logger.info("CTRL: account already deleted, no unblock needed")
                else:
                    raise
            finally:
                self._editing_stul = None
                self._opened_dummy = False
|
||
# ------------------------------------------------------------------------
|
||
def _restore_table_after_failed_payment(self, u_main_edit, u_sec_edit):
|
||
source = u_main_edit or u_sec_edit
|
||
if not source:
|
||
return
|
||
restored = source.model_copy(deep=True)
|
||
restored.poloz = []
|
||
if u_main_edit and u_main_edit.poloz:
|
||
restored.poloz.extend([pol.model_copy(deep=True) for pol in u_main_edit.poloz])
|
||
if u_sec_edit and u_sec_edit.poloz:
|
||
restored.poloz.extend([pol.model_copy(deep=True) for pol in u_sec_edit.poloz])
|
||
if not restored.poloz:
|
||
return
|
||
restored.closed_at = None
|
||
restored.ucislo = None
|
||
restored.platby = []
|
||
restored.hotel_charge = None
|
||
restored.hotel_charge_preparation = None
|
||
restored.hotel_charge_send_result = None
|
||
try:
|
||
restore_db = data.ucet_edit_to_ucet(restored)
|
||
restore_db.closed_at = None
|
||
restore_db.ucislo = None
|
||
restore_db.platby = []
|
||
restore_db.hotel_charge = None
|
||
restore_db.hotel_charge_preparation = None
|
||
restore_db.hotel_charge_send_result = None
|
||
api_call.save_ucet_API(self.ctx, restore_db)
|
||
Logger.info("CTRL: restored open table after failed payment")
|
||
except Exception as restore_error:
|
||
Logger.exception(restore_error)
|
||
|
||
# ------------------------------------------------------------------------
|
||
def prepare_operation_for_save(self, u_main_edit, u_sec_edit):
    """Run the pre-save steps for both edit accounts.

    First ``_print_kitchen_if_needed`` is called for the main and then the
    secondary account. Afterwards, when ``self.setup`` is truthy and its
    ``sum_items`` attribute is truthy, ``_sum_items`` is called for both
    accounts in the same order.
    """
    accounts = (u_main_edit, u_sec_edit)

    for account in accounts:
        self._print_kitchen_if_needed(account)

    if not self.setup:
        return
    if getattr(self.setup, "sum_items", False):
        for account in accounts:
            self._sum_items(account)
|
||
|
||
# ------------------------------------------------------------------------
|
||
def finalize_operation(self, u_main_edit, u_sec_edit):
    """Prepare both edit accounts and persist them through the API.

    Steps, in order:
      1. ``prepare_operation_for_save`` (kitchen tickets, optional item
         merging as configured in setup).
      2. Main account: saved when it has items, otherwise passed to
         ``cleanup_open_ucet``. A falsy main account is ignored.
      3. Secondary account: saved only when it exists and has items.
    """
    # ---------- KITCHEN ----------
    self.prepare_operation_for_save(u_main_edit, u_sec_edit)

    # ---------- SAVE MAIN ----------
    # NOTE(review): the source lost its indentation; the cleanup branch is
    # assumed to belong to "main account has no items" - confirm.
    if u_main_edit:
        if not u_main_edit.poloz:
            self.cleanup_open_ucet(u_main_edit)
        else:
            api_call.save_ucet_API(
                self.ctx,
                data.ucet_edit_to_ucet(u_main_edit),
            )

    # ---------- SAVE SEC ----------
    has_sec_items = bool(u_sec_edit and u_sec_edit.poloz)
    if has_sec_items:
        api_call.save_ucet_API(
            self.ctx,
            data.ucet_edit_to_ucet(u_sec_edit),
        )
|
||
# ------------------------------------------------------------------------
|
||
def _print_kitchen_if_needed(self, u_edit: data.UcetEdit | None):
    """Hand unsent items of an account to kitchen printing and flag them sent.

    Kitchen tickets are produced whenever an unsent item is left on the
    account after a PosDialog operation.

    The call is a logged no-op when the account is missing or has no items,
    or when ``_collect_kitchen_items`` returns nothing to print. Otherwise
    the collected items are passed to ``_enqueue_kitchen_print_jobs`` with
    ``kind="bon"`` and every item whose ``sent`` flag is falsy gets
    ``sent = True``.
    """
    if not u_edit or not u_edit.poloz:
        Logger.info("CTRL: kitchen print skipped - no account items")
        return

    u_print = self._collect_kitchen_items(u_edit)

    if u_print:
        self._enqueue_kitchen_print_jobs(u_print, kind="bon")
        # Mark the items as sent.
        for item in u_edit.poloz:
            if not item.sent:
                item.sent = True
        return

    # Nothing to print: log how many items were already sent.
    sent_count = len([p for p in u_edit.poloz if getattr(p, "sent", False)])
    Logger.info(
        f"CTRL: kitchen print skipped - no unsent items total={len(u_edit.poloz)} sent={sent_count}"
    )
|
||
|
||
# =====================================================
|
||
class MainScreen(Screen):
    """Screen with an info label and a "KONEC" (exit) button."""

    def __init__(self, controller: ApiController, **kwargs):
        """Store the controller and build the vertical label + button layout."""
        super().__init__(**kwargs)
        self.controller = controller

        # Info label; its text is filled in by refresh().
        self.lbl = Label(font_size=20)

        exit_button = Button(text="KONEC", size_hint_y=None, height=dp(50))
        exit_button.bind(on_press=self._exit_app)

        layout = BoxLayout(orientation="vertical", padding=dp(20), spacing=dp(10))
        for widget in (self.lbl, exit_button):
            layout.add_widget(widget)
        self.add_widget(layout)

    def _exit_app(self, *_):
        """Stop the heartbeat, log out and stop the running app.

        Failures of the heartbeat stop or the logout are logged as warnings;
        the app is stopped in every case.
        """
        Logger.info("EXIT pressed → stopping heartbeat + logout")
        try:
            # The heartbeat is stopped first, the logout follows.
            api_call.stop_heartbeat(self.controller.ctx)
            api_call.logout_API(self.controller.ctx)
        except Exception as e:
            Logger.warning(f"Logout failed: {e}")
        finally:
            App.get_running_app().stop()

    def refresh(self):
        """Write cash-register id, default price level and price-list size to the label."""
        info_lines = [
            f"POKLADNA {self.controller.ctx.id_kas}\n",
            f"DEFAULT CEN HLAD: {self.controller.setup.default_price_level}\n",
            f"POLOŽEK V CENÍKU: {len(self.controller.cenik.cenpol)}\n",
        ]
        self.lbl.text = "".join(info_lines)
|
||
|
||
# =====================================================
|
||
|
||
# =====================================================
|
||
# APP
|
||
# =====================================================
|
||
class PokladnaApp(App):
    """Kivy application: wires the controller, the login screen and the account screen."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Guards _show_login() against switching to the login screen twice.
        self._login_shown = False

    def build(self):
        """Create config, controller and both screens; return the ScreenManager."""
        self.cfg = ConfigManager()
        self.controller = ApiController(self)
        self.sm = ScreenManager()

        # ---------- ACCOUNT ----------
        account_kwargs = dict(
            name=SCREEN_ACCOUNT,
            controller=self.controller,
            get_ucty=lambda: self.controller.load_stoly(closed=False),
            on_select=self.controller.open_table,
            on_cancel=lambda: None,
            on_logout=self.controller.logout_user,
            on_bar=self.controller.open_bar,
            on_info=self.controller._on_info,
            mode="normal",
            only_opened=True,
            allow_manual=True,
        )
        self.account_screen = accountselect.AccountSelectScreen(**account_kwargs)

        # ---------- LOGIN USER ----------
        self.login_user_screen = LoginUserScreen(
            self.controller,
            on_success=self.show_account_select,
            name=SCREEN_LOGIN_USER,
        )

        for screen in (self.login_user_screen, self.account_screen):
            self.sm.add_widget(screen)

        # The ScreenManager is the root widget; the login screen is shown first.
        self.sm.current = SCREEN_LOGIN_USER
        return self.sm

    def on_stop(self):
        """Kivy stop hook: run the one-time cleanup."""
        Logger.info("Kivy on_stop called")
        self._on_exit()

    def _start_app(self, *_):
        """Start the controller, then show the login screen."""
        self.controller.start_app()
        self._show_login()

    def _exit_app(self, *_):
        """Run the cleanup and stop the running app."""
        self._on_exit()
        App.get_running_app().stop()

    def _on_exit(self):
        """Stop the heartbeat and log out; runs at most once per app instance."""
        already_done = getattr(self, "_cleanup_done", False)
        if already_done:
            return
        self._cleanup_done = True

        Logger.info("Application cleanup started")
        try:
            api_call.stop_heartbeat(self.controller.ctx)
            api_call.logout_API(self.controller.ctx)
        except Exception as e:
            Logger.warning(f"Cleanup failed: {e}")

    def _show_login(self):
        """Switch to the login screen unless that was already done."""
        if self._login_shown:
            Logger.info("LOGIN already shown → skip")
            return

        self._login_shown = True
        Logger.info("APP: show login user")
        self.sm.current = SCREEN_LOGIN_USER

    def show_login(self):
        """Reset the login guard and switch to the login screen unconditionally."""
        Logger.info("APP: show_login")
        self._login_shown = False
        self.sm.current = SCREEN_LOGIN_USER

    def show_account_select(self):
        """Switch to the account screen and choose its top view.

        With a truthy ``client_settings["room_name"]`` the room map is shown
        for that room; otherwise the opened-tables view is shown. Nothing
        beyond the screen switch happens when the controller or its
        ``client_settings`` is falsy.
        """
        Logger.info("APP: show_account_select")
        self.sm.current = SCREEN_ACCOUNT

        if not self.controller:
            return
        settings = self.controller.client_settings
        if not settings:
            return

        screen = self.account_screen
        room_name = settings["room_name"]

        # NOTE(review): the source lost its indentation; the opened-tables
        # branch is assumed to be the alternative to "room_name is set" - confirm.
        if room_name:
            screen.view_mode = "room"
            screen.selected_room = room_name
            screen.top_holder.clear_widgets()
            if screen.map_view is None:
                screen._build_map_view()
            if screen.map_view is not None:
                screen.map_view.current_room = room_name
                screen.top_holder.add_widget(screen.map_view)
            return

        screen.view_mode = "open_table"
        if screen.opened_view is None:
            screen._build_opened_view()
        screen.top_holder.add_widget(screen.opened_view)

    def on_start(self):
        """Kivy start hook: run the controller's backend init in a daemon thread."""
        Logger.info("APP on_start → startuji backend init")
        init_thread = threading.Thread(
            target=self.controller._start_app_thread,
            daemon=True,
        )
        init_thread.start()

    def _logout_and_back(self):
        """Log the user out via the controller and return to the login screen."""
        self.controller.logout_user()
        self._show_login()
|
||
|
||
if __name__ == "__main__":
    # Script entry point: log the start, then create and run the Kivy app.
    Logger.info("=== RUNNING PokladnaApp ===")
    app = PokladnaApp()
    app.run()
|