commit d7b639c21e483fc075165a3a0c7ab0dfbe293c2b Author: Michael Date: Sat Mar 21 17:42:45 2026 +0000 Initial commit diff --git a/fastmail-receipts/README.md b/fastmail-receipts/README.md new file mode 100644 index 0000000..06e92c0 --- /dev/null +++ b/fastmail-receipts/README.md @@ -0,0 +1,40 @@ +# fastmail-receipts + +Extracts receipt/purchase emails from Michael's Fastmail account via JMAP API. +Used for YNAB reconciliation. + +## Quick start + + python3 /data/tools/fastmail-receipts/receipts.py --days 30 + python3 /data/tools/fastmail-receipts/receipts.py --days 30 --format csv + python3 /data/tools/fastmail-receipts/receipts.py --days 30 --format json + +## Options + + --days N Days back to search (default: 30) + --after YYYY-MM-DD Start date + --before YYYY-MM-DD End date (default: now) + --format summary | csv | json (default: summary) + --limit N Max emails per filter pass (default: 500) + --verbose Show JMAP debug on stderr + +## Auth + +API token in /data/tools/fastmail-receipts/.env (chmod 600, root only) +Token: redacted — never quote token values in docs 
(see .env) +Fastmail JMAP account: uad294022 + +## Output fields (JSON) + +date, time, vendor, from_name, from_email, subject, amount (best guess), +all_amounts (all found), preview, email_id + +## Notes + +- Apple receipts: HTML-only, amounts extracted from raw HTML +- Vanguard/SoFi entries are brokerage notifications, not purchase receipts +- Forwarded threads (Fwd:/Re:) can appear — filter by from_email if needed +- Duplicate shipping updates: deduplicate by vendor+amount+date window +- $ JMAP text search returns 0 (Fastmail doesn't index $) — keyword filters used instead + +## Last tested: 2026-03-19 — 102 receipts in 30 days, 70 with amounts diff --git a/fastmail-receipts/receipts.py b/fastmail-receipts/receipts.py new file mode 100755 index 0000000..124eae5 --- /dev/null +++ b/fastmail-receipts/receipts.py @@ -0,0 +1,474 @@ +#!/usr/bin/env python3 +""" +fastmail-receipts: Extract receipt emails from Fastmail via JMAP API + +Outputs structured JSON, CSV, or human-readable summary of candidate receipt +emails with extracted dollar amounts — ready for YNAB reconciliation. 
class HTMLTextExtractor(HTMLParser):
    """Strip HTML tags and return plain text.

    Text inside <script>/<style>/<head> is dropped entirely; table/block
    tags (td, p, div, br, ...) become a single space so adjacent cells and
    lines don't run together.
    """

    # Tags whose text content must never reach the output.
    _SKIP_TAGS = ('script', 'style', 'head')
    # Tags that act as visual separators — emit a space at their boundaries.
    _SPACER_TAGS = ('td', 'th', 'div', 'p', 'br', 'tr', 'li')

    def __init__(self):
        super().__init__()
        self._parts = []
        # Depth counter, NOT a boolean: with a flag, </style> nested inside
        # <head> would re-enable output while still inside <head>, leaking
        # e.g. <title> text into the extracted body.
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIP_TAGS:
            self._skip_depth += 1
        if tag in self._SPACER_TAGS:
            # Emit the separator on the OPEN tag too: void elements like
            # <br> are never closed, so an end-tag-only rule glued their
            # neighbours together ("Total<br>$5" -> "Total$5").
            self._parts.append(' ')

    def handle_endtag(self, tag):
        if tag in self._SKIP_TAGS and self._skip_depth > 0:
            # Guard against stray close tags in malformed HTML.
            self._skip_depth -= 1
        if tag in self._SPACER_TAGS:
            self._parts.append(' ')

    def handle_data(self, data):
        if self._skip_depth == 0:
            self._parts.append(data)

    def get_text(self):
        # Collapse the whitespace runs left behind by tag removal.
        return re.sub(r'\s+', ' ', ''.join(self._parts)).strip()


