This project demonstrates a production-style real-time streaming pipeline on GCP. It ingests live cryptocurrency market data via Pub/Sub, processes it using Apache Beam on Dataflow (including dead-letter handling for malformed events), and persists structured data to BigQuery for analytics via Looker Studio.
The system is built using Google Cloud Platform managed services and follows a modern streaming data architecture.
The pipeline processes streaming messages carrying crypto price events (Buy and Sell) in real time.
Crypto Stream Producer
→ Pub/Sub
→ Dataflow (Apache Beam pipeline)
→ BigQuery
→ Analytics / SQL queries
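The producer side is not detailed in this section. As a rough idea only, a minimal sketch of publishing one ticker event to Pub/Sub with the `google-cloud-pubsub` client is shown below; the topic name `crypto-ticker` is a placeholder, not necessarily the topic used by this project.

```python
# Hypothetical producer sketch; the topic name "crypto-ticker" is a placeholder.
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("<YOUR_PROJECT_ID>", "crypto-ticker")

# A trimmed-down ticker event, matching the fields shown later in this README.
event = {
    "type": "ticker",
    "product_id": "BTC-USD",
    "price": "70691.27",
    "side": "sell",
    "time": "2026-03-11T16:54:02.163093Z",
    "trade_id": 978694534,
}

# Pub/Sub payloads are bytes; the Dataflow pipeline decodes and parses them back to JSON.
future = publisher.publish(topic_path, data=json.dumps(event).encode("utf-8"))
print(f"Published message {future.result()}")
```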
- Python
- Apache Beam
- Google Cloud Dataflow
- Google Cloud Pub/Sub
- Google BigQuery
- Google Cloud Storage
- Looker Studio
The pipeline performs the following steps (a minimal Beam sketch follows the list):
- Ingest streaming data from Pub/Sub
- Parse JSON messages
- Validate and transform events
- Write structured data to BigQuery
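The actual implementation lives in `crypto_pipeline.py` and is not reproduced here. The sketch below only illustrates how these steps typically wire together in Beam, with malformed messages routed to a dead-letter output; the topic, dataset, and table names (`<TOPIC>`, `<DATASET>`, `trades`, `dead_letter`) are placeholders.

```python
# Illustrative wiring only; names are placeholders and the actual code in
# crypto_pipeline.py may differ.
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class ParseEvent(beam.DoFn):
    """Parses Pub/Sub payloads; malformed messages go to a dead-letter output."""

    def process(self, message: bytes):
        try:
            yield json.loads(message.decode("utf-8"))
        except Exception:
            yield beam.pvalue.TaggedOutput(
                "dead_letter", {"raw": message.decode("utf-8", "replace")}
            )


options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as pipeline:
    parsed = (
        pipeline
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
            topic="projects/<YOUR_PROJECT_ID>/topics/<TOPIC>"
        )
        | "ParseJSON" >> beam.ParDo(ParseEvent()).with_outputs(
            "dead_letter", main="events"
        )
    )

    # The full pipeline maps events to the BigQuery schema before this write
    # (see the DoFn sketch further below); both tables are assumed to exist.
    parsed.events | "WriteEvents" >> beam.io.WriteToBigQuery(
        "<YOUR_PROJECT_ID>:<DATASET>.trades",
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
    parsed.dead_letter | "WriteDeadLetter" >> beam.io.WriteToBigQuery(
        "<YOUR_PROJECT_ID>:<DATASET>.dead_letter",
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
```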
Example of a received event:

```json
{
"type":"ticker",
"sequence":123894075370,
"product_id":"BTC-USD",
"price":"70691.27",
"open_24h":"71290.91",
"volume_24h":"10302.66386023",
"low_24h":"68980.74",
"high_24h":"71485",
"volume_30d":"314531.69968052",
"best_bid":"70691.27",
"best_bid_size":"0.05851373",
"best_ask":"70691.28",
"best_ask_size":"0.01890000",
"side":"sell",
"time":"2026-03-11T16:54:02.163093Z",
"trade_id":978694534,
"last_size":"0.001705"
}
```

Example of the data after preprocessing (DoFn):

```json
{
"trade_id":978694534,
"symbol": "BTC-USD",
"price": 64210.12,
"operation": "sell",
"timestamp": "2026-03-10T14:00:00Z"
}
```

BigQuery table schema:

| Field | Type |
|---|---|
| trade_id | STRING |
| symbol | STRING |
| price | FLOAT |
| operation | STRING |
| event_time | TIMESTAMP |
| processing_time | TIMESTAMP |
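As a rough sketch (not the repo's exact code), the schema above can be expressed in the string form accepted by `WriteToBigQuery`, and the mapping from the raw ticker event to a table row can live in a small DoFn. The field names assume the Coinbase-style payload shown earlier.

```python
# Illustrative only: schema string and field mapping assumed from the examples
# above, not copied from crypto_pipeline.py.
from datetime import datetime, timezone

import apache_beam as beam

# Same schema as the table above, in the string form accepted by WriteToBigQuery.
TABLE_SCHEMA = (
    "trade_id:STRING,symbol:STRING,price:FLOAT,"
    "operation:STRING,event_time:TIMESTAMP,processing_time:TIMESTAMP"
)


class ToTableRow(beam.DoFn):
    """Maps a raw ticker event (already parsed to a dict) to a BigQuery row."""

    def process(self, event: dict):
        yield {
            "trade_id": str(event["trade_id"]),   # stored as STRING
            "symbol": event["product_id"],        # e.g. "BTC-USD"
            "price": float(event["price"]),       # prices arrive as strings
            "operation": event["side"],           # "buy" or "sell"
            "event_time": event["time"],          # RFC 3339 string from the feed
            "processing_time": datetime.now(timezone.utc).isoformat(),
        }
```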
Install the dependencies:

```bash
pip install -r requirements.txt
```
- Create a service account (SA) to avoid using your personal account email to run the pipeline.
- Add the role required to run Dataflow jobs to the service account.
- Generate a JSON key and store it on your local machine.
- Create an environment variable `GOOGLE_APPLICATION_CREDENTIALS` with the path to your JSON key.
```bash
export GOOGLE_APPLICATION_CREDENTIALS=<KEY_TO_JSON_FILE>  # For Linux systems
```

Run the pipeline locally with the DirectRunner:

```bash
python crypto_pipeline.py --runner DirectRunner
```
Use the command below to submit the pipeline to the cloud. Provide a name to easily identify your pipeline job by replacing `PIPELINE_NAME`; otherwise, Dataflow will assign a random, unfriendly name. You can use the same bucket for `--temp_location` and `--staging_location`, but create a different folder for each one.
```bash
python crypto_pipeline.py \
  --runner DataflowRunner \
  --project <YOUR_PROJECT_ID> \
  --region <YOUR_REGION> \
  --temp_location <YOUR_BUCKET>/temp \
  --staging_location <YOUR_BUCKET>/staging \
  --max_num_workers 1 \
  --worker_machine_type e2-standard-2 \
  --job_name=<PIPELINE_NAME>
```

**Important:**
The error `ZONE_RESOURCE_POOL_EXHAUSTED` is a common error related to resource availability in the selected region. Try running the pipeline in a different zone, even if it differs from the zone chosen for the project, but avoid intercontinental regions to prevent high charges. Another alternative is to use a different machine type for the `worker_machine_type` parameter.
The dead-letter sink as a BigQuery table:

A dashboard was created using Looker Studio to visualise cryptocurrency price trends.
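For ad-hoc analysis outside Looker Studio, the table can also be queried directly. A minimal sketch with the `google-cloud-bigquery` client; the dataset and table names are placeholders, not the project's actual names.

```python
# Example ad-hoc query against the trades table; dataset/table names are placeholders.
from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT
      symbol,
      TIMESTAMP_TRUNC(event_time, MINUTE) AS minute,
      AVG(price) AS avg_price,
      COUNT(*) AS trades
    FROM `<YOUR_PROJECT_ID>.<DATASET>.trades`
    GROUP BY symbol, minute
    ORDER BY minute DESC
    LIMIT 20
"""

# Print average price and trade count per minute, most recent first.
for row in client.query(query).result():
    print(row.symbol, row.minute, row.avg_price, row.trades)
```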
This architecture can be used for:
- Real-time financial analytics
- Market monitoring
- Trading dashboards
- Streaming anomaly detection
Possible improvements to the pipeline:
- Add data validation ✅
- Implement dead-letter queues ✅
- Add monitoring and alerting 🚧
- Add Looker dashboard ✅
- Add Terraform 🚧


