TerraPulse Data Pipelines

A reference for the ingestion, processing, and analysis pipelines that move data from external sources into research workspaces.

Last updated: 2026-04-06

Overview

TerraPulse runs three classes of pipeline:

  1. Ingestion — scheduled fetchers pull data from external APIs into PostgreSQL
  2. Backfill — bulk historical data downloads (queue + workers)
  3. Analysis — workspace scripts that produce papers and visualizations

All pipelines share these properties:

  • File-by-file processing where possible (memory bounded)
  • DuckDB or Parquet for staging large datasets
  • PostgreSQL as the source of truth for normalized observations
  • Hourly cache refresh (graph, timeline, stats) via APScheduler

Pipeline 1: Real-Time Ingestion

Architecture:

APScheduler → Fetcher → DuckDB stage → Normalizer → PostgreSQL

Components:

  • src/terrapulse/ingestion/scheduler.py — APScheduler running in the FastAPI process
  • src/terrapulse/ingestion/fetchers/*.py — one class per data source
  • src/terrapulse/ingestion/orchestrator.py — runs fetchers, dispatches normalizers
  • src/terrapulse/ingestion/normalizer.py — converts raw rows to canonical observation schema

Active fetchers (47+):

  Schedule  Fetchers
  60 s      USGS earthquakes, NWS alerts
  5 min     Open-Meteo AQI, Safecast, NOAA space weather, GOES X-ray, DSCOVR solar wind, EMSC seismic
  1 hr      Open-Meteo weather, NEO catalog, GFZ seismic, INTERMAGNET, Dst index, NMDB cosmic rays, AMS fireballs, GMN meteors, Fink ZTF, USGS volcanoes, NWS alerts, WSPR corridors, HamQSL propagation
  6 hr      CelesTrak satellites, Launch Library
  Daily     JPL SBDB (Apophis watchlist), NOAA CO2, SILSO sunspots, Smithsonian volcanoes, GWOSC events, magnetic poles, Auger UHECR

Adding a new source:

  1. Write a fetcher class inheriting BaseFetcher in src/terrapulse/ingestion/fetchers/
  2. Implement async def fetch(self) -> list[dict] returning raw rows
  3. Add a normalizer function in src/terrapulse/ingestion/normalizer.py
  4. Register both in src/terrapulse/ingestion/orchestrator.py
  5. Add the schedule entry in src/terrapulse/ingestion/scheduler.py
  6. Add the seed entry in src/terrapulse/db/seed.py

The fetcher base class handles HTTP retries, timeouts, and auth from auth_config rows in the database.
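The six steps above can be sketched end to end. Everything in this sketch is illustrative: the stand-in BaseFetcher, the source name, and the canonical observation fields are assumptions, not the real schema, and the actual base class (per this doc) also supplies HTTP retries, timeouts, and auth.

```python
# Minimal sketch of the fetcher/normalizer contract. The BaseFetcher here
# is a stand-in; the real one lives in src/terrapulse/ingestion/fetchers/.
import asyncio

class BaseFetcher:                       # stand-in for the real base class
    source_id = "example_source"         # hypothetical attribute

class ExampleFetcher(BaseFetcher):
    async def fetch(self) -> list[dict]:
        # A real fetcher would make an HTTP call here; we return a
        # canned raw row so the sketch runs anywhere.
        return [{"time": "2026-04-06T00:00:00Z", "mag": 4.2, "place": "somewhere"}]

def normalize_example(raw: dict) -> dict:
    """Map a raw row onto a hypothetical canonical observation shape."""
    return {
        "source": ExampleFetcher.source_id,
        "metric": "magnitude",
        "observed_at": raw["time"],
        "value": raw["mag"],
        "meta": {"place": raw["place"]},
    }

rows = asyncio.run(ExampleFetcher().fetch())
observations = [normalize_example(r) for r in rows]
```

The orchestrator's job is then just pairing each fetcher with its normalizer and bulk-inserting the resulting observations.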


Pipeline 2: Backfill Queue

Architecture:

Job seeder → DuckDB queue → FastAPI on :8222 → Async workers → Parquet files on /mnt/ursa

Components:

  • scripts/backfill/api.py — FastAPI server, single DuckDB owner
  • scripts/backfill/worker.py — async coroutines that claim jobs and download
  • scripts/backfill/seeders.py — job generators (FDSN, ClickHouse, CSV, HAPI patterns)

Queue states: pending → running → done/failed

Why a queue:

  • DuckDB is single-writer; without an API in front of it, job status can't be read while a worker holds the connection
  • Workers can fail and retry without losing job state
  • Failed jobs can be re-queued via POST endpoints
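The claim/finish cycle behind these states can be sketched with plain SQL. This sketch uses sqlite3 instead of DuckDB purely so it is self-contained; the table and column names are made up, not the real queue schema.

```python
# Sketch of the pending → running → done/failed lifecycle, with a
# single-connection in-memory queue standing in for the DuckDB file.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE jobs (id INTEGER PRIMARY KEY, url TEXT, "
    "state TEXT DEFAULT 'pending')"
)
con.executemany("INSERT INTO jobs (url) VALUES (?)", [("u1",), ("u2",)])

def claim_job(con):
    """Move the oldest pending job to running and return (id, url)."""
    with con:  # transaction: the claim commits as one unit
        row = con.execute(
            "SELECT id, url FROM jobs WHERE state = 'pending' "
            "ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        con.execute("UPDATE jobs SET state = 'running' WHERE id = ?", (row[0],))
        return row

def finish_job(con, job_id, ok):
    with con:
        con.execute("UPDATE jobs SET state = ? WHERE id = ?",
                    ("done" if ok else "failed", job_id))

job = claim_job(con)        # claims (1, 'u1')
finish_job(con, job[0], ok=True)
```

Because all reads and writes go through one owner process (the API on :8222), the single-writer constraint never bites the workers.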

Current state: 4,933/4,935 jobs done, 5.4M rows, 188 MB ingested

Job patterns supported:

  • FDSN seismic (USGS, EMSC, GFZ, ISC) — date-windowed FDSN-WS queries
  • ClickHouse aggregate (WSPR corridors) — server-side SQL
  • CSV download (Dst index, sunspots) — direct HTTP to file
  • HAPI (Heliophysics API) — CDAWeb, OMNI
  • Onboard (one-shot bulk download) — used for World Bank, etc.
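A date-windowed seeder (the FDSN pattern above) reduces to generating monthly query windows. The URL format and job fields below are illustrative, not the real seeders.py API.

```python
# Sketch of a monthly-window job seeder for date-windowed FDSN-WS queries.
from datetime import date

def month_windows(start: date, end: date):
    """Yield (window_start, window_end) pairs covering [start, end)
    on calendar-month boundaries."""
    cur = start
    while cur < end:
        nxt = date(cur.year + 1, 1, 1) if cur.month == 12 \
            else date(cur.year, cur.month + 1, 1)
        yield cur, min(nxt, end)
        cur = nxt

def seed_fdsn_jobs(base_url: str, start: date, end: date) -> list[dict]:
    """One pending queue job per monthly window (hypothetical job shape)."""
    return [
        {"url": f"{base_url}?starttime={a}&endtime={b}", "state": "pending"}
        for a, b in month_windows(start, end)
    ]

jobs = seed_fdsn_jobs("https://example.org/fdsnws/event/1/query",
                      date(2024, 1, 15), date(2024, 4, 1))
# → 3 jobs: Jan 15–Feb 1, Feb 1–Mar 1, Mar 1–Apr 1
```

Windowing keeps each HTTP response small enough that a single failed month can be retried without re-downloading the whole catalog.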

Pipeline 3: WSPR Raw Backfill (special case)

The challenge: 11.5 billion WSPR spots from db1.wspr.live ClickHouse, ~213 GB Parquet, 21 years (Nov 2004 - Apr 2026), 258 monthly files.

Initial naive approach: one HTTP request per month. This failed because ClickHouse drops long-running connections after roughly 30 seconds.

Working approach (scripts/wspr_raw_backfill.py):

  1. Try whole-month download first
  2. If response truncates (no PAR1 Parquet footer): fall back to day-by-day
  3. Download each day separately (~100 MB per file)
  4. Validate PAR1 footer after every download
  5. Merge daily Parquets into monthly file with Polars
  6. Delete day directory on success
  7. 30-second pause between months, 5-second pause between days

Storage: /mnt/ursa/data/terrapulse/wspr/raw/wspr_raw_YYYYMM.parquet

Lessons learned:

  • Always validate the file footer after a streaming download
  • Day-by-day chunking sidesteps server connection limits
  • A 30-minute timeout is the right setting for big files (default 10 min was too short)
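The footer check from the first lesson is cheap to implement. A minimal sketch, which only verifies the 4-byte magic at both ends of the file (not that the footer metadata actually parses):

```python
# Sketch: detect a truncated Parquet download by checking the PAR1 magic.
import os
import tempfile

def looks_like_parquet(path: str) -> bool:
    """A Parquet file starts and ends with the 4-byte magic b'PAR1';
    a truncated streaming download is missing the trailing magic."""
    if os.path.getsize(path) < 12:   # magic + 4-byte footer length + magic
        return False
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    return head == b"PAR1" and tail == b"PAR1"

# Demo: a fake complete file vs. a truncated one
tmp = tempfile.mkdtemp()
good = os.path.join(tmp, "good.parquet")
bad = os.path.join(tmp, "bad.parquet")
with open(good, "wb") as f:
    f.write(b"PAR1" + b"\x00" * 16 + b"PAR1")
with open(bad, "wb") as f:
    f.write(b"PAR1" + b"\x00" * 16)   # connection dropped mid-stream
```

A stricter variant would open the file with a Parquet reader and read the row-group metadata, at the cost of a little I/O per file.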

Pipeline 4: Workspace Analysis Pattern

Every research workspace follows the same shape:

workspaces/{slug}/
├── workspace.json          # metadata: title, status, issue, findings_summary
├── index.md                # research narrative
├── scripts/
│   ├── extract.py          # PostgreSQL/Parquet → workspace data/
│   └── analyze.py          # statistical analysis → results.json + figures
├── data/
│   ├── *.parquet           # extracted intermediate data
│   └── results.json        # machine-readable findings
├── www/
│   ├── *.html              # interactive Plotly visualizations
│   └── *.png               # static exports for paper figures
└── paper/
    └── paper.tex           # LaTeX paper (revtex4-2)

Standard analyze.py structure:

  1. Load extracted data
  2. Run statistical tests (Pearson, Spearman, Mann-Whitney, Granger, etc.)
  3. Compute effect sizes (Cohen's d, rank-biserial, etc.)
  4. Apply Bonferroni correction for multiple comparisons
  5. Run a permutation test (1000+ shuffles) for primary findings
  6. Save results to data/results.json
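Step 5 can be sketched with the standard library alone. This is a generic two-sided permutation test for a difference in means, not the project's analyze.py code:

```python
# Sketch: permutation test for a difference in means (stdlib only).
import random
from statistics import mean

def perm_test_mean_diff(a, b, n_shuffles=1000, seed=0):
    """Shuffle the pooled sample n_shuffles times and count how often
    the shuffled |mean difference| matches or exceeds the observed one."""
    rng = random.Random(seed)
    observed = abs(mean(a) - mean(b))
    pooled = list(a) + list(b)
    hits = 0
    for _ in range(n_shuffles):
        rng.shuffle(pooled)
        pa, pb = pooled[:len(a)], pooled[len(a):]
        if abs(mean(pa) - mean(pb)) >= observed:
            hits += 1
    return (hits + 1) / (n_shuffles + 1)   # add-one smoothing avoids p = 0

p_null = perm_test_mean_diff([1, 2, 3, 4], [1, 2, 3, 5], n_shuffles=500)
p_sep  = perm_test_mean_diff([1, 2, 3, 4], [11, 12, 13, 14], n_shuffles=500)
```

For primary findings the doc calls for 1000+ shuffles; 500 here just keeps the demo fast.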

Quality bar (enforced by the editor agent Mike):

  • Sample sizes reported for every test
  • Effect sizes alongside p-values
  • Bonferroni correction documented
  • At least one null hypothesis test
  • Sensitivity analysis on key thresholds
  • Honest disclosure of limitations
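The Bonferroni item in the quality bar is one line of arithmetic; a sketch, with the alpha level and return shape chosen for illustration:

```python
# Sketch: Bonferroni correction for m simultaneous tests.
def bonferroni(p_values, alpha=0.05):
    """Judge each test at alpha / m; also return adjusted p-values
    (p * m, capped at 1) for reporting alongside the raw ones."""
    m = len(p_values)
    adjusted = [min(1.0, p * m) for p in p_values]
    reject = [p < alpha / m for p in p_values]
    return adjusted, reject

adj, rej = bonferroni([0.001, 0.02, 0.04], alpha=0.05)
# only p = 0.001 survives the corrected threshold of 0.05 / 3
```

Reporting both the raw and adjusted values makes the correction auditable in results.json.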

Pipeline 5: Memory-Bounded Big Data Processing

The lesson from the WSPR census: Polars read_parquet and scan_parquet().collect() both leak memory across iterations when processing many large files in a single Python process.

Failed approaches:

  • v1: Multiple .collect() calls per file (~15 per file × 258 files) — 52 GB RAM, OOM
  • v2: Single read_parquet per file with eager Polars — 52 GB RAM, OOM

Working approach (subprocess per file):

# analyze_v3.py — orchestrator
import json
import subprocess

for fpath in files:
    result = subprocess.run(
        ['python', 'agg_one.py', str(fpath)],
        capture_output=True, text=True, timeout=300, check=True,
    )
    d = json.loads(result.stdout)
    accumulate(d)

# agg_one.py — single-file worker
import json
import sys

import polars as pl

fpath = sys.argv[1]
df = pl.read_parquet(fpath)
result = {
    'n': len(df),
    'band_counts': {...},
    # ... aggregate everything in one pass
}
print(json.dumps(result))
sys.exit(0)  # subprocess dies, OS reclaims memory

Why this works: OS memory cleanup is bulletproof when a process exits. Python+Polars within a single process accumulates state in ways that are hard to flush. The IPC overhead of subprocess.run() is negligible compared to the file I/O.

When to use:

  • Aggregating across 100+ large Parquet files
  • Working set per file < 1 GB but cumulative would exceed RAM
  • Aggregates fit in JSON (top-N counts, sums, histograms)

Pipeline 6: Cache Refresh (Hourly Crons)

The web application reads from pre-computed JSON caches instead of querying PostgreSQL on every page load. This was added after the homepage stalled at 4-5 seconds doing COUNT(*) on 40M rows.

APScheduler jobs (in src/terrapulse/ingestion/scheduler.py):

  Job                    Interval  What it does
  stats_cache_refresh    hourly    Total observations, sources, metrics → data/stats_cache.json
  sources_cache_refresh  hourly    Per-source counts and metadata → data/sources_cache.json
  graph_cache_refresh    hourly    Knowledge graph rebuild → data/graph_cache.json
  timeline_refresh       hourly    Daily timeline (8,000+ days × 23 metrics) → data/timeline.json + .parquet
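One detail worth making explicit on the write side: if a refresh job rewrites its JSON in place, an SSR read can catch a half-written file. A sketch of a temp-file-plus-rename write; the payload fields are illustrative, and whether the real jobs do exactly this is not stated here:

```python
# Sketch: atomic JSON cache write (write to a temp file, then rename).
import json
import os
import tempfile

def write_cache_atomic(path: str, payload: dict) -> None:
    """os.replace is atomic on POSIX, so readers see either the old
    cache or the new one, never a partial file."""
    d = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f)
        os.replace(tmp, path)
    except BaseException:
        if os.path.exists(tmp):
            os.remove(tmp)   # don't leave temp debris on failure
        raise

cache_path = os.path.join(tempfile.gettempdir(), "stats_cache_demo.json")
write_cache_atomic(cache_path, {"total_observations": 40_000_000,
                                "sources": 416})
```

The temp file must live on the same filesystem as the target, which is why it is created in the cache's own directory.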

Read pattern (Astro web):

// Read embedded JSON at SSR time, never call API
const stats = JSON.parse(readFileSync('../data/stats_cache.json', 'utf-8'))

Result: Homepage loads in 50-80ms instead of 2-5 seconds.


Pipeline 7: Knowledge Graph Generation

Inputs:

  • All datasources rows in PostgreSQL
  • All workspaces/*/workspace.json files
  • All workspaces/*/data/results.json files (statistical findings)
  • All workspaces/*/index.md files (cross-workspace citations via /lab/ links)

Output: data/graph_cache.json with nodes and edges

Node types:

  • datasource — external data source (color: blue)
  • metric — observation metric (color: orange)
  • workspace — research workspace (color: purple)
  • finding — statistical result with verdict (color: green/red)

Edge types:

  • produces — datasource → metric
  • uses — workspace → metric
  • cites — workspace → workspace (from index.md links)
  • tested — workspace → finding (with effect size)

Generation script: scripts/regenerate_graph.py

Web rendering: D3 force-directed simulation in web/src/pages/garden.astro. Data is embedded at SSR time (not fetched), D3 is self-hosted (no CDN dependency).

Current state: 910 nodes, 1,402 edges, 50 workspaces, 416 datasources, 176 metrics, 145 findings.
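The reduction from these inputs to graph_cache.json can be sketched for one node/edge type. The workspace.json fields used here (title, metrics) are hypothetical, and regenerate_graph.py also merges datasources, findings, and index.md citations:

```python
# Sketch: emit workspace nodes and 'uses' edges from workspace.json files.
import json
import os
import tempfile

def build_graph(workspaces_dir: str) -> dict:
    nodes, edges = [], []
    for slug in sorted(os.listdir(workspaces_dir)):
        meta_path = os.path.join(workspaces_dir, slug, "workspace.json")
        if not os.path.isfile(meta_path):
            continue
        with open(meta_path) as f:
            meta = json.load(f)
        nodes.append({"id": f"workspace:{slug}", "type": "workspace",
                      "title": meta.get("title", slug)})
        for metric in meta.get("metrics", []):   # hypothetical field
            edges.append({"source": f"workspace:{slug}",
                          "target": f"metric:{metric}", "type": "uses"})
    return {"nodes": nodes, "edges": edges}

# Demo against a throwaway workspace tree
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "demo-study"))
with open(os.path.join(root, "demo-study", "workspace.json"), "w") as f:
    json.dump({"title": "Demo", "metrics": ["kp_index"]}, f)
graph = build_graph(root)
```

Namespacing node ids by type ("workspace:", "metric:") keeps edges unambiguous when the same slug could name both a metric and a source.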


Pipeline 8: Paper Production Assembly Line

The most novel pipeline. Three specialized agents in sequence:

Elise (PMA)  →  Mike (Editor)  →  Elise revision  →  Mike R2  →  Dana (Copy editor)
 writes paper    reviews rigor      addresses issues   accepts   polishes prose

Elise (/paper <issue_number>):

  • Reads a GitHub issue with a research idea
  • Maps it to available metrics in PostgreSQL
  • Writes extract.py and analyze.py
  • Runs the analysis
  • Writes index.md and paper/paper.tex
  • Commits, pushes, posts results comment on the issue

Mike (/review <issue_number>):

  • Reads the paper, code, and results.json
  • Posts a structured editorial review comment
  • Categorizes issues: CRITICAL, IMPORTANT, MINOR
  • Verdict: ACCEPT / MINOR REVISIONS / REVISE
  • Does NOT modify any files

Dana (/copyedit <slug>):

  • 20 hard rules: no em-dashes, active voice, no weasel words, Oxford commas, etc.
  • LaTeX-specific: math mode, units, dashes, refs
  • Tone: no hedging chains, no jargon, no editorial commentary
  • DOES modify files directly, recompiles PDF

Throughput: 17 papers accepted across 3 sessions, all copy-edited. Average pipeline time per paper: ~30-60 minutes.

Anti-sycophancy protocol: All three agents are trained to resist confirmation bias. Mike especially flags any "interesting" finding without backing data. Elise reframes findings honestly when revisions reveal selection effects.


Pipeline 9: Article Publishing

User-facing articles aggregating multiple research workspaces into journalism-style content.

Pattern:

  1. Astro page in web/src/pages/articles/{slug}.astro
  2. Imports workspace data from filesystem at SSR time
  3. Embeds inline figures from workspace www/ via API routes
  4. Custom OG image generated with matplotlib (1200×630) in web/public/articles/
  5. OpenGraph + Twitter Card meta tags via Base.astro props
  6. Featured on homepage with gradient cards

Examples:

  • articles/sky-anomalies — aggregates 7 papers on UFO/fireball/satellite topics
  • articles/solar-watch-april-2026 — real-time space weather event coverage

The funnel: Twitter post → article → lab workspace → individual paper PDF


Service Topology

Internet
  │
  └── nginx (terrapulse.info)
        │
        ├── /api/v1/* → FastAPI (127.0.0.1:8111)
        │   └── PostgreSQL (5433) + DuckDB staging
        │
        ├── /* → Astro SSR (127.0.0.1:4321)
        │   └── Filesystem reads (workspaces, data caches)
        │
        └── /api/v1/ws → WebSocket (8111)
              └── Live event mesh

  Backfill queue API: 127.0.0.1:8222 (internal only)

  APScheduler runs in the FastAPI process
  Cron-style jobs for fetchers + cache refresh

Restart command: sudo systemctl restart terrapulse terrapulse-web

Logs:

  • journalctl -u terrapulse — fetcher activity, cache refresh
  • journalctl -u terrapulse-web — Astro SSR
  • /var/log/nginx/terrapulse.info.access.log — HTTP access (real IPs via $http_cf_connecting_ip)
  • logs/client.jsonl — browser-side errors and performance

File Layout Summary

/home/bisenbek/projects/terrapulse/
├── src/terrapulse/         # Python application code
│   ├── api/                # FastAPI routes
│   ├── db/                 # SQLAlchemy models, seed data
│   ├── ingestion/          # Fetchers, normalizers, scheduler
│   └── lab/                # Workspace + knowledge graph helpers
│
├── scripts/                # Standalone utilities
│   ├── backfill/           # Queue API, workers, seeders
│   ├── wspr_*.py           # WSPR-specific pipelines
│   ├── regenerate_graph.py # Knowledge graph rebuild
│   └── build_timeline.py   # Daily timeline cache
│
├── workspaces/             # Research artifacts (one dir per study)
│   └── {slug}/
│       ├── workspace.json
│       ├── index.md
│       ├── scripts/
│       ├── data/
│       ├── www/
│       └── paper/
│
├── data/                   # Hourly-refreshed caches + index files
│   ├── stats_cache.json
│   ├── sources_cache.json
│   ├── graph_cache.json
│   ├── timeline.json
│   └── timeline.parquet
│
├── web/                    # Astro SSR frontend
│   ├── src/pages/          # Routes
│   ├── src/components/     # Reusable UI
│   ├── src/layouts/        # Base layouts
│   └── public/             # Static assets
│
└── /mnt/ursa/data/terrapulse/  # Bulk storage (not in git)
    ├── wspr/raw/           # 213 GB of raw WSPR Parquet (258 months)
    ├── wspr/corridors/     # Hourly aggregated corridors
    └── ...                 # Other backfilled archives