---
title: "TerraPulse — Data Pipeline Architecture (from whiteboard)"
---
%% NOTE(review): the next comment line was stray non-Mermaid text at the very top
%% of the original paste; it is kept here as a comment so nothing is lost.
%% Listening for events…

%% Top-down flowchart: scheduler → orchestrator → fetchers → tagger → DuckDB
%% staging → normalizer → PostgreSQL → API.
flowchart TD

%% ── Scheduler Layer ──────────────────────────────────
%% Entry node of the pipeline; its label states a 60s interval.
%% Must be declared on its own line (not after a %% comment) and with straight
%% double quotes, otherwise the label is never applied to the node.
CRON["⏱ CRON Scheduler\n(60s interval)"]

%% ── Orchestration Layer ──────────────────────────────
%% Orchestrator node, labelled as a FastAPI service.
%% The scheduler node drives it through the "tick" edge.
ORCH["🔀 Orchestrator\n(FastAPI service)"]
CRON -->|"tick"| ORCH

%% ── Source Fetcher Layer (horizontal pipeline) ───────
%% One node per upstream data source, grouped in a subgraph with a
%% left-to-right direction. FN ("Source N ...") stands in for further sources.
subgraph FETCHERS["Source Fetchers"]
    direction LR
    F1["EPA\nAirNow"]
    F2["USGS\nEarthquake"]
    F3["NWS\nAlerts"]
    F4["NASA\nFIRMS"]
    F5["USDM\nDrought"]
    FN["Source\nN ..."]
end
%% The edge targets the subgraph as a whole, not the individual fetcher nodes.
ORCH -->|"dispatch_jobs"| FETCHERS

%% ── Tagging ──────────────────────────────────────────
%% Tagger node; per its label it resolves tag_ids from tag_tree and auto_rules.
%% It receives "raw_responses" from the fetcher subgraph as a whole.
TAGGER["🏷 Tagger\n(resolve tag_ids from\ntag_tree + auto_rules)"]
FETCHERS -->|"raw_responses"| TAGGER

%% ── Per-Source DuckDB Staging ────────────────────────
%% Cylinder-shaped (database) nodes, one .duckdb file per data source, grouped
%% in a left-to-right subgraph. DN is the counterpart of the FN placeholder fetcher.
subgraph DUCKDB_LAYER["Per-Source DuckDB Files"]
    direction LR
    D1[("🦆 air_quality.duckdb")]
    D2[("🦆 earthquake.duckdb")]
    D3[("🦆 weather_alerts.duckdb")]
    D4[("🦆 wildfire.duckdb")]
    D5[("🦆 drought.duckdb")]
    DN[("🦆 source_n.duckdb")]
end
%% Tagged raw data flows from the tagger into the staging subgraph as a whole.
TAGGER -->|"tagged_raw_data"| DUCKDB_LAYER

%% ── Normalization ────────────────────────────────────
%% Normalizer node; its label lists the steps clean_headers, cast_dtypes,
%% convert_units and resolve_geo. It reads "raw_rows" from the DuckDB subgraph.
NORM["⚙️ Normalizer\n(clean_headers, cast_dtypes,\nconvert_units, resolve_geo)"]
DUCKDB_LAYER -->|"raw_rows"| NORM

%% ── Geographic Hierarchy (from whiteboard left side) ─
%% Reference node describing the continent → country → state → city hierarchy,
%% with one worked example in its label.
%% The dotted edge (-.->) marks the normalizer's geo-key lookup, drawn differently
%% from the solid data-flow edges.
GEO["🌍 geo_hierarchy\ncontinent → country → state → city\n(north_america → us → ky → lex)"]
NORM -.->|"resolve_geo_keys"| GEO

%% ── Normalized Schema ────────────────────────────────
%% Schema card for a normalized row: the label lists each field with its type.
%% duckdb_source_ref and duckdb_row_id are the fields named on the back-reference
%% edge from PostgreSQL to the DuckDB layer.
SCHEMA["📋 normalized_row\n─────────────────\nlat: float\nlng: float\nsource_id: uuid\ntag_ids: int[]\ntimestamp_utc: datetime\nvalue: float\nunit: str\nquality_flag: str\nduckdb_source_ref: str\nduckdb_row_id: int"]
NORM -->|"clean_rows"| SCHEMA

%% ── Primary Database ─────────────────────────────────
%% Cylinder-shaped node for the primary store (PostgreSQL + PostGIS).
%% Normalized rows reach it through the "insert / upsert" edge.
PG[("🐘 PostgreSQL + PostGIS\n(primary store)")]
SCHEMA -->|"insert / upsert"| PG

%% ── Back-Reference ───────────────────────────────────
%% Dotted edge from the primary store back to the DuckDB staging subgraph,
%% labelled with duckdb_source_ref + duckdb_row_id → enrich.
%% This edge closes a cycle in the graph (DUCKDB_LAYER → … → PG → DUCKDB_LAYER).
PG -.->|"duckdb_source_ref +\nduckdb_row_id → enrich"| DUCKDB_LAYER

