Files
KPK/local_print_agent.py
2026-06-23 15:20:56 +02:00

263 lines
9.3 KiB
Python

import argparse
import getpass
import json
import logging
import os
import socket
import sys
import time
from pathlib import Path
os.environ.setdefault("KIVY_NO_ARGS", "1")
import requests
import data
from server_sqlite import process_print_job
logger = logging.getLogger("local_print_agent")
class AgentApiClient:
def __init__(
self,
*,
base_url: str,
client_id: str,
id_kas: str,
username: str,
password: str,
user: str = "",
):
self.base_url = base_url.rstrip("/")
self.client_id = client_id
self.id_kas = id_kas
self.username = username
self.password = password
self.user = user
self.token = ""
self.refresh_token = ""
self.session = requests.Session()
def _headers(self) -> dict:
headers = {"X-Client-ID": self.client_id}
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
return headers
def _refresh_access_token(self) -> bool:
if not self.refresh_token:
return False
response = self.session.post(
f"{self.base_url}/refresh/",
headers={"X-Client-ID": self.client_id},
json={"refresh_token": f"Bearer {self.refresh_token}"},
timeout=10,
)
if response.status_code >= 400:
return False
payload = response.json()
self.token = payload.get("access_token") or payload.get("token") or self.token
return bool(self.token)
def request(self, method: str, endpoint: str, **kwargs):
kwargs.setdefault("timeout", 30)
kwargs["headers"] = {**kwargs.pop("headers", {}), **self._headers()}
response = self.session.request(method, f"{self.base_url}{endpoint}", **kwargs)
if response.status_code == 401 and self._refresh_access_token():
kwargs["headers"] = {**kwargs.pop("headers", {}), **self._headers()}
response = self.session.request(method, f"{self.base_url}{endpoint}", **kwargs)
if response.status_code >= 400:
try:
detail = response.json()
except ValueError:
detail = response.text
raise RuntimeError(f"{method} {endpoint} failed: {detail}")
try:
return response.json()
except ValueError:
return response.text
def login(self) -> tuple[str, str]:
response = self.request(
"POST",
"/login/",
json={
"username": self.username,
"password": self.password,
"id_kas": self.id_kas,
},
)
self.token = response["access_token"]
self.refresh_token = response["refresh_token"]
return response.get("version_API", ""), response.get("database_name", "")
def claim_jobs(self, *, agent_id: str, printers: list[str], limit: int) -> list[data.PrintJob]:
payload = data.PrintJobClaimRequest(
id_kas=self.id_kas,
agent_id=agent_id,
printers=printers,
limit=limit,
).model_dump(mode="json")
response = self.request("POST", "/print/jobs/claim/", json=payload)
return [data.PrintJob.model_validate(item) for item in response]
def update_job_status(
self,
job_id: int,
status: str,
*,
result: dict | None = None,
error: str = "",
) -> data.PrintJob:
payload = data.PrintJobStatusUpdate(
status=status,
result=result or {},
error=error or "",
).model_dump(mode="json")
response = self.request("POST", f"/print/jobs/{job_id}/status", json=payload)
return data.PrintJob.model_validate(response)
def _csv(value: str | None) -> list[str]:
return [item.strip() for item in str(value or "").split(",") if item.strip()]
def _load_config(path: str | None) -> dict:
if not path:
return {}
config_path = Path(path)
if not config_path.exists():
raise FileNotFoundError(f"Konfiguracny subor neexistuje: {config_path}")
return json.loads(config_path.read_text(encoding="utf-8"))
def _arg_value(args, config: dict, name: str, default=None):
    """Resolve one setting, preferring the command line over the config file.

    Args:
        args: Object whose attribute ``name`` holds the command-line value
            (a missing attribute counts as ``None``).
        config: Mapping loaded from the config file.
        name: Setting name, used both as attribute name and as config key.
        default: Returned when neither source provides a value.

    Returns:
        The command-line value unless it is ``None`` or an empty string;
        otherwise ``config.get(name, default)``.
    """
    cli_value = getattr(args, name, None)
    given_on_cli = cli_value is not None and cli_value != ""
    return cli_value if given_on_cli else config.get(name, default)
def build_client(args, config: dict) -> AgentApiClient:
    """Create the API client from command-line arguments and config file.

    Each setting is resolved with ``_arg_value`` (command line first, then
    config). When no password is configured it is asked for interactively
    on the console.

    Args:
        args: Parsed command-line arguments.
        config: Mapping loaded from the config file.

    Returns:
        A new ``AgentApiClient``; nothing is sent to the server yet.
    """

    def _text(name: str) -> str:
        # Settings are always handed to the client as strings ("" if unset).
        return str(_arg_value(args, config, name, "") or "")

    secret = _arg_value(args, config, "password", "")
    if not secret:
        secret = getpass.getpass("Heslo zakazky: ")

    return AgentApiClient(
        base_url=_text("base_url").rstrip("/"),
        client_id=_text("client_id"),
        id_kas=_text("id_kas"),
        username=_text("username"),
        password=str(secret),
        user=_text("user"),
    )
