TerraPulse — Project Plan

Domain: terrapulse.info
Stack: FastAPI · DuckDB · PostgreSQL/PostGIS · Python 3.12+
Architecture: See docs/terrapulse-pipeline.mermaid
Conventions: See CLAUDE.md


Phase 0: Project Scaffolding

Goal: Runnable FastAPI app with databases connected and one health endpoint.

0.1 — Repository setup

  • Initialize git repo
  • Create pyproject.toml with project metadata and dependencies:
    • fastapi, uvicorn, httpx, pydantic, pydantic-settings
    • sqlalchemy[asyncio], asyncpg, geoalchemy2, alembic
    • duckdb
    • pytest, pytest-asyncio, respx
  • Create .env.example with: DATABASE_URL, DUCKDB_DIR, LOG_LEVEL
  • Create src/terrapulse/__init__.py and src/terrapulse/config.py (pydantic-settings Settings class)
  • Create .gitignore (include data/duckdb/*.duckdb, .env, __pycache__, .venv)

0.2 — Docker environment

  • Write Dockerfile (python:3.12-slim, install deps, run uvicorn)
  • Write docker-compose.yml with services:
    • api: the FastAPI app (port 8000)
    • db: postgres:16 with PostGIS (port 5432)
    • db-test: separate postgres instance for tests (port 5433)
  • Volume mount for data/duckdb/ directory

0.3 — Database connections

  • src/terrapulse/db/postgres.py: async SQLAlchemy engine + session factory
  • src/terrapulse/db/duckdb_manager.py: helper class to open/read/write per-source .duckdb files
  • Alembic init with alembic.ini pointing to DATABASE_URL from env

0.4 — FastAPI skeleton

  • src/terrapulse/main.py: create FastAPI app, include routers, lifespan for db init/shutdown
  • src/terrapulse/api/router.py: mount all route modules under /api/v1
  • src/terrapulse/api/routes/health.py: GET /api/v1/health returns {"status": "ok", "db": true/false, "version": "0.1.0"}

0.5 — Verify

  • docker-compose up starts all services without errors
  • curl localhost:8000/api/v1/health returns 200 with db: true
  • pytest discovers and runs (even if no real tests yet)

Phase 1: Data Models + CRUD

Goal: All core database tables exist with migrations, and CRUD endpoints work for datasources and tags.

1.1 — SQLAlchemy models

  • src/terrapulse/db/models/datasource.py:
    • Datasource table: source_id (uuid pk), name, base_url, endpoint_path, auth_type (enum: none/api_key/oauth), schedule_cron, is_active, created_at, updated_at
    • FieldMap table: id, source_id (fk), raw_field_name, normalized_name, data_type, unit, transform_fn (nullable)
    • SourceDoc table: id, source_id (fk), licence, provider, notes_md, last_fetched_at, fetch_status (enum: idle/fetching/success/error)
  • src/terrapulse/db/models/tag.py:
    • Tag table: tag_id (int pk), parent_tag_id (self-referential fk, nullable), name, slug (unique), depth
    • AutoTagRule table: rule_id, source_id (fk), match_field, match_pattern, assign_tag_id (fk)
  • src/terrapulse/db/models/observation.py:
    • Observation table: id (bigint pk), source_id (fk), lat, lng, geom (geography point, auto-computed from lat/lng), timestamp_utc, value, unit, quality_flag, duckdb_source_ref, duckdb_row_id, created_at
    • observation_tags join table: observation_id, tag_id
  • src/terrapulse/db/models/geo.py:
    • GeoRegion table: id, name, slug, level (enum: continent/country/state/city), parent_id (self-referential fk), geom (geography polygon, optional)

1.2 — Alembic migrations

  • Generate initial migration from models
  • Run migration, verify tables exist in postgres
  • Add PostGIS extension in migration: CREATE EXTENSION IF NOT EXISTS postgis
  • Add spatial index on observations.geom

1.3 — Pydantic schemas

  • src/terrapulse/api/schemas/datasource.py: DatasourceCreate, DatasourceUpdate, DatasourceResponse, FieldMapCreate, FieldMapResponse
  • src/terrapulse/api/schemas/tag.py: TagCreate, TagUpdate, TagResponse (include children list for tree serialization), AutoTagRuleCreate, AutoTagRuleResponse

1.4 — CRUD routes: Datasources

  • POST /api/v1/datasources — create a new datasource
  • GET /api/v1/datasources — list all (with ?is_active=true filter)
  • GET /api/v1/datasources/{source_id} — detail view (include field_maps and source_doc)
  • PUT /api/v1/datasources/{source_id} — update
  • DELETE /api/v1/datasources/{source_id} — soft delete (set is_active=false)
  • POST /api/v1/datasources/{source_id}/field-maps — add field mapping
  • GET /api/v1/datasources/{source_id}/field-maps — list field maps for source

1.5 — CRUD routes: Tags (hierarchical)

  • POST /api/v1/tags — create tag (with optional parent_tag_id)
  • GET /api/v1/tags — return full tree (recursive serialization)
  • GET /api/v1/tags/{tag_id} — single tag with children
  • PUT /api/v1/tags/{tag_id} — update (including reparenting)
  • DELETE /api/v1/tags/{tag_id} — delete (fail if has children, or cascade option)
  • POST /api/v1/tags/auto-rules — create auto-tag rule
  • GET /api/v1/tags/auto-rules — list all rules (filterable by source_id)
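
The recursive tree serialization behind GET /api/v1/tags can be sketched in pure Python (hypothetical dict rows standing in for the Tag ORM model; field names follow the schema in 1.1):

```python
def build_tag_tree(rows):
    """Assemble flat tag rows into a nested tree.

    Each row is a dict with tag_id, parent_tag_id (None for roots), and name.
    A single pass links every node into its parent's children list.
    """
    nodes = {r["tag_id"]: {**r, "children": []} for r in rows}
    roots = []
    for node in nodes.values():
        parent_id = node["parent_tag_id"]
        if parent_id is None:
            roots.append(node)
        else:
            nodes[parent_id]["children"].append(node)
    return roots
```

The same shape maps directly onto a TagResponse schema whose `children` field is a list of TagResponse.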

1.6 — Seed data

  • Create seed script (scripts/seed.py) that inserts:
    • The 6 MVP datasources with their field maps
    • Initial tag tree (environment → air_quality, earthquake, wildfire, water, climate, radiation)
    • Auto-tag rules mapping each source to its tag branch

1.7 — Tests

  • Test all CRUD endpoints (create, read, update, delete)
  • Test tag tree serialization (parent-child nesting)
  • Test field map association with datasources
  • Test auto-tag rule creation and validation

Phase 2: Ingestion Pipeline

Goal: Fetchers pull real data, stage it in DuckDB, normalize it, and write to Postgres.

2.1 — Base fetcher

  • src/terrapulse/ingestion/fetchers/base.py:
    • Abstract class: async def fetch(self) -> list[dict]
    • Built-in: httpx client with timeout (30s), retries (3x, exponential backoff), logging
    • Each fetcher receives its Datasource record (URL, field maps, etc.)
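
The retry-with-exponential-backoff behavior could look like the following sketch. It deliberately avoids httpx so the logic stands alone; in the real base fetcher, `fetch_fn` would wrap an httpx request with the 30s timeout, and only httpx transport errors would be retried:

```python
import asyncio

async def fetch_with_retries(fetch_fn, retries=3, base_delay=1.0):
    """Call an async fetch function, retrying with exponential backoff.

    fetch_fn is any zero-argument coroutine function. Delays grow as
    base_delay * 2**attempt between attempts; the last failure is re-raised.
    """
    last_exc = None
    for attempt in range(retries):
        try:
            return await fetch_fn()
        except Exception as exc:  # real code would catch httpx errors only
            last_exc = exc
            if attempt < retries - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise last_exc
```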

2.2 — DuckDB staging

  • Enhance duckdb_manager.py:
    • stage_raw(source_ref: str, rows: list[dict]) -> list[int] — append rows to the source's .duckdb file, return assigned row_ids
    • read_rows(source_ref: str, row_ids: list[int]) -> list[dict] — retrieve raw rows by id for enrichment
    • Auto-create table on first write (infer schema from first batch)
    • Add _fetched_at_utc and _batch_id columns automatically
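
The "infer schema from first batch" step might map Python value types to DuckDB column types roughly as below. The type names are standard DuckDB types; the mapping rules themselves are an assumption of this sketch:

```python
from datetime import datetime

# Assumed mapping from Python value types to DuckDB column types.
_TYPE_MAP = {bool: "BOOLEAN", int: "BIGINT", float: "DOUBLE",
             str: "VARCHAR", datetime: "TIMESTAMP"}

def infer_duckdb_schema(first_batch):
    """Infer a column -> DuckDB type mapping from the first batch of rows.

    Scans every row so a column that is None in row 0 can still pick up a
    type from a later row; columns that are always None fall back to VARCHAR.
    """
    schema = {}
    for row in first_batch:
        for key, value in row.items():
            if schema.get(key) is None:
                schema[key] = None if value is None else _TYPE_MAP.get(type(value), "VARCHAR")
    return {k: (v or "VARCHAR") for k, v in schema.items()}
```

`stage_raw` would feed the result into a `CREATE TABLE IF NOT EXISTS` statement before the first insert.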

2.3 — Implement MVP fetchers

  • usgs_earthquake.py: hit GeoJSON summary feed, parse features array
  • open_meteo.py: hit weather + air quality + UV endpoints, merge responses
  • usgs_water.py: hit instantaneous values endpoint, parse time series
  • safecast.py: hit measurements endpoint, paginate if needed
  • global_forest_watch.py: hit fire alerts endpoint
  • world_bank.py: hit country indicators endpoint

2.4 — Tagger

  • src/terrapulse/ingestion/tagger.py:
    • Load auto_tag_rules for the source
    • For each raw row, evaluate rules (field name + regex match on value)
    • Return list of tag_ids to attach
    • Default to the source's root tag if no rules match
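
The rule evaluation above can be sketched like this (rules as dicts mirroring the AutoTagRule table from 1.1; the regex-on-stringified-value convention is an assumption):

```python
import re

def evaluate_auto_tag_rules(raw_row, rules, default_tag_id):
    """Return the tag_ids of every rule that matches the raw row.

    Each rule has match_field, match_pattern (regex applied to the field's
    value), and assign_tag_id. Falls back to the source's root tag when
    nothing matches.
    """
    matched = [
        rule["assign_tag_id"]
        for rule in rules
        if rule["match_field"] in raw_row
        and re.search(rule["match_pattern"], str(raw_row[rule["match_field"]]))
    ]
    return matched or [default_tag_id]
```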

2.5 — Normalizer

  • src/terrapulse/ingestion/normalizer.py:
    • Load field_maps for the source
    • Rename columns: raw_field_name → normalized_name
    • Cast data types per field map
    • Convert units (e.g., Fahrenheit → Celsius if needed)
    • Resolve lat/lng to geo_region_id via reverse lookup
    • Output: list of dicts matching the Observation schema
    • Include duckdb_source_ref and duckdb_row_id in each output row
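
The rename/cast/provenance core of the normalizer might look like this (field maps as dicts mirroring the FieldMap table; the `"float"`/`"int"`/`"str"` data_type values are an assumption, and unit conversion plus geo_region lookup are elided):

```python
def normalize_rows(raw_rows, field_maps, source_ref, row_ids):
    """Apply field maps to raw rows: rename, cast, and attach provenance.

    raw_rows and row_ids are parallel lists (the row_ids returned by
    stage_raw). Output dicts carry duckdb_source_ref/duckdb_row_id so the
    raw row can be retrieved later for enrichment.
    """
    casters = {"float": float, "int": int, "str": str}
    out = []
    for raw, row_id in zip(raw_rows, row_ids):
        row = {}
        for fm in field_maps:
            value = raw.get(fm["raw_field_name"])
            if value is not None:
                value = casters[fm["data_type"]](value)
            row[fm["normalized_name"]] = value
        row["duckdb_source_ref"] = source_ref
        row["duckdb_row_id"] = row_id
        out.append(row)
    return out
```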

2.6 — Orchestrator

  • src/terrapulse/ingestion/orchestrator.py:
    • Query all datasources where is_active=true
    • For each: instantiate fetcher → fetch → tag → stage in duckdb → normalize → insert into postgres
    • Run fetchers concurrently with asyncio.gather() (with semaphore to limit concurrency)
    • Log per-source: status, row count, duration, errors
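
The concurrency-limited gather can be sketched as below; `return_exceptions=True` keeps one failing source from taking down the whole run, matching the per-source error logging requirement:

```python
import asyncio

async def run_all_sources(ingest_fns, max_concurrency=5):
    """Run per-source ingest coroutines concurrently, bounded by a semaphore.

    ingest_fns are zero-argument coroutine functions, one per active source.
    Exceptions are returned in-place in the results list rather than raised.
    """
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(fn):
        async with sem:
            return await fn()

    return await asyncio.gather(*(bounded(fn) for fn in ingest_fns),
                                return_exceptions=True)
```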

2.7 — Scheduler

  • src/terrapulse/ingestion/scheduler.py:
    • On app startup, register the orchestrator to run on a 60-second interval
    • Use APScheduler's AsyncIOScheduler or a simple asyncio.create_task loop
    • Respect each source's schedule_cron field (some sources may run every 5 min, others hourly)
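
A minimal version of the asyncio-loop option looks like this. It is a stand-in for APScheduler's AsyncIOScheduler; honoring per-source schedule_cron would need a cron parser (e.g. croniter) layered on top. The `max_runs` parameter is a testing hook, not part of the production interface:

```python
import asyncio

async def run_every(interval_s, job, max_runs=None):
    """Minimal interval scheduler: await job(), sleep, repeat.

    Production code would pass max_runs=None, start this with
    asyncio.create_task on app startup, and cancel the task on shutdown.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        await job()
        runs += 1
        if max_runs is None or runs < max_runs:
            await asyncio.sleep(interval_s)
```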

2.8 — Tests

  • Mock each fetcher's HTTP response, verify raw output shape
  • Test DuckDB staging: write rows, read back, verify row_ids
  • Test tagger: given rules and raw data, verify correct tag assignments
  • Test normalizer: given field maps and raw data, verify output schema
  • Test orchestrator end-to-end with mocked fetchers and test databases

Phase 3: Observation API + Spatial Queries

Goal: Expose normalized data through REST and GeoJSON endpoints with filtering.

3.1 — Observation schemas

  • ObservationResponse: all fields from the model + resolved tag names + source name
  • ObservationGeoJSON: standard GeoJSON Feature with observation data in properties
  • ObservationFilters: query params model — source_id, tag_id, bbox (comma-separated), time_start, time_end, limit, offset

3.2 — Observation routes

  • GET /api/v1/observations — paginated list with filters
  • GET /api/v1/observations/{id} — single observation with full detail
  • GET /api/v1/observations/geojson — GeoJSON FeatureCollection with same filters
  • GET /api/v1/observations/latest — most recent observation per source (for dashboard)

3.3 — Spatial query support

  • Implement bounding box filter: ST_Intersects(geom, ST_MakeEnvelope(xmin, ymin, xmax, ymax, 4326))
  • Implement proximity filter: ?lat=X&lng=Y&radius_km=Z using ST_DWithin
  • Add spatial index to observations table if not done in Phase 1
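
The bbox filter could be assembled as a parameterized SQL fragment along these lines (a sketch; the `::geometry` cast assumes geom is stored as geography, per the Observation model in 1.1):

```python
def bbox_filter_sql(bbox):
    """Build a parameterized PostGIS bbox predicate from 'xmin,ymin,xmax,ymax'.

    Returns (sql_fragment, params) for a driver that binds named parameters.
    Raises ValueError if any component is not numeric.
    """
    xmin, ymin, xmax, ymax = (float(p) for p in bbox.split(","))
    sql = ("ST_Intersects(geom::geometry, "
           "ST_MakeEnvelope(:xmin, :ymin, :xmax, :ymax, 4326))")
    return sql, {"xmin": xmin, "ymin": ymin, "xmax": xmax, "ymax": ymax}
```

The proximity filter would follow the same pattern with `ST_DWithin(geom, ST_MakePoint(:lng, :lat)::geography, :radius_m)`.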

3.4 — Tag descendant filtering

  • When filtering by tag_id, use a recursive CTE or materialized path to include all descendant tags
  • Example: querying tag_id=air_quality should also return observations tagged aqi_realtime and pollutant_detail
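
The recursive-CTE option might look like the following (a sketch against the Tag table from Phase 1; the table name `tags` and the `:root_tag_id` bind parameter are assumptions):

```python
# Recursive CTE collecting a tag and all of its descendants; the observation
# query then filters on "tag_id IN (SELECT tag_id FROM tag_subtree)".
TAG_SUBTREE_SQL = """
WITH RECURSIVE tag_subtree AS (
    SELECT tag_id FROM tags WHERE tag_id = :root_tag_id
    UNION ALL
    SELECT t.tag_id
    FROM tags t
    JOIN tag_subtree s ON t.parent_tag_id = s.tag_id
)
SELECT tag_id FROM tag_subtree
"""
```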

3.5 — Enrichment endpoint

  • GET /api/v1/observations/{id}/raw — retrieve the original raw row from DuckDB via duckdb_source_ref + duckdb_row_id
  • Returns the unmodified source data for power users or debugging

3.6 — Tests

  • Test observation list with each filter type (source, tag, bbox, time range)
  • Test GeoJSON output format compliance
  • Test tag descendant inclusion
  • Test enrichment endpoint returns matching DuckDB data
  • Test pagination (limit, offset, total count in response headers)

Phase 4: Hardening + Observability

Goal: Production-ready error handling, logging, monitoring, and documentation.

4.1 — Error handling

  • Global exception handler middleware in FastAPI
  • Structured error responses: {"detail": "...", "error_code": "...", "source_id": "..."}
  • Fetcher-specific error handling: network timeout, rate limit (429), malformed response
  • Dead letter queue: failed fetches logged to a fetch_errors table for retry/debugging

4.2 — Logging

  • Structured JSON logging via structlog or python-json-logger
  • Log fields: timestamp, level, source_id, event, duration_ms, row_count, error
  • Request logging middleware (method, path, status, duration)
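
As a stdlib-only illustration of the log shape (structlog or python-json-logger would replace this in practice), a formatter carrying the fields above could look like:

```python
import json
import logging

class JsonLogFormatter(logging.Formatter):
    """Emit log records as JSON with the plan's structured fields.

    Extra fields (source_id, duration_ms, row_count, error) are supplied
    via the `extra=` argument on the logger call and copied through when
    present on the record.
    """
    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "event": record.getMessage(),
        }
        for field in ("source_id", "duration_ms", "row_count", "error"):
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)
```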

4.3 — Health + readiness

  • Enhance /api/v1/health to return:
    • Database connectivity (pg + duckdb dir writable)
    • Last successful fetch timestamp per source
    • Scheduler status (running/paused)
    • Observation count + latest observation timestamp
  • Add /api/v1/readiness for container orchestration probes

4.4 — API documentation

  • Ensure all routes have OpenAPI descriptions and example responses
  • Add tags to route groups in FastAPI for organized Swagger UI
  • Write docs/api-guide.md with quickstart examples

4.5 — Rate limiting + security

  • Add slowapi or custom middleware for API rate limiting
  • CORS configuration for PWA domain (terrapulse.info)
  • Input validation: sanitize bbox coords, clamp limit/offset ranges
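
The clamp/sanitize helpers can be sketched in pure Python (the max_limit default of 500 and the error messages are assumptions):

```python
def clamp_pagination(limit, offset, max_limit=500):
    """Clamp limit/offset query params into safe ranges."""
    return max(1, min(int(limit), max_limit)), max(0, int(offset))

def parse_bbox(bbox):
    """Validate 'xmin,ymin,xmax,ymax'; raise ValueError on bad input.

    Checks component count, lon/lat ranges, and that min <= max on each axis.
    """
    parts = [float(p) for p in bbox.split(",")]
    if len(parts) != 4:
        raise ValueError("bbox must have 4 comma-separated numbers")
    xmin, ymin, xmax, ymax = parts
    if not (-180 <= xmin <= xmax <= 180 and -90 <= ymin <= ymax <= 90):
        raise ValueError("bbox coordinates out of range")
    return xmin, ymin, xmax, ymax
```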

4.6 — Tests

  • Test error responses for bad inputs (invalid bbox, nonexistent source_id)
  • Test rate limiting behavior
  • Test health endpoint reflects actual system state
  • Load test: verify 100 concurrent observation queries complete under 2s

Phase 5: Second-Wave Data Sources (keyed APIs)

Goal: Add data sources that require API key registration.

5.1 — Auth support in fetchers

  • Extend BaseFetcher to support auth_type: api_key_header, api_key_query, bearer_token
  • Store API keys in env vars, referenced by source_id in config
  • Never log or expose API keys
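
The auth_type dispatch could be sketched as follows; the header and query-param names are illustrative defaults, and per-source overrides would come from config:

```python
def apply_auth(auth_type, credential, headers, params, header_name="X-API-Key"):
    """Attach a credential to request headers/params per auth_type.

    Credentials come from env vars keyed by source_id and must never be
    logged. Returns new (headers, params) dicts rather than mutating inputs.
    """
    headers, params = dict(headers), dict(params)
    if auth_type == "api_key_header":
        headers[header_name] = credential
    elif auth_type == "api_key_query":
        params["api_key"] = credential
    elif auth_type == "bearer_token":
        headers["Authorization"] = f"Bearer {credential}"
    elif auth_type != "none":
        raise ValueError(f"unknown auth_type: {auth_type}")
    return headers, params
```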

5.2 — New fetchers

  • epa_airnow.py: real-time AQI by zip code (API key required, 500 req/hr)
  • nws_alerts.py: active watches/warnings by point/zone (User-Agent header required)
  • nasa_firms.py: MODIS/VIIRS fire detections (Earthdata login → MAP key)
  • usdm_drought.py: U.S. Drought Monitor weekly status (open REST, no key)
  • noaa_co2.py: daily Mauna Loa CO2 from CSV flat file (cron scrape, no API)

5.3 — Field maps + tag rules

  • Add field maps for each new source
  • Add auto-tag rules mapping new sources to appropriate tag branches
  • Update seed script

5.4 — Tests

  • Mock tests for each new fetcher
  • Integration test: full pipeline with new sources → duckdb → postgres → API

Phase 6: PWA Frontend (future)

Goal: Consumer-facing Progressive Web App. Separate planning document needed.

Key decisions to make before starting:

  • Framework: Next.js vs SvelteKit vs Vite+React
  • Map library: Leaflet vs MapLibre GL vs Deck.gl
  • Chart library: D3.js vs Recharts vs Observable Plot
  • Push notification service: Firebase Cloud Messaging vs self-hosted Web Push
  • Hosting: Vercel, Cloudflare Pages, or self-hosted

MVP screens (from architecture research):

  1. Onboarding (location + interest selection, 2-3 screens max)
  2. Local Dashboard (hero metric + scrollable card stack)
  3. Explore / Map (toggleable data layers, time scrubber)
  4. Alerts feed (chronological, configurable thresholds)
  5. Settings (locations, notification prefs, account)

Milestones

Milestone  Target      Deliverable
M0         Week 1      Scaffolded repo, docker-compose runs, health endpoint returns 200
M1         Week 2-3    All models + migrations, CRUD for datasources and tags works
M2         Week 4-6    Pipeline runs on schedule, first data flowing into Postgres
M3         Week 7-8    Observation API with spatial + tag filtering, GeoJSON output
M4         Week 9-10   Logging, error handling, API docs, rate limiting
M5         Week 11-13  Keyed API sources added, full data coverage
M6         TBD         PWA frontend kickoff