def strip_html(html_text):
    """Convert HTML to plain text for amount extraction.

    Falls back to a crude regex tag-strip if the parser chokes on
    pathological markup.
    """
    try:
        extractor = HTMLTextExtractor()
        extractor.feed(html_text)
        return extractor.get_text()
    except Exception:
        # Fallback: crude tag strip
        return re.sub(r'<[^>]+>', ' ', html_text)

# ─────────────────────────────────────────────
# Config
# ─────────────────────────────────────────────

JMAP_SESSION_URL = "https://api.fastmail.com/jmap/session"
JMAP_MAIL_CAP = "urn:ietf:params:jmap:mail"
JMAP_CORE_CAP = "urn:ietf:params:jmap:core"
# Token file lives next to this script; expected mode 600.
ENV_FILE = Path(__file__).parent / ".env"

# Subject/body keywords that strongly suggest a receipt
RECEIPT_KEYWORDS = [
    "receipt", "order confirmation", "order #", "order no.",
    "invoice", "payment confirmation", "payment receipt",
    "your purchase", "you paid", "amount charged", "transaction",
    "billing statement", "charged to", "subtotal",
    "thank you for your order", "thank you for your purchase",
    "app store", "itunes", "apple store", "google play",
]

# Domain fragments of known receipt senders
RECEIPT_DOMAINS = [
    "amazon", "apple", "itunes", "paypal", "stripe", "square",
    "netflix", "spotify", "hulu", "disney", "youtube", "google",
    "microsoft", "adobe", "dropbox", "github", "digitalocean",
    "linode", "vultr", "cloudflare", "namecheap", "godaddy",
    "uber", "lyft", "doordash", "grubhub", "instacart",
    "target", "walmart", "costco", "chewy", "etsy", "ebay",
    "bestbuy", "newegg", "bhphotovideo", "adorama",
    "venmo", "cashapp", "zelle", "ynab",
]


# ─────────────────────────────────────────────
# Auth & Session
# ─────────────────────────────────────────────

def load_token():
    """Load API token from .env file or FASTMAIL_API_TOKEN env var.

    The environment variable takes precedence over the file.

    Returns:
        The token string.

    Raises:
        RuntimeError: if neither source provides a token.
    """
    token = os.environ.get("FASTMAIL_API_TOKEN")
    if token:
        return token
    if ENV_FILE.exists():
        for line in ENV_FILE.read_text().splitlines():
            line = line.strip()
            if line.startswith("FASTMAIL_API_TOKEN="):
                # Tolerate optional single/double quotes around the value.
                return line.split("=", 1)[1].strip().strip('"').strip("'")
    raise RuntimeError(
        f"No API token found. Set FASTMAIL_API_TOKEN env var or create {ENV_FILE} "
        "with FASTMAIL_API_TOKEN=your_token"
    )
def get_session(token):
    """Fetch JMAP session, return (api_url, account_id).

    Prefers the primary mail account; falls back to scanning all accounts
    for one advertising the mail capability.

    Raises:
        RuntimeError: if the session contains no JMAP mail account.
        requests.HTTPError: on a non-2xx session response.
    """
    resp = requests.get(
        JMAP_SESSION_URL,
        headers={"Authorization": f"Bearer {token}"},
        timeout=15,
    )
    resp.raise_for_status()
    session = resp.json()
    api_url = session["apiUrl"]

    account_id = session.get("primaryAccounts", {}).get(JMAP_MAIL_CAP)
    if not account_id:
        for acct_id, acct in session.get("accounts", {}).items():
            if acct.get("accountCapabilities", {}).get(JMAP_MAIL_CAP):
                account_id = acct_id
                break
    if not account_id:
        raise RuntimeError("Could not find a JMAP mail account in session")

    logging.info(f"JMAP session OK. Account: {account_id}")
    return api_url, account_id


