TerraPulse Data Pipelines
A reference for the ingestion, processing, and analysis pipelines that move data from external sources into research workspaces.
Last updated: 2026-04-06
Overview
TerraPulse runs three classes of pipeline:
- Ingestion — scheduled fetchers pull data from external APIs into PostgreSQL
- Backfill — bulk historical data downloads (queue + workers)
- Analysis — workspace scripts that produce papers and visualizations
All pipelines share these properties:
- File-by-file processing where possible (memory bounded)
- DuckDB or Parquet for staging large datasets
- PostgreSQL as the source of truth for normalized observations
- Hourly cache refresh (graph, timeline, stats) via APScheduler
Pipeline 1: Real-Time Ingestion
Architecture:
APScheduler → Fetcher → DuckDB stage → Normalizer → PostgreSQL
Components:
- `src/terrapulse/ingestion/scheduler.py` — APScheduler running in the FastAPI process
- `src/terrapulse/ingestion/fetchers/*.py` — one class per data source
- `src/terrapulse/ingestion/orchestrator.py` — runs fetchers, dispatches normalizers
- `src/terrapulse/ingestion/normalizer.py` — converts raw rows to canonical observation schema
Active fetchers (47+):
| Schedule | Fetchers |
|---|---|
| 60s | USGS earthquakes, NWS alerts |
| 5 min | Open-Meteo AQI, Safecast, NOAA space weather, GOES X-ray, DSCOVR solar wind, EMSC seismic |
| 1 hr | Open-Meteo weather, NEO catalog, GFZ seismic, INTERMAGNET, Dst index, NMDB cosmic rays, AMS fireballs, GMN meteors, Fink ZTF, USGS volcanoes, NWS alerts, WSPR corridors, HamQSL propagation |
| 6 hr | CelesTrak satellites, Launch Library |
| Daily | JPL SBDB (Apophis watchlist), NOAA CO2, SILSO sunspots, Smithsonian volcanoes, GWOSC events, magnetic poles, Auger UHECR |
Adding a new source:
- Write a fetcher class inheriting `BaseFetcher` in `src/terrapulse/ingestion/fetchers/`
- Implement `async def fetch(self) -> list[dict]` returning raw rows
- Add a normalizer function in `src/terrapulse/ingestion/normalizer.py`
- Register both in `src/terrapulse/ingestion/orchestrator.py`
- Add the schedule entry in `src/terrapulse/ingestion/scheduler.py`
- Add the seed entry in `src/terrapulse/db/seed.py`
The fetcher base class handles HTTP retries, timeouts, and auth from auth_config rows in the database.
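The steps above can be sketched as follows. This is a minimal stand-in, not the real code: the actual `BaseFetcher` also carries the retry, timeout, and auth plumbing, and the class and field names here are illustrative.

```python
import asyncio
from abc import ABC, abstractmethod

class BaseFetcher(ABC):
    """Minimal stand-in for the real base class; the production version
    also handles HTTP retries, timeouts, and auth_config lookups."""

    @abstractmethod
    async def fetch(self) -> list[dict]: ...

class ExampleFetcher(BaseFetcher):
    """Hypothetical source. A real fetcher would issue an HTTP request
    here and return the raw rows its normalizer expects."""

    async def fetch(self) -> list[dict]:
        return [{"source": "example", "metric": "demo", "value": 1.0}]

# The orchestrator awaits fetch() and hands the rows to the normalizer
rows = asyncio.run(ExampleFetcher().fetch())
```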
Pipeline 2: Backfill Queue
Architecture:
Job seeder → DuckDB queue → FastAPI on :8222 → Async workers → Parquet files on /mnt/ursa
Components:
- `scripts/backfill/api.py` — FastAPI server, single DuckDB owner
- `scripts/backfill/worker.py` — async coroutines that claim jobs and download
- `scripts/backfill/seeders.py` — job generators (FDSN, ClickHouse, CSV, HAPI patterns)
Queue states: pending → running → done/failed
Why a queue:
- DuckDB is single-writer; without an API in front of it, you can't read status while a worker holds the connection
- Workers can fail and retry without losing job state
- Re-queue failed attempts via `gh issue`-style POST endpoints
Current state: 4,933/4,935 jobs done, 5.4M rows, 188 MB ingested
Job patterns supported:
- FDSN seismic (USGS, EMSC, GFZ, ISC) — date-windowed FDSN-WS queries
- ClickHouse aggregate (WSPR corridors) — server-side SQL
- CSV download (Dst index, sunspots) — direct HTTP to file
- HAPI (Heliophysics API) — CDAWeb, OMNI
- Onboard (one-shot bulk download) — used for World Bank, etc.
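For the date-windowed patterns, a seeder has to split a time span into one job per calendar month. A hypothetical helper (the real `scripts/backfill/seeders.py` may slice differently):

```python
from datetime import date

def month_windows(start: date, end: date):
    """Yield (window_start, window_end) pairs, one per calendar month,
    half-open on the right and clipped to `end`."""
    y, m = start.year, start.month
    while date(y, m, 1) < end:
        ny, nm = (y + 1, 1) if m == 12 else (y, m + 1)
        yield date(y, m, 1), min(date(ny, nm, 1), end)
        y, m = ny, nm

# e.g. seeding three monthly FDSN jobs
windows = list(month_windows(date(2004, 11, 1), date(2005, 2, 1)))
```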
Pipeline 3: WSPR Raw Backfill (special case)
The challenge: 11.5 billion WSPR spots from db1.wspr.live ClickHouse, ~213 GB Parquet, 21 years (Nov 2004 - Apr 2026), 258 monthly files.
Initial naive approach: One HTTP request per month. Failed because ClickHouse drops large connections after ~30 seconds.
Working approach (scripts/wspr_raw_backfill.py):
- Try whole-month download first
- If response truncates (no `PAR1` Parquet footer): fall back to day-by-day
- Download each day separately (~100 MB per file)
- Validate `PAR1` footer after every download
- Merge daily Parquets into monthly file with Polars
- Delete day directory on success
- 30-second pause between months, 5-second pause between days
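The footer check can be as small as reading the last four bytes. This is a sketch of the idea, not the actual `wspr_raw_backfill.py` code:

```python
import os
import tempfile

def has_parquet_footer(path: str) -> bool:
    """True if the file ends with the 4-byte Parquet magic b'PAR1'.

    A truncated streaming download is usually missing it, which is how
    a dropped ClickHouse connection gets detected.
    """
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        if f.tell() < 8:  # magic appears at both ends; anything smaller is junk
            return False
        f.seek(-4, os.SEEK_END)
        return f.read(4) == b"PAR1"

# Demo: a fake "complete" file vs. a truncated one
fd, good = tempfile.mkstemp()
os.write(fd, b"PAR1" + b"\x00" * 16 + b"PAR1")
os.close(fd)
fd, bad = tempfile.mkstemp()
os.write(fd, b"PAR1" + b"\x00" * 16)
os.close(fd)
ok_good = has_parquet_footer(good)  # True
ok_bad = has_parquet_footer(bad)    # False
os.remove(good)
os.remove(bad)
```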
Storage: /mnt/ursa/data/terrapulse/wspr/raw/wspr_raw_YYYYMM.parquet
Lessons learned:
- Always validate the file footer after a streaming download
- Day-by-day chunking sidesteps server connection limits
- A 30-minute timeout is the right setting for big files (default 10 min was too short)
Pipeline 4: Workspace Analysis Pattern
Every research workspace follows the same shape:
workspaces/{slug}/
├── workspace.json # metadata: title, status, issue, findings_summary
├── index.md # research narrative
├── scripts/
│ ├── extract.py # PostgreSQL/Parquet → workspace data/
│ └── analyze.py # statistical analysis → results.json + figures
├── data/
│ ├── *.parquet # extracted intermediate data
│ └── results.json # machine-readable findings
├── www/
│ ├── *.html # interactive Plotly visualizations
│ └── *.png # static exports for paper figures
└── paper/
└── paper.tex # LaTeX paper (revtex4-2)
Standard analyze.py structure:
- Load extracted data
- Run statistical tests (Pearson, Spearman, Mann-Whitney, Granger, etc.)
- Compute effect sizes (Cohen's d, rank-biserial, etc.)
- Apply Bonferroni correction for multiple comparisons
- Run a permutation test (1000+ shuffles) for primary findings
- Save results to `data/results.json`
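The permutation-test step can be sketched in pure stdlib Python (the real `analyze.py` scripts presumably use numpy/scipy, and the 20-comparison Bonferroni factor here is purely illustrative):

```python
import random

random.seed(0)

def mean(v: list[float]) -> float:
    return sum(v) / len(v)

def permutation_pvalue(x: list[float], y: list[float], n_shuffles: int = 1000) -> float:
    """Two-sided permutation test on the difference of group means:
    shuffle the pooled labels and count shuffles at least as extreme
    as the observed statistic."""
    observed = abs(mean(x) - mean(y))
    pooled = x + y
    hits = 0
    for _ in range(n_shuffles):
        random.shuffle(pooled)
        a, b = pooled[:len(x)], pooled[len(x):]
        if abs(mean(a) - mean(b)) >= observed:
            hits += 1
    return (hits + 1) / (n_shuffles + 1)  # add-one rule: never report p = 0

# Two groups with a genuine difference in means (effect size d = 1.0)
x = [random.gauss(0.0, 1.0) for _ in range(200)]
y = [random.gauss(1.0, 1.0) for _ in range(200)]
p = permutation_pvalue(x, y)
p_bonferroni = min(1.0, p * 20)  # correcting for 20 comparisons, say
```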
Quality bar (enforced by the editor agent Mike):
- Sample sizes reported for every test
- Effect sizes alongside p-values
- Bonferroni correction documented
- At least one null hypothesis test
- Sensitivity analysis on key thresholds
- Honest disclosure of limitations
Pipeline 5: Memory-Bounded Big Data Processing
The lesson from the WSPR census: Polars `read_parquet` and `scan_parquet().collect()` both leak memory across iterations when processing many large files in a single Python process.
Failed approaches:
- v1: Multiple `.collect()` calls per file (~15 per file × 258 files) — 52 GB RAM, OOM
- v2: Single `read_parquet` per file with eager Polars — 52 GB RAM, OOM
Working approach (subprocess per file):
```python
# analyze_v3.py — orchestrator
import json
import subprocess

# `files` and `accumulate` are defined elsewhere in the orchestrator
for fpath in files:
    result = subprocess.run(
        ['python', 'agg_one.py', str(fpath)],
        capture_output=True, text=True, timeout=300
    )
    d = json.loads(result.stdout)
    accumulate(d)
```

```python
# agg_one.py — single-file worker
import json
import sys

import polars as pl

fpath = sys.argv[1]
df = pl.read_parquet(fpath)
result = {
    'n': len(df),
    'band_counts': {...},
    # ... aggregate everything in one pass
}
print(json.dumps(result))
sys.exit(0)  # subprocess dies, OS reclaims memory
```
Why this works: OS memory cleanup is bulletproof when a process exits. Python+Polars within a single process accumulates state in ways that are hard to flush. The IPC overhead of subprocess.run() is negligible compared to the file I/O.
When to use:
- Aggregating across 100+ large Parquet files
- Working set per file < 1 GB but cumulative would exceed RAM
- Aggregates fit in JSON (top-N counts, sums, histograms)
Pipeline 6: Cache Refresh (Hourly Crons)
The web application reads from pre-computed JSON caches instead of querying PostgreSQL on every page load. This was added after the homepage stalled at 4-5 seconds doing COUNT(*) on 40M rows.
APScheduler jobs (in src/terrapulse/ingestion/scheduler.py):
| Job | Interval | What it does |
|---|---|---|
| `stats_cache_refresh` | hourly | Total observations, sources, metrics → `data/stats_cache.json` |
| `sources_cache_refresh` | hourly | Per-source counts and metadata → `data/sources_cache.json` |
| `graph_cache_refresh` | hourly | Knowledge graph rebuild → `data/graph_cache.json` |
| `timeline_refresh` | hourly | Daily timeline (8,000+ days × 23 metrics) → `data/timeline.json` + `.parquet` |
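One detail worth getting right in any job that overwrites a cache the SSR process reads concurrently: write to a temp file, then rename over the target, so a reader never sees a half-written file. A sketch of that pattern (whether the real refresh jobs do exactly this is an assumption):

```python
import json
import os
import tempfile

def write_cache_atomically(payload: dict, dest: str) -> None:
    """Write JSON to a temp file in the destination directory, then rename.

    os.replace is atomic on POSIX, so a concurrent reader (the Astro SSR
    process) sees either the old cache or the new one, never a torn file.
    """
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(dest) or ".")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f)
        os.replace(tmp, dest)
    except BaseException:
        os.unlink(tmp)
        raise

# Demo with a throwaway file
dest = os.path.join(tempfile.gettempdir(), "stats_cache_demo.json")
write_cache_atomically({"total_observations": 40_000_000}, dest)
with open(dest) as f:
    cached = json.load(f)
os.remove(dest)
```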
Read pattern (Astro web):
```js
// Read embedded JSON at SSR time, never call API
const stats = JSON.parse(readFileSync('../data/stats_cache.json'))
```
Result: Homepage loads in 50-80ms instead of 2-5 seconds.
Pipeline 7: Knowledge Graph Generation
Inputs:
- All `datasources` rows in PostgreSQL
- All `workspaces/*/workspace.json` files
- All `workspaces/*/data/results.json` files (statistical findings)
- All `workspaces/*/index.md` files (cross-workspace citations via `/lab/links`)
Output: data/graph_cache.json with nodes and edges
Node types:
- `datasource` — external data source (color: blue)
- `metric` — observation metric (color: orange)
- `workspace` — research workspace (color: purple)
- `finding` — statistical result with verdict (color: green/red)
Edge types:
- `produces` — datasource → metric
- `uses` — workspace → metric
- `cites` — workspace → workspace (from index.md links)
- `tested` — workspace → finding (with effect size)
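A hypothetical minimal shape for the cache built from these node and edge types (field names such as `from`/`to` are illustrative; the real schema is whatever `scripts/regenerate_graph.py` emits):

```python
import json

# Illustrative graph_cache.json payload: one datasource feeding one
# metric, and one workspace using that metric.
graph = {
    "nodes": [
        {"id": "src:usgs", "type": "datasource"},
        {"id": "metric:quake_mag", "type": "metric"},
        {"id": "ws:example-study", "type": "workspace"},
    ],
    "edges": [
        {"from": "src:usgs", "to": "metric:quake_mag", "type": "produces"},
        {"from": "ws:example-study", "to": "metric:quake_mag", "type": "uses"},
    ],
}
blob = json.dumps(graph)  # what an hourly graph_cache_refresh would write
```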
Generation script: scripts/regenerate_graph.py
Web rendering: D3 force-directed simulation in web/src/pages/garden.astro. Data is embedded at SSR time (not fetched), D3 is self-hosted (no CDN dependency).
Current state: 910 nodes, 1,402 edges, 50 workspaces, 416 datasources, 176 metrics, 145 findings.
Pipeline 8: Paper Production Assembly Line
The most novel pipeline. Three specialized agents in sequence:
Elise (PMA) writes the paper → Mike (Editor) reviews rigor → Elise revises to address issues → Mike (R2) accepts → Dana (Copy editor) polishes prose
Elise (/paper <issue_number>):
- Reads a GitHub issue with a research idea
- Maps it to available metrics in PostgreSQL
- Writes `extract.py` and `analyze.py`
- Runs the analysis
- Writes `index.md` and `paper/paper.tex`
- Commits, pushes, posts results comment on the issue
Mike (/review <issue_number>):
- Reads the paper, code, and results.json
- Posts a structured editorial review comment
- Categorizes issues: CRITICAL, IMPORTANT, MINOR
- Verdict: ACCEPT / MINOR REVISIONS / REVISE
- Does NOT modify any files
Dana (/copyedit <slug>):
- 20 hard rules: no em-dashes, active voice, no weasel words, Oxford commas, etc.
- LaTeX-specific: math mode, units, dashes, refs
- Tone: no hedging chains, no jargon, no editorial commentary
- DOES modify files directly, recompiles PDF
Throughput: 17 papers accepted across 3 sessions, all copy-edited. Average pipeline time per paper: ~30-60 minutes.
Anti-sycophancy protocol: All three agents are trained to resist confirmation bias. Mike especially flags any "interesting" finding without backing data. Elise reframes findings honestly when revisions reveal selection effects.
Pipeline 9: Article Publishing
User-facing articles aggregating multiple research workspaces into journalism-style content.
Pattern:
- Astro page in `web/src/pages/articles/{slug}.astro`
- Imports workspace data from filesystem at SSR time
- Embeds inline figures from workspace `www/` via API routes
- Custom OG image generated with matplotlib (1200×630) in `web/public/articles/`
- OpenGraph + Twitter Card meta tags via `Base.astro` props
- Featured on homepage with gradient cards
Examples:
- `articles/sky-anomalies` — aggregates 7 papers on UFO/fireball/satellite topics
- `articles/solar-watch-april-2026` — real-time space weather event coverage
The funnel: Twitter post → article → lab workspace → individual paper PDF
Service Topology
Internet
│
└── nginx (terrapulse.info)
│
├── /api/v1/* → FastAPI (127.0.0.1:8111)
│ └── PostgreSQL (5433) + DuckDB staging
│
├── /* → Astro SSR (127.0.0.1:4321)
│ └── Filesystem reads (workspaces, data caches)
│
└── /api/v1/ws → WebSocket (8111)
└── Live event mesh
Backfill queue API: 127.0.0.1:8222 (internal only)
APScheduler runs in the FastAPI process
Cron-style jobs for fetchers + cache refresh
Restart command: sudo systemctl restart terrapulse terrapulse-web
Logs:
- `journalctl -u terrapulse` — fetcher activity, cache refresh
- `journalctl -u terrapulse-web` — Astro SSR
- `/var/log/nginx/terrapulse.info.access.log` — HTTP access (real IPs via `$http_cf_connecting_ip`)
- `logs/client.jsonl` — browser-side errors and performance
File Layout Summary
/home/bisenbek/projects/terrapulse/
├── src/terrapulse/ # Python application code
│ ├── api/ # FastAPI routes
│ ├── db/ # SQLAlchemy models, seed data
│ ├── ingestion/ # Fetchers, normalizers, scheduler
│ └── lab/ # Workspace + knowledge graph helpers
│
├── scripts/ # Standalone utilities
│ ├── backfill/ # Queue API, workers, seeders
│ ├── wspr_*.py # WSPR-specific pipelines
│ ├── regenerate_graph.py # Knowledge graph rebuild
│ └── build_timeline.py # Daily timeline cache
│
├── workspaces/ # Research artifacts (one dir per study)
│ └── {slug}/
│ ├── workspace.json
│ ├── index.md
│ ├── scripts/
│ ├── data/
│ ├── www/
│ └── paper/
│
├── data/ # Hourly-refreshed caches + index files
│ ├── stats_cache.json
│ ├── sources_cache.json
│ ├── graph_cache.json
│ ├── timeline.json
│ └── timeline.parquet
│
├── web/ # Astro SSR frontend
│ ├── src/pages/ # Routes
│ ├── src/components/ # Reusable UI
│ ├── src/layouts/ # Base layouts
│ └── public/ # Static assets
│
└── /mnt/ursa/data/terrapulse/ # Bulk storage (not in git)
├── wspr/raw/ # 213 GB of raw WSPR Parquet (258 months)
├── wspr/corridors/ # Hourly aggregated corridors
└── ... # Other backfilled archives