From 8d120f4d5601855f166c0792b485032cb5454c67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20F=C3=B6rtsch?= Date: Tue, 3 Mar 2026 16:58:23 +0100 Subject: [PATCH] fix backend 404 when telegram is unreachable, serve cached packs Co-Authored-By: Claude Opus 4.6 --- backend/app/models.py | 2 + backend/app/services/sticker_service.py | 104 +++++++++++++----------- 2 files changed, 59 insertions(+), 47 deletions(-) diff --git a/backend/app/models.py b/backend/app/models.py index 666c33c..9f06072 100644 --- a/backend/app/models.py +++ b/backend/app/models.py @@ -2,6 +2,8 @@ from pydantic import BaseModel class StickerResponse(BaseModel): + model_config = {"frozen": False} + id: str emoji: str emoji_name: str diff --git a/backend/app/services/sticker_service.py b/backend/app/services/sticker_service.py index f61b984..7f57aeb 100644 --- a/backend/app/services/sticker_service.py +++ b/backend/app/services/sticker_service.py @@ -24,35 +24,10 @@ def _pack_is_cached(pack_name: str) -> bool: return has_png or has_gif -def get_sticker_set(pack_name: str) -> StickerSetResponse | None: - """Fetch a sticker pack, download and convert if not cached.""" - manager = _get_manager() - - # Get pack metadata (uses requests-cache internally) - pack = manager.getPack(pack_name) - if pack is None: - return None - - actual_name = pack["name"] - - if not _pack_is_cached(actual_name): - # downloadPack calls getPack internally again, but it's cached - manager.downloadPack(actual_name) - manager.convertPack(actual_name, formats={"gif", "png"}, backend=Backend.RLOTTIE_PYTHON) - - # Build response from files on disk - png_dir = settings.downloads_dir / actual_name / "png" - gif_dir = settings.downloads_dir / actual_name / "gif" - - # Build a lookup of sticker metadata from the pack files - sticker_lookup: dict[str, dict] = {} - for sticker in pack["files"]: - sticker_id = sticker.name.split("_")[-1].split(".")[0] - sticker_lookup[sticker_id] = { - "emoji": sticker.emoji, - "emoji_name": sticker.emojiName(), - "is_animated": sticker.fileType == "tgs", - } +def _build_response_from_cache(pack_name: str, title: str) -> StickerSetResponse: + """Build a StickerSetResponse from cached files on disk.""" + png_dir = settings.downloads_dir / pack_name / "png" + gif_dir = settings.downloads_dir / pack_name / "gif" stickers: list[StickerResponse] = [] if png_dir.exists(): @@ -60,29 +35,22 @@ def get_sticker_set(pack_name: str) -> StickerSetResponse | None: sticker_id = png_file.stem.split("+")[0] emoji_name = png_file.stem.split("+")[1] if "+" in png_file.stem else "" - meta = sticker_lookup.get(sticker_id, {}) - emoji = meta.get("emoji", "") - is_animated = meta.get("is_animated", False) - if not emoji_name: - emoji_name = meta.get("emoji_name", "") - gif_url = None gif_file = gif_dir / f"{png_file.stem}.gif" if gif_file.exists(): - gif_url = f"/api/stickersets/{actual_name}/stickers/{sticker_id}.gif" + gif_url = f"/api/stickersets/{pack_name}/stickers/{sticker_id}.gif" stickers.append( StickerResponse( id=sticker_id, - emoji=emoji, + emoji="", emoji_name=emoji_name, - is_animated=is_animated, - png_url=f"/api/stickersets/{actual_name}/stickers/{sticker_id}.png", + is_animated=gif_url is not None, + png_url=f"/api/stickersets/{pack_name}/stickers/{sticker_id}.png", gif_url=gif_url, ) ) - # Also check for GIF-only stickers (animated TGS that don't produce PNG) if gif_dir.exists(): existing_ids = {s.id for s in stickers} for gif_file in sorted(gif_dir.iterdir()): @@ -90,27 +58,69 @@ def get_sticker_set(pack_name: str) -> StickerSetResponse | None: if sticker_id in existing_ids: continue emoji_name = gif_file.stem.split("+")[1] if "+" in gif_file.stem else "" - meta = sticker_lookup.get(sticker_id, {}) stickers.append( StickerResponse( id=sticker_id, - emoji=meta.get("emoji", ""), - emoji_name=emoji_name or meta.get("emoji_name", ""), + emoji="", + emoji_name=emoji_name, is_animated=True, - png_url=f"/api/stickersets/{actual_name}/stickers/{sticker_id}.png", - gif_url=f"/api/stickersets/{actual_name}/stickers/{sticker_id}.gif", + png_url=f"/api/stickersets/{pack_name}/stickers/{sticker_id}.png", + gif_url=f"/api/stickersets/{pack_name}/stickers/{sticker_id}.gif", ) ) return StickerSetResponse( - name=actual_name, - title=pack["title"], + name=pack_name, + title=title, sticker_count=len(stickers), stickers=stickers, ) +def get_sticker_set(pack_name: str) -> StickerSetResponse | None: + """Fetch a sticker pack, download and convert if not cached.""" + manager = _get_manager() + + pack = manager.getPack(pack_name) + + if pack is not None: + actual_name = pack["name"] + + if not _pack_is_cached(actual_name): + manager.downloadPack(actual_name) + manager.convertPack(actual_name, formats={"gif", "png"}, backend=Backend.RLOTTIE_PYTHON) + + # Enrich response with Telegram metadata when available + sticker_lookup: dict[str, dict] = {} + for sticker in pack["files"]: + sticker_id = sticker.name.split("_")[-1].split(".")[0] + sticker_lookup[sticker_id] = { + "emoji": sticker.emoji, + "emoji_name": sticker.emojiName(), + "is_animated": sticker.fileType == "tgs", + } + + response = _build_response_from_cache(actual_name, pack["title"]) + for s in response.stickers: + meta = sticker_lookup.get(s.id, {}) + s.emoji = meta.get("emoji", s.emoji) + s.emoji_name = meta.get("emoji_name", s.emoji_name) or s.emoji_name + s.is_animated = meta.get("is_animated", s.is_animated) + return response + + # Telegram unreachable — serve from cache if available + if _pack_is_cached(pack_name): + return _build_response_from_cache(pack_name, pack_name) + + # Also check lower-case variant (Telegram normalizes names) + lower_name = pack_name.lower() + if lower_name != pack_name and _pack_is_cached(lower_name): + return _build_response_from_cache(lower_name, lower_name) + + return None + + def get_sticker_file(pack_name: str, sticker_id: str, fmt: str) -> Path | None: """Return the path to a converted sticker file, or None if not found.""" fmt_dir = settings.downloads_dir / pack_name / fmt