def process_once(
    client: AgentApiClient,
    *,
    agent_id: str,
    printers: list[str],
    limit: int,
    timeout: float,
) -> list[data.PrintJob]:
    """Claim a batch of print jobs and process each of them once.

    For every claimed job the status is set to ``printing``, the job is
    handed to ``process_print_job`` and the outcome is reported back as
    ``printed``, or as ``retry_pending`` / ``failed_final`` on failure.

    Args:
        client: Logged-in API client.
        agent_id: Identifier of this agent in the claim request.
        printers: Printer numbers passed to the claim request.
        limit: Maximum number of jobs to claim.
        timeout: Timeout passed to ``process_print_job`` for each job.

    Returns:
        The job objects returned by the server for every status report that
        succeeded. A job whose failure could not be reported is omitted.

    Raises:
        Exception: Whatever ``client.claim_jobs`` raises; errors of
            individual jobs are logged and do not propagate.
    """
    claimed = client.claim_jobs(
        agent_id=agent_id,
        printers=printers,
        limit=limit,
    )
    processed: list[data.PrintJob] = []
    for job in claimed:
        try:
            client.update_job_status(job.id, "printing")
            result = process_print_job(job, timeout=timeout)
            # NOTE(review): if this report fails after a successful print,
            # the handler below reports a failure status for a job that was
            # physically printed - confirm the server tolerates that.
            processed.append(
                client.update_job_status(
                    job.id,
                    "printed",
                    result=result,
                )
            )
            logger.info("Vytlaceny job %s printer=%s", job.id, job.printer_no)
        except Exception as exc:
            next_status = "failed_final" if job.attempts >= job.max_attempts else "retry_pending"
            logger.exception("Tlac jobu %s zlyhala, status=%s", job.id, next_status)
            try:
                processed.append(
                    client.update_job_status(
                        job.id,
                        next_status,
                        error=str(exc),
                    )
                )
            except Exception:
                # The failure report itself may fail (server unreachable,
                # update rejected). Log it and carry on so the remaining
                # claimed jobs are still handled.
                logger.exception(
                    "Nepodarilo sa nahlasit status %s pre job %s",
                    next_status,
                    job.id,
                )
    return processed
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser of the local print agent.

    Returns:
        A parser that knows every agent option. Connection options without
        an explicit default stay ``None`` when they are not given.
    """
    # (flag, add_argument keyword arguments), in the order shown by --help.
    option_table = [
        ("--config", {"help": "Volitelny JSON konfiguracny subor."}),
        ("--base-url", {"dest": "base_url", "help": "URL aplikacneho servera, napr. http://server:8000"}),
        ("--username", {"help": "Login zakazky."}),
        ("--password", {"help": "Heslo zakazky. Ak chyba, agent sa opyta v konzole."}),
        ("--user", {"default": "", "help": "Meno pouzivatela pre kontext API."}),
        ("--id-kas", {"dest": "id_kas", "help": "Cislo pokladne pouzite pri prihlaseni agenta."}),
        ("--client-id", {"dest": "client_id", "default": "", "help": "Client/terminal id agenta."}),
        ("--agent-id", {"dest": "agent_id", "default": "", "help": "Identifikator agenta vo fronte."}),
        ("--printers", {"default": "", "help": "Ciarkou oddeleny zoznam prn_no, ktore agent obsluhuje. Prazdne znamena vsetky tlaciarne zakazky."}),
        ("--interval", {"type": float, "default": 2.0, "help": "Pauza medzi cyklami v sekundach."}),
        ("--limit", {"type": int, "default": 10, "help": "Max pocet jobov na jeden cyklus."}),
        ("--timeout", {"type": float, "default": 10.0, "help": "Timeout jednej tlace v sekundach."}),
        ("--once", {"action": "store_true", "help": "Spracuje jeden cyklus a skonci."}),
        ("--log-level", {"default": "INFO", "choices": ["DEBUG", "INFO", "WARNING", "ERROR"]}),
    ]
    cli = argparse.ArgumentParser(
        description="Lokalny print agent pre Pokladna print_jobs frontu.",
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    return cli
def main(argv: list[str] | None = None) -> int:
    """Run the print agent: log in, then poll and process print jobs.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        ``0`` after a single cycle when ``--once`` is given. Without
        ``--once`` the function loops until it is interrupted or an error
        propagates.

    Raises:
        SystemExit: ``base-url``, ``username`` or ``id-kas`` is missing.
    """
    args = build_parser().parse_args(argv)
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    config = _load_config(args.config)
    client = build_client(args, config)
    if not client.base_url or not client.username or not client.id_kas:
        raise SystemExit("Chyba base-url, username alebo id-kas.")
    if not client.client_id:
        client.client_id = f"print-agent-{socket.gethostname()}"
    agent_id = str(_arg_value(args, config, "agent_id", "") or client.client_id)
    printers = _csv(_arg_value(args, config, "printers", ""))

    # Loop invariants: clamp the tuning options once instead of every cycle.
    limit = max(1, min(int(args.limit or 10), 100))
    timeout = max(1.0, float(args.timeout or 10.0))
    interval = max(0.2, float(args.interval or 2.0))

    version, database = client.login()
    logger.info(
        "Print agent prihlaseny: server=%s db=%s login_id_kas=%s agent=%s printers=%s",
        version,
        database,
        client.id_kas,
        agent_id,
        printers or "*",
    )
    while True:
        try:
            processed = process_once(
                client,
                agent_id=agent_id,
                printers=printers,
                limit=limit,
                timeout=timeout,
            )
        except requests.RequestException as exc:
            # A transport-level failure (connection refused, timeout, ...)
            # must not terminate a long-running agent; retry next cycle.
            # Errors reported by the server itself (RuntimeError from the
            # client) still propagate as before.
            if args.once:
                raise
            logger.warning(
                "Server nedostupny (%s), dalsi pokus o %.1f s",
                exc,
                interval,
            )
            time.sleep(interval)
            continue
        if processed:
            logger.info("Spracovanych jobov: %s", len(processed))
        if args.once:
            return 0
        time.sleep(interval)
if __name__ == "__main__":
    # Script entry point: exit with main()'s return code; Ctrl+C is a
    # normal shutdown and exits with status 0.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        logger.info("Print agent ukonceny.")
        exit_code = 0
    raise SystemExit(exit_code)