Initial commit

This commit is contained in:
Michael Dwyer 2026-03-21 17:42:45 +00:00
commit d7b639c21e
2 changed files with 514 additions and 0 deletions

View File

@ -0,0 +1,40 @@
# fastmail-receipts
Extracts receipt/purchase emails from Michael's Fastmail account via JMAP API.
Used for YNAB reconciliation.
## Quick start
python3 /data/tools/fastmail-receipts/receipts.py --days 30
python3 /data/tools/fastmail-receipts/receipts.py --days 30 --format csv
python3 /data/tools/fastmail-receipts/receipts.py --days 30 --format json
## Options
--days N Days back to search (default: 30)
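--after YYYY-MM-DD Start date, taken as 00:00 UTC of that day (overrides --days)
--before YYYY-MM-DD End date, exclusive: taken as 00:00 UTC of that day, so emails received on that day are not included (default: now)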
--format summary | csv | json (default: summary)
--limit N Max emails per filter pass (default: 500)
--verbose Show JMAP debug on stderr
## Auth
API token in /data/tools/fastmail-receipts/.env (chmod 600, root only)
Token: fmu1-ad294022-... (see .env)
Fastmail JMAP account: uad294022
## Output fields (JSON)
date, time, vendor, from_name, from_email, subject, amount (best guess),
all_amounts (all found), preview, email_id
## Notes
- Apple receipts: HTML-only, amounts extracted from raw HTML
- Vanguard/SoFi entries are brokerage notifications, not purchase receipts
- Forwarded threads (Fwd:/Re:) can appear — filter by from_email if needed
- Duplicate shipping updates are not removed by the script (it only de-duplicates by email ID); deduplicate downstream by vendor + amount within a date window
- A JMAP text search for a literal `$` returns 0 results (Fastmail doesn't index `$`), so subject keyword filters are used instead
## Last tested: 2026-03-19 — 102 receipts in 30 days, 70 with amounts

474
fastmail-receipts/receipts.py Executable file
View File

@ -0,0 +1,474 @@
#!/usr/bin/env python3
"""
fastmail-receipts: Extract receipt emails from Fastmail via JMAP API
Outputs structured JSON, CSV, or human-readable summary of candidate receipt
emails with extracted dollar amounts ready for YNAB reconciliation.
Usage:
python3 receipts.py --days 30
python3 receipts.py --after 2026-02-01 --before 2026-03-19
python3 receipts.py --days 30 --format csv
python3 receipts.py --days 7 --limit 20 --verbose
"""
import os
import sys
import json
import csv
import re
import argparse
import logging
from datetime import datetime, timezone, timedelta
from pathlib import Path
from html.parser import HTMLParser
import requests
class HTMLTextExtractor(HTMLParser):
    """Strip HTML tags and return the visible text as one whitespace-normalised string.

    Content inside ``<script>``, ``<style>`` and ``<head>`` is dropped.
    Block-level and cell tags insert a space so that adjacent cells or
    lines do not fuse together (e.g. ``Total<br>$5`` -> ``Total $5``).
    """

    # Elements whose content is never visible text.
    _SKIP_TAGS = frozenset(('script', 'style', 'head'))
    # Elements that imply a visual break between pieces of text.
    _BREAK_TAGS = frozenset(('td', 'th', 'div', 'p', 'br', 'tr', 'li'))

    def __init__(self):
        super().__init__()
        self._parts = []
        # Depth counter rather than a flag: <style> nested inside <head>
        # must not re-enable output when </style> is seen.
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        """Track entry into skipped elements and add a separator before breaks."""
        if tag in self._SKIP_TAGS:
            self._skip_depth += 1
        elif tag == 'body':
            # A <body> start tag implicitly ends <head>, even if </head>
            # is missing from sloppy markup.
            self._skip_depth = 0
        elif tag in self._BREAK_TAGS:
            # Void tags such as <br> never produce an end tag, so the
            # separator has to be emitted on the start tag as well.
            self._parts.append(' ')

    def handle_endtag(self, tag):
        """Track exit from skipped elements and add a separator after breaks."""
        if tag in self._SKIP_TAGS:
            # Clamp at zero so a stray closing tag cannot go negative.
            self._skip_depth = max(0, self._skip_depth - 1)
        if tag in self._BREAK_TAGS:
            self._parts.append(' ')

    def handle_data(self, data):
        """Collect text unless it sits inside a skipped element."""
        if not self._skip_depth:
            self._parts.append(data)

    def get_text(self):
        """Return the collected text with runs of whitespace collapsed."""
        return re.sub(r'\s+', ' ', ''.join(self._parts)).strip()
def strip_html(html_text):
    """Convert HTML to plain text for amount extraction.

    Args:
        html_text: Raw HTML markup (may be empty or None).

    Returns:
        The visible text. An empty string is returned for empty/None input.
        If the parser fails, a crude regex tag strip is used instead.
    """
    if not html_text:
        return ""
    try:
        extractor = HTMLTextExtractor()
        extractor.feed(html_text)
        # close() flushes any text the parser is still buffering, which
        # matters for bodies that were truncated mid-document.
        extractor.close()
        return extractor.get_text()
    except Exception:
        # Deliberate best-effort fallback: crude tag strip
        return re.sub(r'<[^>]+>', ' ', html_text)
# ─────────────────────────────────────────────
# Config
# ─────────────────────────────────────────────
# Fastmail's JMAP session endpoint and the capability URNs sent with requests.
JMAP_SESSION_URL = "https://api.fastmail.com/jmap/session"
JMAP_MAIL_CAP = "urn:ietf:params:jmap:mail"
JMAP_CORE_CAP = "urn:ietf:params:jmap:core"

# The API token file lives next to this script.
ENV_FILE = Path(__file__).parent.joinpath(".env")

# Phrases (lower-case) in the subject/body that strongly suggest a receipt.
RECEIPT_KEYWORDS = [
    # generic receipt / order wording
    "receipt",
    "order confirmation",
    "order #",
    "order no.",
    "invoice",
    "payment confirmation",
    "payment receipt",
    "your purchase",
    "you paid",
    "amount charged",
    "transaction",
    "billing statement",
    "charged to",
    "subtotal",
    "thank you for your order",
    "thank you for your purchase",
    # app stores
    "app store",
    "itunes",
    "apple store",
    "google play",
]

# Domain fragments (lower-case) of known receipt senders.
RECEIPT_DOMAINS = [
    # marketplaces and payment processors
    "amazon", "apple", "itunes", "paypal", "stripe", "square",
    # subscriptions and media
    "netflix", "spotify", "hulu", "disney", "youtube", "google",
    # software and hosting
    "microsoft", "adobe", "dropbox", "github", "digitalocean",
    "linode", "vultr", "cloudflare", "namecheap", "godaddy",
    # rides and delivery
    "uber", "lyft", "doordash", "grubhub", "instacart",
    # retail
    "target", "walmart", "costco", "chewy", "etsy", "ebay",
    "bestbuy", "newegg", "bhphotovideo", "adorama",
    # money transfer and budgeting
    "venmo", "cashapp", "zelle", "ynab",
]
# ─────────────────────────────────────────────
# Auth & Session
# ─────────────────────────────────────────────
def load_token():
    """Load the API token from the FASTMAIL_API_TOKEN env var or the .env file.

    The environment variable wins when it is set and non-empty. Otherwise
    ENV_FILE is scanned for a ``FASTMAIL_API_TOKEN=...`` line; an optional
    leading ``export`` and surrounding quotes are tolerated, and lines with
    an empty value are skipped instead of being returned as a token.

    Returns:
        The token string.

    Raises:
        RuntimeError: if no non-empty token is found in either place.
    """
    token = os.environ.get("FASTMAIL_API_TOKEN")
    if token:
        return token
    if ENV_FILE.exists():
        for raw_line in ENV_FILE.read_text(encoding="utf-8").splitlines():
            line = raw_line.strip()
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            if not line.startswith("FASTMAIL_API_TOKEN="):
                continue
            value = line.split("=", 1)[1].strip().strip('"').strip("'")
            if value:
                return value
    raise RuntimeError(
        f"No API token found. Set FASTMAIL_API_TOKEN env var or create {ENV_FILE} "
        "with FASTMAIL_API_TOKEN=your_token"
    )
def get_session(token):
    """Fetch the JMAP session document and return ``(api_url, account_id)``.

    The primary mail account is preferred; if the session lists none, the
    first account advertising the mail capability is used.

    Raises:
        requests.HTTPError: if the session request returns an error status.
        RuntimeError: if no mail-capable account is found.
    """
    auth_header = {"Authorization": f"Bearer {token}"}
    response = requests.get(JMAP_SESSION_URL, headers=auth_header, timeout=15)
    response.raise_for_status()
    payload = response.json()

    api_url = payload["apiUrl"]
    account_id = payload.get("primaryAccounts", {}).get(JMAP_MAIL_CAP)
    if not account_id:
        # No primary mail account listed: take the first mail-capable one.
        account_id = next(
            (
                candidate
                for candidate, info in payload.get("accounts", {}).items()
                if info.get("accountCapabilities", {}).get(JMAP_MAIL_CAP)
            ),
            None,
        )
    if not account_id:
        raise RuntimeError("Could not find a JMAP mail account in session")

    logging.info(f"JMAP session OK. Account: {account_id}")
    return api_url, account_id
def jmap_call(api_url, token, method_calls):
    """POST a batch of JMAP method calls and return the ``methodResponses`` list.

    Args:
        api_url: The JMAP API endpoint taken from the session document.
        token: Bearer token used for authorisation.
        method_calls: List of ``[name, arguments, call_id]`` invocations.

    Returns:
        The ``methodResponses`` list, or an empty list if the key is absent.

    Raises:
        requests.HTTPError: if the HTTP response has an error status.
    """
    payload = {
        "using": [JMAP_CORE_CAP, JMAP_MAIL_CAP],
        "methodCalls": method_calls,
    }
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    response = requests.post(
        api_url, json=payload, headers=request_headers, timeout=45
    )
    response.raise_for_status()
    body = response.json()
    return body.get("methodResponses", [])
# ─────────────────────────────────────────────
# Receipt detection & extraction helpers
# ─────────────────────────────────────────────
def is_likely_receipt(subject, from_addr, body_snippet):
    """Return True when the email plausibly is a receipt.

    The subject, sender address and body snippet are lower-cased and joined,
    then checked for any receipt keyword or any known sender-domain fragment.
    A dollar amount in the subject alone also counts as a match.
    """
    haystack = f"{subject} {from_addr} {body_snippet}".lower()
    if any(keyword in haystack for keyword in RECEIPT_KEYWORDS):
        return True
    if any(fragment in haystack for fragment in RECEIPT_DOMAINS):
        return True
    # Dollar sign followed by digits in the subject is a strong signal
    return bool(re.search(r'\$\s*[\d,]+', subject or ""))
def extract_amounts(text):
    """Extract all dollar amounts from text.

    Recognises ``$12.34`` / ``$1,234.56``, ``USD 12.34`` and numbers that
    follow the labels Total/Amount/Charged/Billed (case-insensitive).

    Args:
        text: Text to scan (may be empty or None).

    Returns:
        De-duplicated numeric strings in order of discovery (first all ``$``
        matches, then ``USD``, then labelled). Thousands separators are kept
        as written, e.g. ``"1,234.56"``.
    """
    if not text:
        return []
    # A number must start and end on a digit, so sentence punctuation after
    # an amount ("$12, thanks") or a bare "$," is never captured.
    number = r'(\d(?:[\d,]*\d)?(?:\.\d{1,2})?)'
    patterns = (
        r'\$\s*' + number,                                           # $12.34 or $1,234.56
        r'USD\s*' + number,                                          # USD 12.34
        r'(?:Total|Amount|Charged|Billed)[:\s]+\$?\s*' + number,     # Total: $12.34
    )
    found = []
    for pattern in patterns:
        # With a single capture group, findall returns just the number.
        found.extend(re.findall(pattern, text, re.IGNORECASE))
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(found))
def pick_primary_amount(amounts):
    """Choose the most likely order total from the extracted amounts.

    Each amount string is parsed as a float after removing commas; strings
    that do not parse are ignored. The largest value wins, on the assumption
    that the total exceeds any single line item.

    Returns:
        The chosen amount formatted as ``"$123.45"``, or None if there is
        nothing parseable.
    """
    parsed = []
    for candidate in amounts or ():
        try:
            parsed.append(float(candidate.replace(',', '')))
        except ValueError:
            continue
    return f"${max(parsed):.2f}" if parsed else None
def extract_vendor(from_addr, from_name):
    """Derive a clean vendor name from the sender info.

    The display name is preferred: it is stripped, and a trailing corporate
    suffix (Inc, LLC, Ltd, Corp, Co) is removed together with the comma that
    may precede it. Without a usable display name, the sender's domain is
    used with its TLD and common mail sub-domain prefixes removed.

    Args:
        from_addr: Sender email address (may be empty or None).
        from_name: Sender display name (may be empty or None).

    Returns:
        The vendor name, or ``"Unknown"`` if nothing usable is available.
    """
    if from_name and from_name.strip():
        # Strip first so the end-anchored suffix pattern can match even when
        # the display name carries trailing whitespace.
        name = re.sub(
            r',?\s+(Inc\.?|LLC\.?|Ltd\.?|Corp\.?|Co\.?)$', '',
            from_name.strip(), flags=re.IGNORECASE
        ).strip()
        if name:
            return name
    if from_addr:
        m = re.search(r'@([^@\s>]+)', from_addr)
        if m:
            domain = m.group(1)
            domain = re.sub(r'\.(com|net|org|io|co\.uk|co)$', '', domain, flags=re.IGNORECASE)
            domain = re.sub(
                r'^(mail|email|noreply|no-reply|info|billing|payments|receipts|orders|notifications?)\.',
                '', domain, flags=re.IGNORECASE
            )
            return domain.replace('-', ' ').replace('.', ' ').title()
    return "Unknown"
# ─────────────────────────────────────────────
# Main fetch logic
# ─────────────────────────────────────────────
def fetch_candidate_emails(token, after_dt, before_dt, limit=500):
    """Query Fastmail JMAP for candidate receipt emails.

    Runs several Email/query passes over the same date window (one per
    search criterion), unions the resulting IDs, then fetches the matching
    emails with their bodies in batches of 50.

    Args:
        token: Fastmail API bearer token.
        after_dt: Start of the window (datetime).
        before_dt: End of the window (datetime).
        limit: Maximum number of IDs requested per query pass.

    Returns:
        A list of raw JMAP Email objects (dicts); empty if nothing matched.
    """
    api_url, account_id = get_session(token)

    def _utc_stamp(moment):
        # The trailing "Z" claims UTC, so aware datetimes are converted
        # first. Naive datetimes are formatted as-is.
        if moment.tzinfo is not None:
            moment = moment.astimezone(timezone.utc)
        return moment.strftime("%Y-%m-%dT%H:%M:%SZ")

    window = {"after": _utc_stamp(after_dt), "before": _utc_stamp(before_dt)}

    # Multiple search passes — cast wide net, deduplicate by ID
    criteria = [
        ("text", "$"),  # literal $ anywhere in the email
        ("subject", "receipt"),
        ("subject", "order"),
        ("subject", "invoice"),
        ("subject", "payment"),
        ("subject", "confirmation"),
        ("subject", "charged"),
        ("subject", "purchase"),
    ]

    all_ids = set()
    for i, (field, value) in enumerate(criteria):
        responses = jmap_call(api_url, token, [[
            "Email/query", {
                "accountId": account_id,
                "filter": {**window, field: value},
                "sort": [{"property": "receivedAt", "isAscending": False}],
                "limit": limit,
            }, f"q{i}"
        ]])
        for name, result, _ in responses:
            if name == "Email/query":
                ids = result.get("ids", [])
                logging.info(f"Filter {i} ('{value}'): {len(ids)} results")
                all_ids.update(ids)
            elif name == "error":
                logging.warning(f"JMAP error on filter {i}: {result}")

    logging.info(f"Total unique candidate emails: {len(all_ids)}")
    if not all_ids:
        return []

    # Fetch details in batches of 50
    id_list = list(all_ids)
    all_emails = []
    for start in range(0, len(id_list), 50):
        batch = id_list[start:start + 50]
        responses = jmap_call(api_url, token, [[
            "Email/get", {
                "accountId": account_id,
                "ids": batch,
                "properties": [
                    "id", "subject", "receivedAt", "from",
                    "preview", "textBody", "htmlBody", "bodyValues",
                ],
                "fetchTextBodyValues": True,
                "fetchHTMLBodyValues": True,
                "maxBodyValueBytes": 20480,
            }, "g1"
        ]])
        for name, result, _ in responses:
            if name == "Email/get":
                all_emails.extend(result.get("list", []))
            elif name == "error":
                # Surface failed batches the same way failed queries are
                # surfaced, instead of silently dropping up to 50 emails.
                logging.warning(
                    f"JMAP error fetching emails {start}-{start + len(batch) - 1}: {result}"
                )

    logging.info(f"Fetched details for {len(all_emails)} emails")
    return all_emails
def _body_part_values(parts, body_values):
    """Return the fetched body strings for the given body parts, in order."""
    values = []
    for part in parts:
        pid = part.get("partId", "")
        if pid in body_values:
            values.append(body_values[pid].get("value") or "")
    return values


def process_emails(emails):
    """Turn raw JMAP email objects into structured receipt records.

    Emails that do not look like receipts are dropped. For the rest, dollar
    amounts are extracted from the subject, the body text and the raw HTML,
    and the largest one is reported as the primary amount.

    Args:
        emails: Iterable of JMAP Email dicts carrying subject, receivedAt,
            from, preview, textBody, htmlBody and bodyValues.

    Returns:
        A list of record dicts (date, time, vendor, from_name, from_email,
        subject, amount, all_amounts, preview, email_id), newest first.
    """
    records = []
    for email in emails:
        subject = email.get("subject", "") or ""
        recv_at = email.get("receivedAt", "")
        from_list = email.get("from") or []
        from_addr = from_list[0].get("email", "") if from_list else ""
        from_name = from_list[0].get("name", "") if from_list else ""

        # Assemble body text — prefer plain text, fall back to HTML-stripped
        body_values = email.get("bodyValues") or {}
        html_values = _body_part_values(email.get("htmlBody") or [], body_values)
        body_text = "".join(
            _body_part_values(email.get("textBody") or [], body_values)
        )
        if not body_text.strip():
            # HTML-only email (common for Apple, many retailers)
            body_text += "".join(strip_html(raw_html) for raw_html in html_values)

        preview = email.get("preview", "") or ""
        body_snippet = (body_text or preview)[:15000]
        if not is_likely_receipt(subject, from_addr, body_snippet):
            continue

        # Search both stripped text AND raw HTML for amounts (catches HTML-only emails
        # like Apple where prices may be buried in large HTML after CSS blocks)
        search_text = f"{subject} {body_snippet}"
        raw_html_text = "".join(html_values)
        if raw_html_text:
            search_text += " " + raw_html_text[:20000]

        amounts = extract_amounts(search_text)
        primary_amount = pick_primary_amount(amounts)
        vendor = extract_vendor(from_addr, from_name)

        try:
            dt = datetime.fromisoformat(recv_at.replace("Z", "+00:00"))
            date_str = dt.strftime("%Y-%m-%d")
            time_str = dt.strftime("%H:%M UTC")
        except (ValueError, AttributeError):
            # Unparseable or missing timestamp: keep whatever date prefix exists.
            date_str = recv_at[:10] if recv_at else ""
            time_str = ""

        records.append({
            "date": date_str,
            "time": time_str,
            "vendor": vendor,
            "from_name": from_name,
            "from_email": from_addr,
            "subject": subject,
            "amount": primary_amount,
            "all_amounts": amounts,
            "preview": preview[:250],
            "email_id": email.get("id", ""),
        })

    # Sort on date, then time, then ID. Sorting on date alone would leave
    # same-day records in whatever arbitrary order the emails arrived in.
    records.sort(
        key=lambda r: (r["date"], r["time"], r["email_id"] or ""),
        reverse=True,
    )
    return records
# ─────────────────────────────────────────────
# Output formatters
# ─────────────────────────────────────────────
def output_json(records):
    """Write the records to stdout as indented JSON, keeping non-ASCII text as-is."""
    rendered = json.dumps(records, indent=2, ensure_ascii=False)
    sys.stdout.write(rendered + "\n")
def output_csv(records):
    """Write the records to stdout as CSV with a header row.

    Only a fixed subset of record fields is emitted; ``all_amounts`` is
    flattened into a single ``"; "``-separated cell. With no records, a
    short notice is printed instead of CSV.
    """
    if not records:
        print("No records found.")
        return

    columns = ["date", "vendor", "amount", "subject", "from_email", "all_amounts", "preview"]
    writer = csv.DictWriter(sys.stdout, fieldnames=columns, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(
        {**rec, "all_amounts": "; ".join(rec.get("all_amounts", []))}
        for rec in records
    )
def output_summary(records):
    """Print a human-readable table of the receipt records to stdout.

    Shows a header with the record count and how many have an extracted
    amount, then one row per record (date, amount, vendor, subject). Vendor
    and subject are truncated to keep the columns aligned. With no records,
    a short notice is printed instead.
    """
    if not records:
        print("No receipt emails found in the requested time range.")
        return

    # The rules were built from an empty string repeated N times, which
    # prints nothing; use real rule characters so the separators show up.
    heavy_rule = "=" * 72
    light_rule = "-" * 68

    with_amounts = [r for r in records if r["amount"]]
    print(f"\n{heavy_rule}")
    print(f" RECEIPT EMAILS — {len(records)} found ({len(with_amounts)} with extracted amounts)")
    print(heavy_rule)
    print(f" {'DATE':<12} {'AMOUNT':<12} {'VENDOR':<28} SUBJECT")
    print(f" {light_rule}")
    for rec in records:
        amt = rec["amount"] or " ?"
        vendor = (rec["vendor"] or "")[:26]
        subj = (rec["subject"] or "")[:38]
        print(f" {rec['date']:<12} {amt:<12} {vendor:<28} {subj}")
    print(f"{heavy_rule}\n")
# ─────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────
def main():
    """Command-line entry point.

    Parses the arguments, resolves the search window in UTC, loads the API
    token, fetches and processes candidate emails, and prints them in the
    requested format. Usage errors exit with status 2 (argparse); a missing
    token or a failed Fastmail request exits with status 1.
    """
    parser = argparse.ArgumentParser(
        description="Extract receipt emails from Fastmail via JMAP — YNAB helper"
    )
    parser.add_argument("--days", type=int, default=30,
                        help="Days back to search (default: 30)")
    parser.add_argument("--after", type=str,
                        help="Start date YYYY-MM-DD (overrides --days)")
    parser.add_argument("--before", type=str,
                        help="End date YYYY-MM-DD (default: now)")
    parser.add_argument("--limit", type=int, default=500,
                        help="Max emails per filter pass (default: 500)")
    parser.add_argument("--format", choices=["json", "csv", "summary"],
                        default="summary",
                        help="Output format (default: summary)")
    parser.add_argument("--verbose", action="store_true",
                        help="Show JMAP debug info on stderr")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.WARNING,
        format="%(levelname)s: %(message)s",
        stream=sys.stderr,
    )

    def _parse_utc(value, flag):
        # Report bad dates as a usage error rather than a traceback.
        try:
            parsed = datetime.fromisoformat(value)
        except ValueError:
            parser.error(f"{flag} must be an ISO date such as 2026-03-19 (got {value!r})")
        if parsed.tzinfo is None:
            # Bare dates/times are interpreted as UTC.
            return parsed.replace(tzinfo=timezone.utc)
        # An explicit offset is converted, not overwritten.
        return parsed.astimezone(timezone.utc)

    now = datetime.now(timezone.utc)
    before_dt = _parse_utc(args.before, "--before") if args.before else now
    after_dt = _parse_utc(args.after, "--after") if args.after \
        else now - timedelta(days=args.days)
    if after_dt > before_dt:
        parser.error("the start of the search window is later than its end")

    try:
        token = load_token()
    except RuntimeError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        emails = fetch_candidate_emails(token, after_dt, before_dt, limit=args.limit)
    except (requests.RequestException, RuntimeError) as e:
        # Network/HTTP failures and "no mail account in session" end the run
        # with a one-line message instead of a traceback.
        print(f"Error: Fastmail request failed: {e}", file=sys.stderr)
        sys.exit(1)

    records = process_emails(emails)

    if args.format == "json":
        output_json(records)
    elif args.format == "csv":
        output_csv(records)
    else:
        output_summary(records)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()