Initial data pipeline scaffold
This commit is contained in:
commit
37c3266d9c
4 changed files with 58 additions and 0 deletions
18
README.md
Normal file
18
README.md
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
# data-pipeline
|
||||||
|
|
||||||
|
ETL pipeline for Nexus analytics platform. Python + Apache Airflow.
|
||||||
|
|
||||||
|
## Setup
|
||||||
|
|
||||||
|
python -m venv .venv && source .venv/bin/activate
|
||||||
|
pip install -r requirements.txt
|
||||||
|
airflow db migrate
|
||||||
|
airflow dags list
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
```
|
||||||
|
S3 (raw events) → ingest → transform → load → Redshift (analytics)
|
||||||
|
```
|
||||||
|
|
||||||
|
DAGs run nightly at 03:00 UTC (cron `0 3 * * *`). See `config/pipeline.yml` for full schedule config.
|
||||||
11
config/pipeline.yml
Normal file
11
config/pipeline.yml
Normal file
|
|
@ -0,0 +1,11 @@
|
||||||
|
pipeline:
|
||||||
|
schedule: "0 3 * * *"
|
||||||
|
source:
|
||||||
|
bucket: nexus-data-raw
|
||||||
|
prefix: events/
|
||||||
|
destination:
|
||||||
|
host: redshift.nexus.local
|
||||||
|
database: analytics
|
||||||
|
schema: public
|
||||||
|
batch_size: 50000
|
||||||
|
max_retries: 3
|
||||||
8
requirements.txt
Normal file
8
requirements.txt
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
apache-airflow==2.9.1
|
||||||
|
apache-airflow-providers-amazon==8.18.0
|
||||||
|
pandas==2.2.2
|
||||||
|
sqlalchemy==2.0.29
|
||||||
|
psycopg2-binary==2.9.9
|
||||||
|
boto3==1.34.84
|
||||||
|
pyarrow==16.0.0
|
||||||
|
pytest==8.1.1
|
||||||
21
src/ingest/s3_reader.py
Normal file
21
src/ingest/s3_reader.py
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
import boto3
|
||||||
|
import pandas as pd
|
||||||
|
from io import BytesIO
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
def read_parquet_from_s3(bucket: str, key: str, columns: Optional[list] = None) -> pd.DataFrame:
    """Fetch a single parquet object from S3 and parse it into a DataFrame.

    The object body is read fully into memory and then handed to
    ``pd.read_parquet`` through an in-memory buffer.

    Args:
        bucket: Name of the S3 bucket that holds the object.
        key: Key of the parquet object inside the bucket.
        columns: Forwarded unchanged to ``pd.read_parquet``; defaults to
            ``None``.

    Returns:
        The DataFrame produced by ``pd.read_parquet``.
    """
    # The client is created per call and is pinned to eu-central-1.
    client = boto3.client("s3", region_name="eu-central-1")
    response = client.get_object(Bucket=bucket, Key=key)
    payload = response["Body"].read()
    buffer = BytesIO(payload)
    return pd.read_parquet(buffer, columns=columns)
|
||||||
|
|
||||||
|
|
||||||
|
def list_keys(bucket: str, prefix: str) -> list[str]:
    """Collect the key of every object found under ``prefix`` in ``bucket``.

    Uses the ``list_objects_v2`` paginator so that results spanning several
    response pages are all included.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix used to filter the listing.

    Returns:
        The object keys in the order the pages return them; an empty list
        when no page carries a ``Contents`` entry.
    """
    # The client is created per call and is pinned to eu-central-1.
    client = boto3.client("s3", region_name="eu-central-1")
    pages = client.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix)
    # A page without "Contents" contributes no keys.
    return [entry["Key"] for page in pages for entry in page.get("Contents", [])]
|
||||||
Loading…
Reference in a new issue