#!/usr/bin/env python3
"""
fastmail-receipts: Extract receipt emails from Fastmail via JMAP API

Outputs structured JSON, CSV, or human-readable summary of candidate receipt
emails with extracted dollar amounts — ready for YNAB reconciliation.

Usage:
    python3 receipts.py --days 30
    python3 receipts.py --after 2026-02-01 --before 2026-03-19
    python3 receipts.py --days 30 --format csv
    python3 receipts.py --days 7 --limit 20 --verbose
"""

import os
import sys
import json
import csv
import re
import argparse
import logging
from datetime import datetime, timezone, timedelta
from pathlib import Path
from html.parser import HTMLParser

try:
    import requests
except ImportError:
    # Keep the pure parsing helpers importable without the HTTP client; the
    # network functions raise a clear RuntimeError (see _http) when it is missing.
    requests = None

logger = logging.getLogger(__name__)


class HTMLTextExtractor(HTMLParser):
    """Strip HTML tags and return plain text.

    Text inside ``<script>``, ``<style>`` and ``<head>`` is dropped. Block-level
    tags (and ``<br>``) are turned into whitespace so adjacent cells/lines do
    not get glued together (``Total<br>$5`` -> ``Total $5``).
    """

    _SKIPPED_TAGS = frozenset(('script', 'style', 'head'))
    _BREAKING_TAGS = frozenset(('td', 'th', 'div', 'p', 'br', 'tr', 'li'))

    def __init__(self):
        super().__init__()
        self._parts = []
        # Depth counter rather than a flag: </style> inside <head> must not
        # re-enable output while we are still inside <head>.
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIPPED_TAGS:
            self._skip_depth += 1
        elif tag == 'body':
            # Recover from an unclosed <head>/<script> in malformed mail HTML.
            self._skip_depth = 0
        elif tag in self._BREAKING_TAGS:
            # <br> usually has no end tag, so separate on start tags as well.
            self._parts.append(' ')

    def handle_endtag(self, tag):
        if tag in self._SKIPPED_TAGS:
            self._skip_depth = max(0, self._skip_depth - 1)
        elif tag in self._BREAKING_TAGS:
            self._parts.append(' ')

    def handle_data(self, data):
        if not self._skip_depth:
            self._parts.append(data)

    def get_text(self):
        """Return the collected text with runs of whitespace collapsed."""
        return re.sub(r'\s+', ' ', ''.join(self._parts)).strip()


def strip_html(html_text):
    """Convert HTML to plain text for amount extraction.

    Returns an empty string for empty/None input. If the parser fails, falls
    back to a crude regex tag strip (deliberate best effort).
    """
    if not html_text:
        return ""
    try:
        extractor = HTMLTextExtractor()
        extractor.feed(html_text)
        # close() flushes text the parser is still buffering (e.g. trailing
        # text containing a bare '&').
        extractor.close()
        return extractor.get_text()
    except Exception:
        # Best effort by design: a parser failure must not lose the email.
        logger.debug("HTML parser failed; falling back to regex tag strip", exc_info=True)
        return re.sub(r'\s+', ' ', re.sub(r'<[^>]+>', ' ', html_text)).strip()


# ─────────────────────────────────────────────
# Config
# ─────────────────────────────────────────────

JMAP_SESSION_URL = "https://api.fastmail.com/jmap/session"
JMAP_MAIL_CAP = "urn:ietf:params:jmap:mail"
JMAP_CORE_CAP = "urn:ietf:params:jmap:core"

ENV_FILE = Path(__file__).parent / ".env"

# Subject/body keywords that strongly suggest a receipt
RECEIPT_KEYWORDS = [
    "receipt", "order confirmation", "order #", "order no.", "invoice",
    "payment confirmation", "payment receipt", "your purchase", "you paid",
    "amount charged", "transaction", "billing statement", "charged to",
    "subtotal", "thank you for your order", "thank you for your purchase",
    "app store", "itunes", "apple store", "google play",
]