def jmap_call(api_url, token, method_calls):
    """Execute a JMAP multi-method call and return methodResponses.

    Raises:
        requests.HTTPError: on a non-2xx API response.
    """
    resp = requests.post(
        api_url,
        json={
            "using": [JMAP_CORE_CAP, JMAP_MAIL_CAP],
            "methodCalls": method_calls,
        },
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        timeout=45,
    )
    resp.raise_for_status()
    return resp.json().get("methodResponses", [])


# ─────────────────────────────────────────────
# Receipt detection & extraction helpers
# ─────────────────────────────────────────────

def is_likely_receipt(subject, from_addr, body_snippet):
    """Heuristic: is this email plausibly a receipt?"""
    text = f"{subject} {from_addr} {body_snippet}".lower()
    for kw in RECEIPT_KEYWORDS:
        if kw in text:
            return True
    for domain in RECEIPT_DOMAINS:
        if domain in text:
            return True
    # Dollar sign in subject is a strong signal — but only when followed
    # by a digit; a bare "$" or "$," is not an amount.
    if re.search(r'\$\s*\d[\d,]*', subject or ""):
        return True
    return False


def extract_amounts(text):
    """Extract all dollar amounts from text. Returns list of clean numeric strings.

    Every pattern requires at least one digit after the currency marker so
    stray "$ ," artifacts in marketing HTML can't produce junk entries.
    First-appearance order is preserved; duplicates are dropped.
    """
    if not text:
        return []
    patterns = [
        r'\$\s*\d[\d,]*(?:\.\d{1,2})?',    # $12.34 or $1,234.56
        r'USD\s*\d[\d,]*(?:\.\d{1,2})?',   # USD 12.34
        r'(?:Total|Amount|Charged|Billed)[:\s]+\$?\s*\d[\d,]*(?:\.\d{1,2})?',
    ]
    raw = []
    for pat in patterns:
        raw.extend(re.findall(pat, text, re.IGNORECASE))

    seen, cleaned = set(), []
    for amt in raw:
        # Keep only digits, commas and dots; e.g. "Total: $12.34" -> "12.34".
        numeric = re.sub(r'[^\d.,]', '', amt)
        if numeric and numeric not in seen:
            seen.add(numeric)
            cleaned.append(numeric)
    return cleaned


def pick_primary_amount(amounts):
    """Heuristic: pick the most likely 'total' from extracted amounts.

    Returns a formatted "$X.YZ" string, or None when nothing parses.
    """
    if not amounts:
        return None
    floats = []
    for amt in amounts:
        try:
            floats.append(float(amt.replace(',', '')))
        except ValueError:
            pass  # e.g. malformed "12.34.56" fragments — skip silently
    if not floats:
        return None
    # Return the largest (usually the order total, not a line item)
    return f"${max(floats):.2f}"


def extract_vendor(from_addr, from_name):
    """Derive a clean vendor name from the sender info.

    Prefers the display name (minus trailing Inc./LLC/etc.); otherwise
    derives a title-cased name from the sender domain with common TLDs and
    mailer prefixes (mail., noreply., billing., ...) stripped.
    """
    if from_name and from_name.strip():
        name = re.sub(
            r'\s+(Inc\.?|LLC\.?|Ltd\.?|Corp\.?|Co\.?)$', '',
            from_name, flags=re.IGNORECASE
        ).strip()
        return name
    if from_addr:
        m = re.search(r'@([^@\s>]+)', from_addr)
        if m:
            domain = m.group(1)
            domain = re.sub(r'\.(com|net|org|io|co\.uk|co)$', '', domain, flags=re.IGNORECASE)
            domain = re.sub(
                r'^(mail|email|noreply|no-reply|info|billing|payments|receipts|orders|notifications?)\.',
                '', domain, flags=re.IGNORECASE
            )
            return domain.replace('-', ' ').replace('.', ' ').title()
    return "Unknown"
# ─────────────────────────────────────────────
# Main fetch logic
# ─────────────────────────────────────────────

