From 1c980924732ebd7b62d7deb4554547d55d26c381 Mon Sep 17 00:00:00 2001 From: Oliver Zander Date: Fri, 17 Oct 2025 17:22:52 +0200 Subject: [PATCH] improved attachment download code --- input/utils/mail/attachments.py | 129 ++++++++++---------------------- input/views.py | 2 +- 2 files changed, 40 insertions(+), 91 deletions(-) diff --git a/input/utils/mail/attachments.py b/input/utils/mail/attachments.py index 4873365..27d9cf5 100644 --- a/input/utils/mail/attachments.py +++ b/input/utils/mail/attachments.py @@ -1,46 +1,35 @@ -import hashlib import os +import posixpath import time -import urllib.request -import urllib.parse import mimetypes -from contextlib import suppress +from os import PathLike from pathlib import Path -from typing import Iterable, List, Tuple +from urllib.parse import urlparse +from urllib.request import urlretrieve from django.conf import settings from django.core.mail import EmailMultiAlternatives from foerderbarometer.constants import * +PathList = list[Path] -def _ensure_cache_dir() -> Path: + +def ensure_dir(directory: PathLike) -> Path: """ - Ensure that the cache directory for attachments exists. + Ensure that the given directory exists. Creates it recursively if it doesn't. """ - cache_dir = Path(settings.MAIL_ATTACHMENT_CACHE_DIR) - cache_dir.mkdir(parents=True, exist_ok=True) + directory = Path(directory) - return cache_dir + directory.mkdir(parents=True, exist_ok=True) + + return directory -def _cached_filename_for(url: str) -> str: - """ - Generate a unique cache filename for the given URL (hash + original suffix if present). - """ - h = hashlib.sha1(url.encode('utf-8')).hexdigest()[:16] - parsed = urllib.parse.urlparse(url) - # path part only (without query/fragment) - name = Path(parsed.path).name # e.g. 'foo.pdf' - suffix = Path(name).suffix # e.g. '.pdf' - - return f'{h}{suffix}' if suffix else h - - -def _is_fresh(path: Path, ttl_seconds: int) -> bool: +def is_fresh(path: Path, ttl_seconds: int) -> bool: """ Check if the cached file exists and is still fresh within TTL. """ @@ -53,93 +42,53 @@ def _is_fresh(path: Path, ttl_seconds: int) -> bool: return time.time() - mtime < ttl_seconds -def download_with_cache(url: str, *, timeout: float = 10.0, chunk_size: int = 64 * 1024, size_cap_bytes: int = 8 * 1024 * 1024) -> Path: - """ - Download the file from the given URL into the cache directory, or return the cached - file if it's still fresh. Uses a temporary '.part' file and atomic replace. - A simple size cap protects against unexpectedly large downloads. - """ +def get_attachment(url: str) -> Path: + filepath = urlparse(url).path + filename = posixpath.basename(filepath) + destination = ensure_dir(settings.MAIL_ATTACHMENT_CACHE_DIR) / filename - cache_dir = _ensure_cache_dir() - ttl = settings.MAIL_ATTACHMENT_TTL_SECONDS - filename = _cached_filename_for(url) - path = cache_dir / filename + if is_fresh(destination, settings.MAIL_ATTACHMENT_TTL_SECONDS): + return destination - if _is_fresh(path, ttl): - return path + return download_attachment(url, destination) - tmp_path = path.with_suffix(path.suffix + '.part') + +def download_attachment(url: str, destination: Path) -> Path: + filepath = destination.with_suffix('.tmp') try: - with urllib.request.urlopen(url, timeout=timeout) as resp, open(tmp_path, 'wb') as f: - # Read in chunks up to size_cap_bytes - remaining = size_cap_bytes - while True: - chunk = resp.read(min(chunk_size, remaining)) - if not chunk: - break - f.write(chunk) - remaining -= len(chunk) - if remaining <= 0: - break - os.replace(tmp_path, path) - return path - except Exception as exc: - # Best-effort cleanup of partial file - with suppress(Exception): - if tmp_path.exists(): - tmp_path.unlink(missing_ok=True) + urlretrieve(url, filepath) + os.replace(filepath, destination) + finally: + filepath.unlink(missing_ok=True) - # Re-raise to let caller decide - raise exc + return destination -def get_filename_from_url(url: str) -> str: - """ - Derive a display filename from URL path. - """ - - parsed = urllib.parse.urlparse(url) - name = Path(parsed.path).name or 'attachment' - - return name - - -def collect_attachment_paths(recipient: str, type_code: str) -> List[Tuple[Path, str]]: - """ - Return a list of (path, filename) for attachments based on settings.MAIL_ATTACHMENT_URLS. - """ - +def collect_attachment_paths(recipient: str, type_code: str) -> PathList: assert recipient in RECIPIENTS assert type_code in TYPES config = settings.MAIL_ATTACHMENT_URLS[recipient] urls = [*config[TYPE_ALL], *config.get(type_code, [])] - return [ - (download_with_cache(url), get_filename_from_url(url)) - for url in urls - ] + return [get_attachment(url) for url in urls] -def get_mime_type(filename: str, path: Path): - for value in filename, path: - mime_type, _ = mimetypes.guess_type(value) +def get_mime_type(path: Path) -> str: + mime_type, encoding = mimetypes.guess_type(path) - if mime_type: - return mime_type - - return 'application/octet-stream' + return mime_type or 'application/octet-stream' -def attach_files(message: EmailMultiAlternatives, files: Iterable[Tuple[Path, str]]) -> None: +def attach_files(message: EmailMultiAlternatives, files: list[Path]): """ Attach files to the EmailMultiAlternatives message. - MIME type is guessed from filename or path; falls back to application/octet-stream. + MIME type is guessed from path; falls back to application/octet-stream. """ - for path, filename in files: - mime_type = get_mime_type(filename, path) + for path in files: + mime_type = get_mime_type(path) - with open(path, 'rb') as f: - message.attach(filename, f.read(), mime_type) + with open(path, 'rb') as fp: + message.attach(path.name, fp.read(), mime_type) diff --git a/input/views.py b/input/views.py index e56264e..d92b2f9 100755 --- a/input/views.py +++ b/input/views.py @@ -289,7 +289,7 @@ class ApplicationView(FormView): def send_email(self, kind, template_name, subject, recipient, context, *, fail_silently=False): email = build_email(template_name, context, subject, recipient) - applicant_files = collect_attachment_paths(kind=kind, choice=self.type_code) + applicant_files = collect_attachment_paths(kind, self.type_code) attach_files(email, applicant_files)