import hashlib import os import time import urllib.request import urllib.parse import mimetypes from contextlib import suppress from pathlib import Path from typing import Iterable, List, Tuple from django.conf import settings from django.core.mail import EmailMultiAlternatives from foerderbarometer.constants import * def _ensure_cache_dir() -> Path: """ Ensure that the cache directory for attachments exists. Creates it recursively if it doesn't. """ cache_dir = Path(settings.MAIL_ATTACHMENT_CACHE_DIR) cache_dir.mkdir(parents=True, exist_ok=True) return cache_dir def _cached_filename_for(url: str) -> str: """ Generate a unique cache filename for the given URL (hash + original suffix if present). """ h = hashlib.sha1(url.encode('utf-8')).hexdigest()[:16] parsed = urllib.parse.urlparse(url) # path part only (without query/fragment) name = Path(parsed.path).name # e.g. 'foo.pdf' suffix = Path(name).suffix # e.g. '.pdf' return f'{h}{suffix}' if suffix else h def _is_fresh(path: Path, ttl_seconds: int) -> bool: """ Check if the cached file exists and is still fresh within TTL. """ try: mtime = path.stat().st_mtime except FileNotFoundError: return False else: return time.time() - mtime < ttl_seconds def download_with_cache(url: str, *, timeout: float = 10.0, chunk_size: int = 64 * 1024, size_cap_bytes: int = 8 * 1024 * 1024) -> Path: """ Download the file from the given URL into the cache directory, or return the cached file if it's still fresh. Uses a temporary '.part' file and atomic replace. A simple size cap protects against unexpectedly large downloads. """ cache_dir = _ensure_cache_dir() ttl = settings.MAIL_ATTACHMENT_TTL_SECONDS filename = _cached_filename_for(url) path = cache_dir / filename if _is_fresh(path, ttl): return path tmp_path = path.with_suffix(path.suffix + '.part') try: with urllib.request.urlopen(url, timeout=timeout) as resp, open(tmp_path, 'wb') as f: # Read in chunks up to size_cap_bytes remaining = size_cap_bytes while True: chunk = resp.read(min(chunk_size, remaining)) if not chunk: break f.write(chunk) remaining -= len(chunk) if remaining <= 0: break os.replace(tmp_path, path) return path except Exception as exc: # Best-effort cleanup of partial file with suppress(Exception): if tmp_path.exists(): tmp_path.unlink(missing_ok=True) # Re-raise to let caller decide raise exc def get_filename_from_url(url: str) -> str: """ Derive a display filename from URL path. """ parsed = urllib.parse.urlparse(url) name = Path(parsed.path).name or 'attachment' return name def collect_attachment_paths(recipient: str, type_code: str) -> List[Tuple[Path, str]]: """ Return a list of (path, filename) for attachments based on settings.MAIL_ATTACHMENT_URLS. """ assert recipient in RECIPIENTS assert type_code in TYPES config = settings.MAIL_ATTACHMENT_URLS[recipient] urls = [*config[TYPE_ALL], *config.get(type_code, [])] return [ (download_with_cache(url), get_filename_from_url(url)) for url in urls ] def get_mime_type(filename: str, path: Path): for value in filename, path: mime_type, _ = mimetypes.guess_type(value) if mime_type: return mime_type return 'application/octet-stream' def attach_files(message: EmailMultiAlternatives, files: Iterable[Tuple[Path, str]]) -> None: """ Attach files to the EmailMultiAlternatives message. MIME type is guessed from filename or path; falls back to application/octet-stream. """ for path, filename in files: mime_type = get_mime_type(filename, path) with open(path, 'rb') as f: message.attach(filename, f.read(), mime_type)