def fetch_candidate_emails(token, after_dt, before_dt, limit=500):
    """Query Fastmail JMAP for candidate receipt emails.

    Runs several Email/query passes with different filters (wide net),
    deduplicates the returned ids, then fetches message details in batches
    of 50. Returns a list of raw JMAP email objects.
    """
    api_url, account_id = get_session(token)

    window = {
        "after": after_dt.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "before": before_dt.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }

    # Multiple search passes — cast wide net, deduplicate by ID
    passes = [
        ("text", "$"),  # any $ in email
        ("subject", "receipt"),
        ("subject", "order"),
        ("subject", "invoice"),
        ("subject", "payment"),
        ("subject", "confirmation"),
        ("subject", "charged"),
        ("subject", "purchase"),
    ]

    candidate_ids = set()
    for idx, (field, term) in enumerate(passes):
        filt = dict(window)
        filt[field] = term
        responses = jmap_call(api_url, token, [[
            "Email/query", {
                "accountId": account_id,
                "filter": filt,
                "sort": [{"property": "receivedAt", "isAscending": False}],
                "limit": limit,
            }, f"q{idx}"
        ]])
        for name, result, _ in responses:
            if name == "Email/query":
                ids = result.get("ids", [])
                logging.info(f"Filter {idx} ('{term}'): {len(ids)} results")
                candidate_ids.update(ids)
            elif name == "error":
                logging.warning(f"JMAP error on filter {idx}: {result}")

    logging.info(f"Total unique candidate emails: {len(candidate_ids)}")
    if not candidate_ids:
        return []

    # Fetch details in batches of 50
    ordered_ids = list(candidate_ids)
    fetched = []
    for start in range(0, len(ordered_ids), 50):
        chunk = ordered_ids[start:start + 50]
        responses = jmap_call(api_url, token, [[
            "Email/get", {
                "accountId": account_id,
                "ids": chunk,
                "properties": [
                    "id", "subject", "receivedAt", "from",
                    "preview", "textBody", "htmlBody", "bodyValues",
                ],
                "fetchTextBodyValues": True,
                "fetchHTMLBodyValues": True,
                "maxBodyValueBytes": 20480,
            }, "g1"
        ]])
        for name, result, _ in responses:
            if name == "Email/get":
                fetched.extend(result.get("list", []))

    logging.info(f"Fetched details for {len(fetched)} emails")
    return fetched


def process_emails(emails):
    """Turn raw JMAP email objects into structured receipt records.

    Filters to likely receipts, extracts dollar amounts and a vendor name,
    and returns records sorted newest-first.
    """

    def part_text(parts, values):
        # Concatenate the bodyValues payload of each listed MIME part.
        chunks = []
        for part in parts:
            pid = part.get("partId", "")
            if pid in values:
                chunks.append(values[pid].get("value", ""))
        return "".join(chunks)

    records = []
    for msg in emails:
        subject = msg.get("subject", "") or ""
        recv_at = msg.get("receivedAt", "")

        senders = msg.get("from") or []
        from_addr = senders[0].get("email", "") if senders else ""
        from_name = senders[0].get("name", "") if senders else ""

        body_values = msg.get("bodyValues") or {}
        text_parts = msg.get("textBody") or []
        html_parts = msg.get("htmlBody") or []

        # Assemble body text — prefer plain text, fall back to HTML-stripped.
        body_text = part_text(text_parts, body_values)
        if not body_text.strip():
            # HTML-only email (common for Apple, many retailers)
            for part in html_parts:
                pid = part.get("partId", "")
                if pid in body_values:
                    body_text += strip_html(body_values[pid].get("value", ""))

        preview = msg.get("preview", "") or ""
        body_snippet = (body_text or preview)[:15000]

        if not is_likely_receipt(subject, from_addr, body_snippet):
            continue

        # Search both stripped text AND raw HTML for amounts (catches
        # HTML-only emails like Apple where prices may be buried in large
        # HTML after CSS blocks).
        search_text = f"{subject} {body_snippet}"
        raw_html_text = part_text(html_parts, body_values)
        if raw_html_text:
            search_text += " " + raw_html_text[:20000]

        amounts = extract_amounts(search_text)

        try:
            received = datetime.fromisoformat(recv_at.replace("Z", "+00:00"))
        except (ValueError, AttributeError):
            # Missing/odd timestamp: best-effort date prefix, no time.
            date_str = recv_at[:10] if recv_at else ""
            time_str = ""
        else:
            date_str = received.strftime("%Y-%m-%d")
            time_str = received.strftime("%H:%M UTC")

        records.append({
            "date": date_str,
            "time": time_str,
            "vendor": extract_vendor(from_addr, from_name),
            "from_name": from_name,
            "from_email": from_addr,
            "subject": subject,
            "amount": pick_primary_amount(amounts),
            "all_amounts": amounts,
            "preview": preview[:250],
            "email_id": msg.get("id", ""),
        })

    records.sort(key=lambda rec: rec["date"], reverse=True)
    return records
