improved attachment download code

This commit is contained in:
Oliver Zander 2025-10-17 17:22:52 +02:00
parent 7fcde34897
commit 1c98092473
2 changed files with 40 additions and 91 deletions

View File

@ -1,46 +1,35 @@
import hashlib
import os import os
import posixpath
import time import time
import urllib.request
import urllib.parse
import mimetypes import mimetypes
from contextlib import suppress from os import PathLike
from pathlib import Path from pathlib import Path
from typing import Iterable, List, Tuple from urllib.parse import urlparse
from urllib.request import urlretrieve
from django.conf import settings from django.conf import settings
from django.core.mail import EmailMultiAlternatives from django.core.mail import EmailMultiAlternatives
from foerderbarometer.constants import * from foerderbarometer.constants import *
PathList = list[Path]
def _ensure_cache_dir() -> Path:
def ensure_dir(directory: PathLike) -> Path:
""" """
Ensure that the cache directory for attachments exists. Ensure that the given directory exists.
Creates it recursively if it doesn't. Creates it recursively if it doesn't.
""" """
cache_dir = Path(settings.MAIL_ATTACHMENT_CACHE_DIR) directory = Path(directory)
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir directory.mkdir(parents=True, exist_ok=True)
return directory
def _cached_filename_for(url: str) -> str: def is_fresh(path: Path, ttl_seconds: int) -> bool:
"""
Generate a unique cache filename for the given URL (hash + original suffix if present).
"""
h = hashlib.sha1(url.encode('utf-8')).hexdigest()[:16]
parsed = urllib.parse.urlparse(url)
# path part only (without query/fragment)
name = Path(parsed.path).name # e.g. 'foo.pdf'
suffix = Path(name).suffix # e.g. '.pdf'
return f'{h}{suffix}' if suffix else h
def _is_fresh(path: Path, ttl_seconds: int) -> bool:
""" """
Check if the cached file exists and is still fresh within TTL. Check if the cached file exists and is still fresh within TTL.
""" """
@ -53,93 +42,53 @@ def _is_fresh(path: Path, ttl_seconds: int) -> bool:
return time.time() - mtime < ttl_seconds return time.time() - mtime < ttl_seconds
def download_with_cache(url: str, *, timeout: float = 10.0, chunk_size: int = 64 * 1024, size_cap_bytes: int = 8 * 1024 * 1024) -> Path: def get_attachment(url: str) -> Path:
""" filepath = urlparse(url).path
Download the file from the given URL into the cache directory, or return the cached filename = posixpath.basename(filepath)
file if it's still fresh. Uses a temporary '.part' file and atomic replace. destination = ensure_dir(settings.MAIL_ATTACHMENT_CACHE_DIR) / filename
A simple size cap protects against unexpectedly large downloads.
"""
cache_dir = _ensure_cache_dir() if is_fresh(destination, settings.MAIL_ATTACHMENT_TTL_SECONDS):
ttl = settings.MAIL_ATTACHMENT_TTL_SECONDS return destination
filename = _cached_filename_for(url)
path = cache_dir / filename
if _is_fresh(path, ttl): return download_attachment(url, destination)
return path
tmp_path = path.with_suffix(path.suffix + '.part')
def download_attachment(url: str, destination: Path) -> Path:
filepath = destination.with_suffix('.tmp')
try: try:
with urllib.request.urlopen(url, timeout=timeout) as resp, open(tmp_path, 'wb') as f: urlretrieve(url, filepath)
# Read in chunks up to size_cap_bytes os.replace(filepath, destination)
remaining = size_cap_bytes finally:
while True: filepath.unlink(missing_ok=True)
chunk = resp.read(min(chunk_size, remaining))
if not chunk:
break
f.write(chunk)
remaining -= len(chunk)
if remaining <= 0:
break
os.replace(tmp_path, path)
return path
except Exception as exc:
# Best-effort cleanup of partial file
with suppress(Exception):
if tmp_path.exists():
tmp_path.unlink(missing_ok=True)
# Re-raise to let caller decide return destination
raise exc
def get_filename_from_url(url: str) -> str: def collect_attachment_paths(recipient: str, type_code: str) -> PathList:
"""
Derive a display filename from URL path.
"""
parsed = urllib.parse.urlparse(url)
name = Path(parsed.path).name or 'attachment'
return name
def collect_attachment_paths(recipient: str, type_code: str) -> List[Tuple[Path, str]]:
"""
Return a list of (path, filename) for attachments based on settings.MAIL_ATTACHMENT_URLS.
"""
assert recipient in RECIPIENTS assert recipient in RECIPIENTS
assert type_code in TYPES assert type_code in TYPES
config = settings.MAIL_ATTACHMENT_URLS[recipient] config = settings.MAIL_ATTACHMENT_URLS[recipient]
urls = [*config[TYPE_ALL], *config.get(type_code, [])] urls = [*config[TYPE_ALL], *config.get(type_code, [])]
return [ return [get_attachment(url) for url in urls]
(download_with_cache(url), get_filename_from_url(url))
for url in urls
]
def get_mime_type(filename: str, path: Path): def get_mime_type(path: Path) -> str:
for value in filename, path: mime_type, encoding = mimetypes.guess_type(path)
mime_type, _ = mimetypes.guess_type(value)
if mime_type: return mime_type or 'application/octet-stream'
return mime_type
return 'application/octet-stream'
def attach_files(message: EmailMultiAlternatives, files: Iterable[Tuple[Path, str]]) -> None: def attach_files(message: EmailMultiAlternatives, files: list[Path]):
""" """
Attach files to the EmailMultiAlternatives message. Attach files to the EmailMultiAlternatives message.
MIME type is guessed from filename or path; falls back to application/octet-stream. MIME type is guessed from path; falls back to application/octet-stream.
""" """
for path, filename in files: for path in files:
mime_type = get_mime_type(filename, path) mime_type = get_mime_type(path)
with open(path, 'rb') as f: with open(path, 'rb') as fp:
message.attach(filename, f.read(), mime_type) message.attach(path.name, fp.read(), mime_type)

View File

@ -289,7 +289,7 @@ class ApplicationView(FormView):
def send_email(self, kind, template_name, subject, recipient, context, *, fail_silently=False): def send_email(self, kind, template_name, subject, recipient, context, *, fail_silently=False):
email = build_email(template_name, context, subject, recipient) email = build_email(template_name, context, subject, recipient)
applicant_files = collect_attachment_paths(kind=kind, choice=self.type_code) applicant_files = collect_attachment_paths(kind, self.type_code)
attach_files(email, applicant_files) attach_files(email, applicant_files)