# foerderbarometer/input/mail_attachments.py

import hashlib
import os
import time
import urllib.request
import urllib.parse
import mimetypes
from pathlib import Path
from typing import Iterable, List, Tuple
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
def _ensure_cache_dir() -> Path:
    """
    Return the attachment cache directory, creating it (including any
    missing parent directories) on first use.
    """
    directory = Path(settings.MAIL_ATTACHMENT_CACHE_DIR)
    directory.mkdir(parents=True, exist_ok=True)
    return directory
def _cached_filename_for(url: str) -> str:
"""
Generate a unique cache filename for the given URL (hash + original suffix if present).
"""
h = hashlib.sha1(url.encode('utf-8')).hexdigest()[:16]
parsed = urllib.parse.urlparse(url)
# path part only (without query/fragment)
name = Path(parsed.path).name # e.g. 'foo.pdf'
suffix = Path(name).suffix # e.g. '.pdf'
return f'{h}{suffix}' if suffix else h
def _is_fresh(path: Path, ttl_seconds: int) -> bool:
"""
Check if the cached file exists and is still fresh within TTL.
"""
try:
age = time.time() - path.stat().st_mtime
return age < ttl_seconds
except FileNotFoundError:
return False
def download_with_cache(url: str, *, timeout: float = 10.0, size_cap_bytes: int = 8 * 1024 * 1024) -> Path:
    """
    Download the file at *url* into the cache directory, or return the
    cached copy if it is still fresh (TTL from
    settings.MAIL_ATTACHMENT_TTL_SECONDS, default 86400 seconds).

    The download is streamed into a temporary '.part' file and moved into
    place with os.replace, so concurrent readers never observe a partial
    file.

    Args:
        url: source URL, fetched via urllib.request.urlopen.
        timeout: socket timeout in seconds for the request.
        size_cap_bytes: maximum allowed response size.

    Returns:
        Path to the cached file.

    Raises:
        ValueError: if the response exceeds *size_cap_bytes*. The previous
            implementation silently truncated oversized downloads at the
            cap, which cached — and later mailed — a corrupt attachment;
            raising is the only safe outcome.
        urllib.error.URLError / OSError: on network or filesystem errors.
    """
    cache_dir = _ensure_cache_dir()
    ttl = int(getattr(settings, 'MAIL_ATTACHMENT_TTL_SECONDS', 86400))
    path = cache_dir / _cached_filename_for(url)
    if _is_fresh(path, ttl):
        return path
    tmp_path = path.with_suffix(path.suffix + '.part')
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp, open(tmp_path, 'wb') as f:
            remaining = size_cap_bytes
            chunk_size = 64 * 1024
            while True:
                # Ask for one byte beyond the remaining budget so an
                # oversized response is detected rather than silently
                # truncated at exactly the cap.
                chunk = resp.read(min(chunk_size, remaining + 1))
                if not chunk:
                    break
                if len(chunk) > remaining:
                    raise ValueError(
                        f'attachment at {url!r} exceeds size cap of '
                        f'{size_cap_bytes} bytes'
                    )
                f.write(chunk)
                remaining -= len(chunk)
        os.replace(tmp_path, path)
        return path
    except Exception:
        # Best-effort cleanup of the partial file; the error itself is
        # re-raised so the caller can decide how to handle it.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
def _filename_from_url(url: str) -> str:
"""
Derive a display filename from URL path as a fallback when none provided in settings.
"""
parsed = urllib.parse.urlparse(url)
name = Path(parsed.path).name or 'attachment'
return name
def collect_attachment_paths(kind: str, choice: str) -> List[Tuple[Path, str]]:
    """
    Resolve the attachments configured in settings.MAIL_ATTACHMENT_URLS
    for the given *kind* and *choice* into (cached_path, filename) pairs.

    Entries under the channel's 'ALL' key apply to every choice and come
    first. Each entry is either a bare URL string (filename derived from
    the URL path) or an explicit (url, filename) tuple.
    """
    config = getattr(settings, 'MAIL_ATTACHMENT_URLS', {})
    channel = config.get(kind, {})
    entries = list(channel.get('ALL', [])) + list(channel.get(choice, []))
    collected: List[Tuple[Path, str]] = []
    for entry in entries:
        if isinstance(entry, tuple):
            url, display_name = entry
        else:
            url = entry
            display_name = _filename_from_url(entry)
        # download_with_cache raises on failure, so every appended path
        # refers to a file that actually exists on disk.
        collected.append((download_with_cache(url), display_name))
    return collected
def attach_files(message: EmailMultiAlternatives, files: Iterable[Tuple[Path, str]]) -> None:
    """
    Attach each (path, filename) pair in *files* to *message*.

    The MIME type is guessed from the display filename first, then from
    the on-disk path's suffix, and finally falls back to
    application/octet-stream.
    """
    for path, display_name in files:
        content_type = (
            mimetypes.guess_type(display_name)[0]
            or mimetypes.guess_type(str(path))[0]
            or 'application/octet-stream'
        )
        message.attach(display_name, path.read_bytes(), content_type)