"from_name": from_name, + "from_email": from_addr, + "subject": subject, + "amount": primary_amount, + "all_amounts": amounts, + "preview": preview[:250], + "email_id": email.get("id", ""), + }) + + records.sort(key=lambda r: r["date"], reverse=True) + return records + + +# ───────────────────────────────────────────── +# Output formatters +# ───────────────────────────────────────────── + +def output_json(records): + print(json.dumps(records, indent=2, ensure_ascii=False)) + + +def output_csv(records): + if not records: + print("No records found.") + return + fields = ["date", "vendor", "amount", "subject", "from_email", "all_amounts", "preview"] + writer = csv.DictWriter(sys.stdout, fieldnames=fields, extrasaction="ignore") + writer.writeheader() + for rec in records: + row = dict(rec) + row["all_amounts"] = "; ".join(row.get("all_amounts", [])) + writer.writerow(row) + + +def output_summary(records): + if not records: + print("No receipt emails found in the requested time range.") + return + + with_amounts = [r for r in records if r["amount"]] + print(f"\n{'═'*72}") + print(f" RECEIPT EMAILS — {len(records)} found ({len(with_amounts)} with extracted amounts)") + print(f"{'═'*72}") + print(f" {'DATE':<12} {'AMOUNT':<12} {'VENDOR':<28} SUBJECT") + print(f" {'─'*68}") + for rec in records: + amt = rec["amount"] or " ?" 
+ vendor = (rec["vendor"] or "")[:26] + subj = (rec["subject"] or "")[:38] + print(f" {rec['date']:<12} {amt:<12} {vendor:<28} {subj}") + print(f"{'═'*72}\n") + + +# ───────────────────────────────────────────── +# Entry point +# ───────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser( + description="Extract receipt emails from Fastmail via JMAP — YNAB helper" + ) + parser.add_argument("--days", type=int, default=30, + help="Days back to search (default: 30)") + parser.add_argument("--after", type=str, + help="Start date YYYY-MM-DD (overrides --days)") + parser.add_argument("--before", type=str, + help="End date YYYY-MM-DD (default: now)") + parser.add_argument("--limit", type=int, default=500, + help="Max emails per filter pass (default: 500)") + parser.add_argument("--format", choices=["json", "csv", "summary"], + default="summary", + help="Output format (default: summary)") + parser.add_argument("--verbose", action="store_true", + help="Show JMAP debug info on stderr") + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.WARNING, + format="%(levelname)s: %(message)s", + stream=sys.stderr, + ) + + now = datetime.now(timezone.utc) + before_dt = datetime.fromisoformat(args.before).replace(tzinfo=timezone.utc) \ + if args.before else now + after_dt = datetime.fromisoformat(args.after).replace(tzinfo=timezone.utc) \ + if args.after else now - timedelta(days=args.days) + + try: + token = load_token() + except RuntimeError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + emails = fetch_candidate_emails(token, after_dt, before_dt, limit=args.limit) + records = process_emails(emails) + + if args.format == "json": output_json(records) + elif args.format == "csv": output_csv(records) + else: output_summary(records) + + +if __name__ == "__main__": + main()