# Domain fragments of known receipt senders
RECEIPT_DOMAINS = [
    "amazon", "apple", "itunes", "paypal", "stripe", "square",
    "netflix", "spotify", "hulu", "disney", "youtube", "google",
    "microsoft", "adobe", "dropbox", "github", "digitalocean",
    "linode", "vultr", "cloudflare", "namecheap", "godaddy",
    "uber", "lyft", "doordash", "grubhub", "instacart",
    "target", "walmart", "costco", "chewy", "etsy", "ebay",
    "bestbuy", "newegg", "bhphotovideo", "adorama",
    "venmo", "cashapp", "zelle", "ynab",
]

# Subject terms used for the additional Email/query passes
_SUBJECT_TERMS = (
    "receipt", "order", "invoice", "payment",
    "confirmation", "charged", "purchase",
)

# Number of emails requested per Email/get call
_GET_BATCH_SIZE = 50

# A number that starts with a digit; commas only between digit groups, so a
# trailing sentence comma ("$5, and") is not swallowed.
_AMOUNT_NUMBER = r'\d+(?:,\d+)*(?:\.\d{1,2})?'
_AMOUNT_PATTERNS = tuple(
    re.compile(prefix + _AMOUNT_NUMBER, re.IGNORECASE)
    for prefix in (
        r'\$\s*',                                              # $12.34 or $1,234.56
        r'USD\s*',                                             # USD 12.34
        r'(?:Total|Amount|Charged|Billed)[:\s]+\$?\s*',        # Total: 12.34
    )
)


# ─────────────────────────────────────────────
# Auth & Session
# ─────────────────────────────────────────────

def _http():
    """Return the ``requests`` module, or raise RuntimeError if it is not installed."""
    if requests is None:
        raise RuntimeError(
            "The 'requests' package is required for JMAP calls "
            "(install it with: pip install requests)"
        )
    return requests


def load_token():
    """Load API token from FASTMAIL_API_TOKEN env var or the .env file.

    The environment variable wins. In the .env file, lines of the form
    ``FASTMAIL_API_TOKEN=value`` (optionally prefixed with ``export`` and
    optionally quoted) are recognised; empty values are ignored.

    Raises:
        RuntimeError: if no non-empty token is found.
    """
    token = os.environ.get("FASTMAIL_API_TOKEN", "").strip()
    if token:
        return token
    if ENV_FILE.is_file():
        for line in ENV_FILE.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            if line.startswith("FASTMAIL_API_TOKEN="):
                value = line.split("=", 1)[1].strip().strip('"').strip("'")
                if value:
                    return value
    raise RuntimeError(
        f"No API token found. Set FASTMAIL_API_TOKEN env var or create {ENV_FILE} "
        "with FASTMAIL_API_TOKEN=your_token"
    )


def get_session(token):
    """Fetch JMAP session, return (api_url, account_id).

    Uses the primary mail account if the session names one, otherwise the
    first account that advertises the mail capability.

    Raises:
        RuntimeError: if the session has no API URL or no mail account.
        requests.RequestException: on network/HTTP failure.
    """
    resp = _http().get(
        JMAP_SESSION_URL,
        headers={"Authorization": f"Bearer {token}"},
        timeout=15,
    )
    resp.raise_for_status()
    session = resp.json()

    api_url = session.get("apiUrl")
    if not api_url:
        raise RuntimeError("JMAP session response did not contain an apiUrl")

    account_id = session.get("primaryAccounts", {}).get(JMAP_MAIL_CAP)
    if not account_id:
        for acct_id, acct in session.get("accounts", {}).items():
            if acct.get("accountCapabilities", {}).get(JMAP_MAIL_CAP):
                account_id = acct_id
                break
    if not account_id:
        raise RuntimeError("Could not find a JMAP mail account in session")

    logger.info("JMAP session OK. Account: %s", account_id)
    return api_url, account_id


def jmap_call(api_url, token, method_calls):
    """Execute a JMAP multi-method call and return methodResponses.

    Raises:
        requests.RequestException: on network/HTTP failure.
    """
    resp = _http().post(
        api_url,
        json={
            "using": [JMAP_CORE_CAP, JMAP_MAIL_CAP],
            "methodCalls": method_calls,
        },
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        timeout=45,
    )
    resp.raise_for_status()
    return resp.json().get("methodResponses", [])


# ─────────────────────────────────────────────
# Receipt detection & extraction helpers
# ─────────────────────────────────────────────

