# TerraPulse — Project Plan
Domain: terrapulse.info
Stack: FastAPI · DuckDB · PostgreSQL/PostGIS · Python 3.12+
Architecture: See docs/terrapulse-pipeline.mermaid
Conventions: See CLAUDE.md
## Phase 0: Project Scaffolding
Goal: Runnable FastAPI app with databases connected and one health endpoint.
### 0.1 — Repository setup

- Initialize git repo
- Create `pyproject.toml` with project metadata and dependencies:
  - fastapi, uvicorn, httpx, pydantic, pydantic-settings
  - sqlalchemy[asyncio], asyncpg, geoalchemy2, alembic
  - duckdb
  - pytest, pytest-asyncio, respx
- Create `.env.example` with: `DATABASE_URL`, `DUCKDB_DIR`, `LOG_LEVEL`
- Create `src/terrapulse/__init__.py` and `src/terrapulse/config.py` (pydantic-settings `Settings` class)
- Create `.gitignore` (include `data/duckdb/*.duckdb`, `.env`, `__pycache__`, `.venv`)
### 0.2 — Docker environment

- Write `Dockerfile` (python:3.12-slim, install deps, run uvicorn)
- Write `docker-compose.yml` with services:
  - `api`: the FastAPI app (port 8000)
  - `db`: postgres:16 with PostGIS (port 5432)
  - `db-test`: separate postgres instance for tests (port 5433)
- Volume mount for `data/duckdb/` directory
### 0.3 — Database connections

- `src/terrapulse/db/postgres.py`: async SQLAlchemy engine + session factory
- `src/terrapulse/db/duckdb_manager.py`: helper class to open/read/write per-source .duckdb files
- Alembic init with `alembic.ini` pointing to `DATABASE_URL` from env
### 0.4 — FastAPI skeleton

- `src/terrapulse/main.py`: create FastAPI app, include routers, lifespan for db init/shutdown
- `src/terrapulse/api/router.py`: mount all route modules under `/api/v1`
- `src/terrapulse/api/routes/health.py`: `GET /api/v1/health` returns `{"status": "ok", "db": true/false, "version": "0.1.0"}`
### 0.5 — Verify

- `docker-compose up` starts all services without errors
- `curl localhost:8000/api/v1/health` returns 200 with `db: true`
- `pytest` discovers and runs (even if no real tests yet)
## Phase 1: Data Models + CRUD
Goal: All core database tables exist with migrations, and CRUD endpoints work for datasources and tags.
### 1.1 — SQLAlchemy models

- `src/terrapulse/db/models/datasource.py`:
  - `Datasource` table: `source_id` (uuid pk), `name`, `base_url`, `endpoint_path`, `auth_type` (enum: none/api_key/oauth), `schedule_cron`, `is_active`, `created_at`, `updated_at`
  - `FieldMap` table: `id`, `source_id` (fk), `raw_field_name`, `normalized_name`, `data_type`, `unit`, `transform_fn` (nullable)
  - `SourceDoc` table: `id`, `source_id` (fk), `licence`, `provider`, `notes_md`, `last_fetched_at`, `fetch_status` (enum: idle/fetching/success/error)
- `src/terrapulse/db/models/tag.py`:
  - `Tag` table: `tag_id` (int pk), `parent_tag_id` (self-referential fk, nullable), `name`, `slug` (unique), `depth`
  - `AutoTagRule` table: `rule_id`, `source_id` (fk), `match_field`, `match_pattern`, `assign_tag_id` (fk)
- `src/terrapulse/db/models/observation.py`:
  - `Observation` table: `id` (bigint pk), `source_id` (fk), `lat`, `lng`, `geom` (geography point, auto-computed from lat/lng), `timestamp_utc`, `value`, `unit`, `quality_flag`, `duckdb_source_ref`, `duckdb_row_id`, `created_at`
  - `observation_tags` join table: `observation_id`, `tag_id`
- `src/terrapulse/db/models/geo.py`:
  - `GeoRegion` table: `id`, `name`, `slug`, `level` (enum: continent/country/state/city), `parent_id` (self-referential fk), `geom` (geography polygon, optional)
### 1.2 — Alembic migrations

- Generate initial migration from models
- Run migration, verify tables exist in postgres
- Add PostGIS extension in migration: `CREATE EXTENSION IF NOT EXISTS postgis`
- Add spatial index on `observations.geom`
### 1.3 — Pydantic schemas

- `src/terrapulse/api/schemas/datasource.py`: `DatasourceCreate`, `DatasourceUpdate`, `DatasourceResponse`, `FieldMapCreate`, `FieldMapResponse`
- `src/terrapulse/api/schemas/tag.py`: `TagCreate`, `TagUpdate`, `TagResponse` (include `children` list for tree serialization), `AutoTagRuleCreate`, `AutoTagRuleResponse`
### 1.4 — CRUD routes: Datasources

- `POST /api/v1/datasources` — create a new datasource
- `GET /api/v1/datasources` — list all (with `?is_active=true` filter)
- `GET /api/v1/datasources/{source_id}` — detail view (include field_maps and source_doc)
- `PUT /api/v1/datasources/{source_id}` — update
- `DELETE /api/v1/datasources/{source_id}` — soft delete (set `is_active=false`)
- `POST /api/v1/datasources/{source_id}/field-maps` — add field mapping
- `GET /api/v1/datasources/{source_id}/field-maps` — list field maps for source
### 1.5 — CRUD routes: Tags (hierarchical)

- `POST /api/v1/tags` — create tag (with optional `parent_tag_id`)
- `GET /api/v1/tags` — return full tree (recursive serialization)
- `GET /api/v1/tags/{tag_id}` — single tag with children
- `PUT /api/v1/tags/{tag_id}` — update (including reparenting)
- `DELETE /api/v1/tags/{tag_id}` — delete (fail if it has children, or offer a cascade option)
- `POST /api/v1/tags/auto-rules` — create auto-tag rule
- `GET /api/v1/tags/auto-rules` — list all rules (filterable by `source_id`)
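The recursive serialization behind `GET /api/v1/tags` can be done in one pass over flat rows; a minimal sketch, assuming rows arrive as dicts rather than ORM objects:

```python
# Nest flat {tag_id, parent_tag_id, ...} rows into a children tree.
# Helper name and dict shape are illustrative, not the final schema classes.
def build_tag_tree(rows: list[dict]) -> list[dict]:
    # Index every node first so children can attach regardless of row order
    nodes = {row["tag_id"]: {**row, "children": []} for row in rows}
    roots = []
    for node in nodes.values():
        parent_id = node["parent_tag_id"]
        if parent_id is None:
            roots.append(node)
        else:
            nodes[parent_id]["children"].append(node)
    return roots
```

Because the index is built before linking, this stays O(n) and does not care whether parents appear before children in the query result.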
### 1.6 — Seed data

- Create a seed script (`scripts/seed.py`) that inserts:
  - The 6 MVP datasources with their field maps
  - Initial tag tree (environment → air_quality, earthquake, wildfire, water, climate, radiation)
  - Auto-tag rules mapping each source to its tag branch
### 1.7 — Tests
- Test all CRUD endpoints (create, read, update, delete)
- Test tag tree serialization (parent-child nesting)
- Test field map association with datasources
- Test auto-tag rule creation and validation
## Phase 2: Ingestion Pipeline
Goal: Fetchers pull real data, stage it in DuckDB, normalize it, and write to Postgres.
### 2.1 — Base fetcher

- `src/terrapulse/ingestion/fetchers/base.py`:
  - Abstract class: `async def fetch(self) -> list[dict]`
  - Built-in: httpx client with timeout (30s), retries (3x, exponential backoff), logging
  - Each fetcher receives its `Datasource` record (URL, field maps, etc.)
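The retry policy above can be sketched independently of httpx: a wrapper that takes any async callable standing in for the HTTP call (`fetch_once` and the parameter names are illustrative).

```python
# 3 attempts with exponential backoff (1s, 2s between tries by default).
import asyncio
from collections.abc import Awaitable, Callable


async def with_retries(
    fetch_once: Callable[[], Awaitable[list[dict]]],
    attempts: int = 3,
    base_delay: float = 1.0,
) -> list[dict]:
    for attempt in range(attempts):
        try:
            return await fetch_once()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: surface the last error
            await asyncio.sleep(base_delay * 2**attempt)
    raise RuntimeError("unreachable")
```

In the real `BaseFetcher`, `fetch_once` would close over the shared httpx client and the source's URL.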
### 2.2 — DuckDB staging

- Enhance `duckdb_manager.py`:
  - `stage_raw(source_ref: str, rows: list[dict]) -> list[int]` — append rows to the source's .duckdb file, return assigned row_ids
  - `read_rows(source_ref: str, row_ids: list[int]) -> list[dict]` — retrieve raw rows by id for enrichment
  - Auto-create table on first write (infer schema from first batch)
  - Add `_fetched_at_utc` and `_batch_id` columns automatically
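The "infer schema from first batch" step might look like this; the Python-type-to-DuckDB-type mapping is an assumption, and unknown or all-null columns fall back to `VARCHAR`.

```python
# Infer DuckDB column types from the first batch of raw dicts.
def infer_duckdb_schema(first_batch: list[dict]) -> dict[str, str]:
    type_map = {bool: "BOOLEAN", int: "BIGINT", float: "DOUBLE", str: "VARCHAR"}
    schema: dict[str, str] = {}
    for row in first_batch:
        for key, value in row.items():
            # First non-null value seen for a column decides its type
            if key not in schema and value is not None:
                schema[key] = type_map.get(type(value), "VARCHAR")
    return schema
```

The exact-type lookup (`type(value)` rather than `isinstance`) keeps `bool` from matching as `int`.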
### 2.3 — Implement MVP fetchers

- `usgs_earthquake.py`: hit GeoJSON summary feed, parse features array
- `open_meteo.py`: hit weather + air quality + UV endpoints, merge responses
- `usgs_water.py`: hit instantaneous values endpoint, parse time series
- `safecast.py`: hit measurements endpoint, paginate if needed
- `global_forest_watch.py`: hit fire alerts endpoint
- `world_bank.py`: hit country indicators endpoint
### 2.4 — Tagger

- `src/terrapulse/ingestion/tagger.py`:
  - Load auto_tag_rules for the source
  - For each raw row, evaluate rules (field name + regex match on value)
  - Return list of `tag_id`s to attach
  - Default to the source's root tag if no rules match
### 2.5 — Normalizer

- `src/terrapulse/ingestion/normalizer.py`:
  - Load `field_maps` for the source
  - Rename columns: `raw_field_name` → `normalized_name`
  - Cast data types per field map
  - Convert units (e.g., Fahrenheit → Celsius if needed)
  - Resolve `lat`/`lng` to `geo_region_id` via reverse lookup
  - Output: list of dicts matching the `Observation` schema
  - Include `duckdb_source_ref` and `duckdb_row_id` in each output row
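The rename/cast/convert steps for one row could be sketched like this. Note the `raw_unit` key is an assumption (the 1.1 `FieldMap` table only defines `unit`); the caster and converter tables are illustrative.

```python
# Apply field maps to one raw row: rename, cast, unit-convert.
CASTERS = {"float": float, "int": int, "str": str}
UNIT_CONVERTERS = {
    # (raw_unit, target_unit) -> converter; raw_unit is a hypothetical field
    ("fahrenheit", "celsius"): lambda v: (v - 32) * 5 / 9,
}


def normalize_row(raw: dict, field_maps: list[dict]) -> dict:
    out = {}
    for fm in field_maps:
        value = raw.get(fm["raw_field_name"])
        if value is None:
            continue  # missing raw fields are simply dropped in this sketch
        value = CASTERS[fm["data_type"]](value)
        convert = UNIT_CONVERTERS.get((fm.get("raw_unit"), fm.get("unit")))
        if convert:
            value = convert(value)
        out[fm["normalized_name"]] = value
    return out
```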
### 2.6 — Orchestrator

- `src/terrapulse/ingestion/orchestrator.py`:
  - Query all datasources where `is_active=true`
  - For each: instantiate fetcher → fetch → tag → stage in duckdb → normalize → insert into postgres
  - Run fetchers concurrently with `asyncio.gather()` (with a semaphore to limit concurrency)
  - Log per-source: status, row count, duration, errors
### 2.7 — Scheduler

- `src/terrapulse/ingestion/scheduler.py`:
  - On app startup, register the orchestrator to run on a 60-second tick; each tick checks which sources are due
  - Use APScheduler's `AsyncIOScheduler` or a simple `asyncio.create_task` loop
  - Respect each source's `schedule_cron` field (some sources may run every 5 min, others hourly)
### 2.8 — Tests
- Mock each fetcher's HTTP response, verify raw output shape
- Test DuckDB staging: write rows, read back, verify row_ids
- Test tagger: given rules and raw data, verify correct tag assignments
- Test normalizer: given field maps and raw data, verify output schema
- Test orchestrator end-to-end with mocked fetchers and test databases
## Phase 3: Observation API + Spatial Queries
Goal: Expose normalized data through REST and GeoJSON endpoints with filtering.
### 3.1 — Observation schemas

- `ObservationResponse`: all fields from the model + resolved tag names + source name
- `ObservationGeoJSON`: standard GeoJSON Feature with observation data in `properties`
- `ObservationFilters`: query params model — `source_id`, `tag_id`, `bbox` (comma-separated), `time_start`, `time_end`, `limit`, `offset`
### 3.2 — Observation routes

- `GET /api/v1/observations` — paginated list with filters
- `GET /api/v1/observations/{id}` — single observation with full detail
- `GET /api/v1/observations/geojson` — GeoJSON FeatureCollection with the same filters
- `GET /api/v1/observations/latest` — most recent observation per source (for dashboard)
### 3.3 — Spatial query support

- Implement bounding box filter: `ST_Intersects(geom, ST_MakeEnvelope(xmin, ymin, xmax, ymax, 4326))`
- Implement proximity filter: `?lat=X&lng=Y&radius_km=Z` using `ST_DWithin`
- Add spatial index to observations table if not done in Phase 1
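Parsing the `bbox` query param into a bound-parameter predicate might look like this (function name and bounds checks are assumptions; binding rather than interpolating the values covers the Phase 4 injection concern).

```python
# Parse "xmin,ymin,xmax,ymax" into a parameterized PostGIS predicate.
def bbox_predicate(bbox: str) -> tuple[str, dict]:
    xmin, ymin, xmax, ymax = (float(part) for part in bbox.split(","))
    if not (-180 <= xmin <= xmax <= 180 and -90 <= ymin <= ymax <= 90):
        raise ValueError(f"invalid bbox: {bbox}")
    sql = "ST_Intersects(geom, ST_MakeEnvelope(:xmin, :ymin, :xmax, :ymax, 4326))"
    return sql, {"xmin": xmin, "ymin": ymin, "xmax": xmax, "ymax": ymax}
```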
### 3.4 — Tag descendant filtering

- When filtering by `tag_id`, use a recursive CTE or materialized path to include all descendant tags
- Example: querying `tag_id=air_quality` should also return observations tagged `aqi_realtime` and `pollutant_detail`
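The recursive-CTE variant could be written roughly as follows; table and column names follow the 1.1 models, held in a Python string with a bound `:tag_id` parameter.

```python
# Descendant-inclusive observation lookup via a recursive CTE.
TAG_DESCENDANTS_SQL = """
WITH RECURSIVE tag_tree AS (
    SELECT tag_id FROM tags WHERE tag_id = :tag_id
    UNION ALL
    SELECT t.tag_id
    FROM tags t
    JOIN tag_tree tt ON t.parent_tag_id = tt.tag_id
)
SELECT o.*
FROM observations o
JOIN observation_tags ot ON ot.observation_id = o.id
WHERE ot.tag_id IN (SELECT tag_id FROM tag_tree)
"""
```

The seed row is the requested tag; the recursive term walks `parent_tag_id` links downward, so the `IN` clause covers the whole branch.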
### 3.5 — Enrichment endpoint

- `GET /api/v1/observations/{id}/raw` — retrieve the original raw row from DuckDB via `duckdb_source_ref` + `duckdb_row_id`
- Returns the unmodified source data for power users or debugging
### 3.6 — Tests
- Test observation list with each filter type (source, tag, bbox, time range)
- Test GeoJSON output format compliance
- Test tag descendant inclusion
- Test enrichment endpoint returns matching DuckDB data
- Test pagination (limit, offset, total count in response headers)
## Phase 4: Hardening + Observability
Goal: Production-ready error handling, logging, monitoring, and documentation.
### 4.1 — Error handling

- Global exception handler middleware in FastAPI
- Structured error responses: `{"detail": "...", "error_code": "...", "source_id": "..."}`
- Fetcher-specific error handling: network timeout, rate limit (429), malformed response
- Dead letter queue: failed fetches logged to a `fetch_errors` table for retry/debugging
### 4.2 — Logging

- Structured JSON logging via `structlog` or `python-json-logger`
- Log fields: `timestamp`, `level`, `source_id`, `event`, `duration_ms`, `row_count`, `error`
- Request logging middleware (method, path, status, duration)
### 4.3 — Health + readiness

- Enhance `/api/v1/health` to return:
  - Database connectivity (pg + duckdb dir writable)
  - Last successful fetch timestamp per source
  - Scheduler status (running/paused)
  - Observation count + latest observation timestamp
- Add `/api/v1/readiness` for container orchestration probes
### 4.4 — API documentation

- Ensure all routes have OpenAPI descriptions and example responses
- Add `tags` to route groups in FastAPI for an organized Swagger UI
- Write `docs/api-guide.md` with quickstart examples
### 4.5 — Rate limiting + security

- Add `slowapi` or custom middleware for API rate limiting
- CORS configuration for the PWA domain (terrapulse.info)
- Input validation: sanitize bbox coords, clamp limit/offset ranges
### 4.6 — Tests
- Test error responses for bad inputs (invalid bbox, nonexistent source_id)
- Test rate limiting behavior
- Test health endpoint reflects actual system state
- Load test: verify 100 concurrent observation queries complete under 2s
## Phase 5: Second-Wave Data Sources (keyed APIs)
Goal: Add data sources that require API key registration.
### 5.1 — Auth support in fetchers

- Extend `BaseFetcher` to support `auth_type`: `api_key_header`, `api_key_query`, `bearer_token`
- Store API keys in env vars, referenced by `source_id` in config
- Never log or expose API keys
### 5.2 — New fetchers

- `epa_airnow.py`: real-time AQI by zip code (API key required, 500 req/hr)
- `nws_alerts.py`: active watches/warnings by point/zone (User-Agent header required)
- `nasa_firms.py`: MODIS/VIIRS fire detections (Earthdata login → MAP key)
- `usdm_drought.py`: U.S. Drought Monitor weekly status (open REST, no key)
- `noaa_co2.py`: daily Mauna Loa CO2 from CSV flat file (cron scrape, no API)
### 5.3 — Field maps + tag rules
- Add field maps for each new source
- Add auto-tag rules mapping new sources to appropriate tag branches
- Update seed script
### 5.4 — Tests
- Mock tests for each new fetcher
- Integration test: full pipeline with new sources → duckdb → postgres → API
## Phase 6: PWA Frontend (future)
Goal: Consumer-facing Progressive Web App. Separate planning document needed.
Key decisions to make before starting:
- Framework: Next.js vs SvelteKit vs Vite+React
- Map library: Leaflet vs MapLibre GL vs Deck.gl
- Chart library: D3.js vs Recharts vs Observable Plot
- Push notification service: Firebase Cloud Messaging vs self-hosted Web Push
- Hosting: Vercel, Cloudflare Pages, or self-hosted
MVP screens (from architecture research):
- Onboarding (location + interest selection, 2-3 screens max)
- Local Dashboard (hero metric + scrollable card stack)
- Explore / Map (toggleable data layers, time scrubber)
- Alerts feed (chronological, configurable thresholds)
- Settings (locations, notification prefs, account)
## Milestones
| Milestone | Target | Deliverable |
|---|---|---|
| M0 | Week 1 | Scaffolded repo, docker-compose runs, health endpoint returns 200 |
| M1 | Week 2-3 | All models + migrations, CRUD for datasources and tags works |
| M2 | Week 4-6 | Pipeline runs on schedule, first data flowing into Postgres |
| M3 | Week 7-8 | Observation API with spatial + tag filtering, GeoJSON output |
| M4 | Week 9-10 | Logging, error handling, API docs, rate limiting |
| M5 | Week 11-13 | Keyed API sources added, full data coverage |
| M6 | TBD | PWA frontend kickoff |