%% ── CRUD: Datasource Manager ─────────────────────────
%% Admin-side subgraph (top-to-bottom) with three schema cards:
%%   SRC_LIST   – datasource_list: source definition (URL parts, auth, cron schedule, active flag)
%%   SRC_FIELDS – field_map: raw field name → normalized name, type, unit, optional transform
%%   SRC_META   – source_docs: licence, provider, notes and last-fetch status
%% The undirected links (---) attach the field map and docs cards to the datasource list.
subgraph CRUD_SOURCES["CRUD: Datasources"]
    direction TB
    SRC_LIST["📡 datasource_list\n─────────────────\nsource_id: uuid\nname: str\nbase_url: str\nendpoint_path: str\nauth_type: enum\nschedule_cron: str\nis_active: bool"]
    SRC_FIELDS["📋 field_map\n─────────────────\nraw_field_name: str\nnormalized_name: str\ndata_type: str\nunit: str\ntransform_fn: str | null"]
    SRC_META["📝 source_docs\n─────────────────\nlicence: str\nprovider: str\nnotes_md: text\nlast_fetched_at: datetime\nfetch_status: enum"]
    SRC_LIST --- SRC_FIELDS
    SRC_LIST --- SRC_META
end

%% ── CRUD: Tag Manager (hierarchical) ─────────────────
%% Admin-side subgraph (top-to-bottom) with three cards:
%%   TAG_TREE    – tag_tree: self-referencing tag rows (parent_tag_id may be null)
%%   TAG_EXAMPLE – illustrative tree only; it carries no schema
%%   TAG_RULES   – auto_tag_rules: per-source match_field / match_pattern → assign_tag_id
%% The undirected links (---) attach the example and the rules cards to the tag tree.
subgraph CRUD_TAGS["CRUD: Tags (hierarchical tree)"]
    direction TB
    TAG_TREE["🏷 tag_tree\n─────────────────\ntag_id: int\nparent_tag_id: int | null\nname: str\nslug: str\ndepth: int"]
    TAG_EXAMPLE["Example tree:\n🌍 environment\n├── 🌬 air_quality\n│   ├── aqi_realtime\n│   └── pollutant_detail\n├── 🔥 wildfire\n│   ├── active_fire\n│   └── fire_risk\n├── 🌊 water\n│   ├── flood_alert\n│   └── drought_status\n└── 🌡 climate\n    ├── temp_anomaly\n    └── heat_index"]
    TAG_RULES["⚙️ auto_tag_rules\n─────────────────\nrule_id: int\nsource_id: uuid\nmatch_field: str\nmatch_pattern: str\nassign_tag_id: int"]
    TAG_TREE --- TAG_EXAMPLE
    TAG_TREE --- TAG_RULES
end

%% ── Admin connections ────────────────────────────────
%% Edges from the two CRUD subgraphs into the pipeline:
%%   - datasource config feeds the orchestrator
%%   - tag definitions and auto-tag rules feed the tagger
%%   - both CRUD subgraphs have a "read / write" edge to PostgreSQL
%% All four edges start at a subgraph id, not at an individual card.
CRUD_SOURCES -->|"configure_sources"| ORCH
CRUD_TAGS -->|"tag_definitions +\nauto_tag_rules"| TAGGER
CRUD_SOURCES -->|"read / write"| PG
CRUD_TAGS -->|"read / write"| PG

%% ── API Output ───────────────────────────────────────
%% Public API node (labelled FastAPI → REST / GeoJSON), fed by the "query" edge
%% from PostgreSQL.
%% CLIENTS is declared inline on its edge and lists the consumer types.
%% NOTE(review): the 💰 edge label presumably marks paid/monetised access; confirm.
API["🚀 TerraPulse API\n(FastAPI → REST / GeoJSON)"]
PG -->|"query"| API
API -->|"💰"| CLIENTS["PWA · B2B · Widgets"]

%% ── Styling ──────────────────────────────────────────
%% One classDef per pipeline role. Each sets fill (background), stroke (border)
%% and color (label text): white text on the darker fills, #333 on the lighter ones.
classDef scheduler fill:#4A90D9,stroke:#2C5F8A,color:#fff
classDef orchestrator fill:#7B68EE,stroke:#5A4DB0,color:#fff
classDef fetcher fill:#F5A623,stroke:#C07D12,color:#fff
classDef duck fill:#FFD700,stroke:#B8960F,color:#333
classDef normalize fill:#50C878,stroke:#3A9459,color:#fff
classDef postgres fill:#336791,stroke:#264D6E,color:#fff
classDef crud fill:#E8E8E8,stroke:#999,color:#333
classDef api fill:#FF6B6B,stroke:#CC4444,color:#fff
classDef geo fill:#87CEEB,stroke:#5AA0C4,color:#333
classDef schema fill:#98FB98,stroke:#5BBF5B,color:#333
classDef tags fill:#DDA0DD,stroke:#9B6E9B,color:#333

%% Apply the style classes to nodes.
%% Classes are assigned to the nodes inside each subgraph, not to the subgraph ids.
%% CLIENTS is the only node with no class assigned here.
class CRON scheduler
class ORCH orchestrator
class F1,F2,F3,F4,F5,FN fetcher
class D1,D2,D3,D4,D5,DN duck
%% The tagger reuses the normalizer's style; there is no separate tagger classDef.
class NORM normalize
class TAGGER normalize
class PG postgres
class SRC_LIST,SRC_FIELDS,SRC_META crud
class TAG_TREE,TAG_EXAMPLE,TAG_RULES tags
class API api
class GEO geo
class SCHEMA schema
%% NOTE(review): stray non-Mermaid text from the original paste, kept as a comment: Live Feed