def is_likely_receipt(subject, from_addr, body_snippet):
    """Heuristic: is this email plausibly a receipt?

    True when any receipt keyword occurs in the subject/sender/body, when the
    sender's domain contains a known receipt-sender fragment, or when the
    subject contains a dollar amount.
    """
    subject = subject or ""
    from_addr = from_addr or ""
    text = f"{subject} {from_addr} {body_snippet or ''}".lower()
    if any(kw in text for kw in RECEIPT_KEYWORDS):
        return True

    # Domain fragments describe the *sender*; matching them against the body
    # would flag any mail that merely mentions "apple", "target", "square"...
    sender_domain = from_addr.lower().rpartition("@")[2]
    if any(fragment in sender_domain for fragment in RECEIPT_DOMAINS):
        return True

    # Dollar sign in subject is a strong signal
    return bool(re.search(r'\$\s*\d', subject))


def extract_amounts(text):
    """Extract all dollar amounts from text.

    Returns a list of clean numeric strings (e.g. ``"1,234.56"``), in pattern
    order then text order, with duplicates (compared by value, ignoring
    thousands separators) removed.
    """
    if not text:
        return []

    seen, cleaned = set(), []
    for pattern in _AMOUNT_PATTERNS:
        for match in pattern.findall(text):
            numeric = re.sub(r'[^\d.,]', '', match)
            key = numeric.replace(',', '')
            if key and key not in seen:
                seen.add(key)
                cleaned.append(numeric)
    return cleaned


def pick_primary_amount(amounts):
    """Heuristic: pick the most likely 'total' from extracted amounts.

    Returns the largest parsable amount formatted as ``"$<value>"`` with two
    decimals, or None when nothing can be parsed.
    """
    floats = []
    for amt in amounts or []:
        try:
            floats.append(float(amt.replace(',', '')))
        except ValueError:
            continue  # not a number after all (e.g. stray punctuation)
    if not floats:
        return None
    # Return the largest (usually the order total, not a line item)
    return f"${max(floats):.2f}"


def extract_vendor(from_addr, from_name):
    """Derive a clean vendor name from the sender info.

    Prefers the display name (minus a trailing Inc/LLC/Ltd/Corp/Co suffix);
    otherwise title-cases the sender domain without common TLDs and mail
    sub-domain prefixes. Returns "Unknown" if neither is usable.
    """
    if from_name and from_name.strip():
        name = re.sub(
            r',?\s+(Inc\.?|LLC\.?|Ltd\.?|Corp\.?|Co\.?)$', '',
            from_name.strip(), flags=re.IGNORECASE
        ).strip()
        if name:
            return name

    if from_addr:
        m = re.search(r'@([^@\s>]+)', from_addr)
        if m:
            domain = m.group(1)
            domain = re.sub(r'\.(com|net|org|io|co\.uk|co)$', '', domain, flags=re.IGNORECASE)
            domain = re.sub(
                r'^(mail|email|noreply|no-reply|info|billing|payments|receipts|orders|notifications?)\.',
                '', domain, flags=re.IGNORECASE
            )
            return domain.replace('-', ' ').replace('.', ' ').title()

    return "Unknown"


# ─────────────────────────────────────────────
# Main fetch logic
# ─────────────────────────────────────────────

def _jmap_utc(dt):
    """Format a datetime as a JMAP UTCDate string, converting aware values to UTC."""
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


def fetch_candidate_emails(token, after_dt, before_dt, limit=500):
    """Query Fastmail JMAP for candidate receipt emails.

    Runs several Email/query passes (any "$" in the mail, plus a set of
    subject terms) restricted to [after_dt, before_dt), de-duplicates the ids
    and fetches the details with Email/get in batches.

    Args:
        token: Fastmail API token.
        after_dt: start of the window (datetime).
        before_dt: end of the window (datetime).
        limit: maximum number of ids requested per query pass.

    Returns:
        List of raw JMAP Email objects (dicts); empty if nothing matched.
    """
    api_url, account_id = get_session(token)

    window = {"after": _jmap_utc(after_dt), "before": _jmap_utc(before_dt)}

    # Multiple search passes — cast wide net, deduplicate by ID
    filters = [{**window, "text": "$"}]  # any $ in email
    filters.extend({**window, "subject": term} for term in _SUBJECT_TERMS)

    all_ids = set()
    for i, filt in enumerate(filters):
        responses = jmap_call(api_url, token, [[
            "Email/query",
            {
                "accountId": account_id,
                "filter": filt,
                "sort": [{"property": "receivedAt", "isAscending": False}],
                "limit": limit,
            },
            f"q{i}"
        ]])
        for name, result, _ in responses:
            if name == "Email/query":
                ids = result.get("ids", [])
                logger.info("Filter %d ('%s'): %d results", i, list(filt.values())[-1], len(ids))
                all_ids.update(ids)
            elif name == "error":
                logger.warning("JMAP error on filter %d: %s", i, result)

    logger.info("Total unique candidate emails: %d", len(all_ids))
    if not all_ids:
        return []

    # Fetch details in batches; sorted so batching is deterministic
    id_list = sorted(all_ids)
    all_emails = []
    for start in range(0, len(id_list), _GET_BATCH_SIZE):
        batch = id_list[start:start + _GET_BATCH_SIZE]
        responses = jmap_call(api_url, token, [[
            "Email/get",
            {
                "accountId": account_id,
                "ids": batch,
                "properties": [
                    "id", "subject", "receivedAt", "from",
                    "preview", "textBody", "htmlBody", "bodyValues",
                ],
                "fetchTextBodyValues": True,
                "fetchHTMLBodyValues": True,
                "maxBodyValueBytes": 20480,
            },
            "g1"
        ]])
        for name, result, _ in responses:
            if name == "Email/get":
                all_emails.extend(result.get("list", []))
            elif name == "error":
                logger.warning("JMAP error fetching batch at offset %d: %s", start, result)

    logger.info("Fetched details for %d emails", len(all_emails))
    return all_emails


def _parts_text(parts, body_values):
    """Concatenate the fetched body values for the given body parts."""
    return "".join(
        body_values[part.get("partId", "")].get("value", "")
        for part in parts
        if part.get("partId", "") in body_values
    )


def process_emails(emails):
    """Turn raw JMAP email objects into structured receipt records.

    Emails that do not look like receipts are dropped. Each record has the
    keys date, time, vendor, from_name, from_email, subject, amount,
    all_amounts, preview and email_id. Records are sorted newest first.
    """
    records = []
    for email in emails:
        subject = email.get("subject", "") or ""
        recv_at = email.get("receivedAt", "")
        from_list = email.get("from") or []
        from_addr = (from_list[0].get("email", "") or "") if from_list else ""
        from_name = (from_list[0].get("name", "") or "") if from_list else ""

        # Assemble body text — prefer plain text, fall back to HTML-stripped
        body_values = email.get("bodyValues") or {}
        raw_html_text = _parts_text(email.get("htmlBody") or [], body_values)
        body_text = _parts_text(email.get("textBody") or [], body_values)
        if not body_text.strip():
            # HTML-only email (common for Apple, many retailers)
            body_text = strip_html(raw_html_text)

        preview = email.get("preview", "") or ""
        body_snippet = (body_text or preview)[:15000]

        if not is_likely_receipt(subject, from_addr, body_snippet):
            continue

        # Search both stripped text AND raw HTML for amounts (catches HTML-only
        # emails like Apple where prices may be buried in large HTML after CSS)
        search_text = f"{subject} {body_snippet}"
        if raw_html_text:
            search_text += " " + raw_html_text[:20000]

        amounts = extract_amounts(search_text)

        try:
            dt = datetime.fromisoformat(recv_at.replace("Z", "+00:00"))
            if dt.tzinfo is not None:
                dt = dt.astimezone(timezone.utc)  # the label below says UTC
            date_str = dt.strftime("%Y-%m-%d")
            time_str = dt.strftime("%H:%M UTC")
        except (ValueError, AttributeError):
            date_str = recv_at[:10] if recv_at else ""
            time_str = ""

        records.append({
            "date": date_str,
            "time": time_str,
            "vendor": extract_vendor(from_addr, from_name),
            "from_name": from_name,
            "from_email": from_addr,
            "subject": subject,
            "amount": pick_primary_amount(amounts),
            "all_amounts": amounts,
            "preview": preview[:250],
            "email_id": email.get("id", ""),
        })

    # Include the time so same-day receipts have a stable newest-first order
    records.sort(key=lambda r: (r["date"], r["time"]), reverse=True)
    return records


# ─────────────────────────────────────────────
# Output formatters
# ─────────────────────────────────────────────

def output_json(records):
    """Print the records to stdout as pretty-printed JSON."""
    print(json.dumps(records, indent=2, ensure_ascii=False))


def output_csv(records):
    """Print the records to stdout as CSV.

    The header row is always written so the output stays valid CSV; when
    there are no records a note goes to stderr instead of stdout.
    """
    fields = ["date", "vendor", "amount", "subject", "from_email", "all_amounts", "preview"]
    writer = csv.DictWriter(sys.stdout, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()
    if not records:
        print("No records found.", file=sys.stderr)
        return
    for rec in records:
        row = dict(rec)
        row["all_amounts"] = "; ".join(row.get("all_amounts", []))
        writer.writerow(row)


def output_summary(records):
    """Print a human-readable table of the records to stdout."""
    if not records:
        print("No receipt emails found in the requested time range.")
        return

    with_amounts = [r for r in records if r["amount"]]
    print(f"\n{'═'*72}")
    print(f" RECEIPT EMAILS — {len(records)} found ({len(with_amounts)} with extracted amounts)")
    print(f"{'═'*72}")
    print(f" {'DATE':<12} {'AMOUNT':<12} {'VENDOR':<28} SUBJECT")
    print(f" {'─'*68}")
    for rec in records:
        amt = rec["amount"] or " ?"
        vendor = (rec["vendor"] or "")[:26]
        subj = (rec["subject"] or "")[:38]
        print(f" {rec['date']:<12} {amt:<12} {vendor:<28} {subj}")
    print(f"{'═'*72}\n")


# ─────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────

def _parse_cli_date(value):
    """Parse an ISO date/datetime from the CLI into an aware UTC datetime.

    Naive values are taken to be UTC; aware values are converted to UTC.
    Raises ValueError on malformed input.
    """
    dt = datetime.fromisoformat(value)
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def main():
    """Parse CLI arguments, fetch candidate emails and print the receipts."""
    parser = argparse.ArgumentParser(
        description="Extract receipt emails from Fastmail via JMAP — YNAB helper"
    )
    parser.add_argument("--days", type=int, default=30,
                        help="Days back to search (default: 30)")
    parser.add_argument("--after", type=str,
                        help="Start date YYYY-MM-DD (overrides --days)")
    parser.add_argument("--before", type=str,
                        help="End date YYYY-MM-DD (default: now)")
    parser.add_argument("--limit", type=int, default=500,
                        help="Max emails per filter pass (default: 500)")
    parser.add_argument("--format", choices=["json", "csv", "summary"], default="summary",
                        help="Output format (default: summary)")
    parser.add_argument("--verbose", action="store_true",
                        help="Show JMAP debug info on stderr")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.WARNING,
        format="%(levelname)s: %(message)s",
        stream=sys.stderr,
    )

    if args.limit < 1:
        parser.error("--limit must be a positive integer")

    try:
        before_dt = _parse_cli_date(args.before) if args.before else datetime.now(timezone.utc)
        # --days counts back from the end of the window, so combining it with
        # --before cannot produce an inverted range.
        after_dt = (_parse_cli_date(args.after) if args.after
                    else before_dt - timedelta(days=args.days))
    except ValueError as e:
        parser.error(f"invalid date (expected YYYY-MM-DD): {e}")
    if after_dt >= before_dt:
        parser.error("the start of the search window must be earlier than its end")

    try:
        token = load_token()
        emails = fetch_candidate_emails(token, after_dt, before_dt, limit=args.limit)
    except (RuntimeError, OSError) as e:
        # requests' exceptions derive from OSError, so network/HTTP failures
        # end up here too instead of producing a traceback.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    records = process_emails(emails)

    formatters = {"json": output_json, "csv": output_csv}
    formatters.get(args.format, output_summary)(records)


if __name__ == "__main__":
    main()