Skip to content

Commit e30b3e6

Browse files
committed
refactor(market-data-ingestion): change polygon.io to massive.com
Change stocks data HTTP client from polygon.io to massive.com following its rebranding.
1 parent 3ac201c commit e30b3e6

9 files changed

Lines changed: 653 additions & 642 deletions

File tree

data-ingestion/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ Data pipeline to ingest stock market datasets into a Time Series Database.
1010
2. Define the required environment variables
1111

1212
```sh
13-
export POLYGON_API_KEY=...
13+
export MASSIVE_API_KEY=...
1414

1515
export POSTGRES_USER="finwar"
1616
export POSTGRES_PASSWORD="password"
@@ -21,7 +21,7 @@ export POSTGRES_DB="finwar"
2121

2222

2323
> [!TIP]
24-
> You can get a Polygon API key for free by creating an account on [polygon.io](https://polygon.io/).
24+
> You can get a Massive API key for free by creating an account on [massive.com](https://massive.com/).
2525
2626
### Installing dependencies
2727

@@ -57,4 +57,4 @@ uv run dg launch --assets download_stocks_intraday_history,load_stocks_intraday_
5757
*Here the pipeline is launched to collect the intraday history data for Apple Inc. over the last week of September.*
5858

5959
> [!WARNING]
60-
> The Polygon API free account only goes up to 5 req/min, so be careful with the number of partitions you run.
60+
> The Massive API free account only goes up to 5 req/min, so be careful with the number of partitions you run.

data-ingestion/docker/Dockerfile

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ FROM docker.io/library/python:3.13.7-slim AS builder
22

33
ENV UV_COMPILE_BYTECODE=1 UV_LINK_MODE=copy UV_PYTHON_DOWNLOADS=0
44

5-
COPY --from=ghcr.io/astral-sh/uv:0.8.22 /uv /uvx /bin/
5+
COPY --from=ghcr.io/astral-sh/uv:0.9.6 /uv /uvx /bin/
66

77
WORKDIR /app
88

@@ -14,7 +14,12 @@ COPY . /app
1414

1515
RUN uv sync --no-dev --locked
1616

17-
FROM docker.io/library/python:3.13.7-slim AS runtime
17+
FROM docker.io/library/python:3.13.7-slim AS runner
18+
19+
LABEL org.opencontainers.image.authors="guilhem.sante@kelps.org"
20+
LABEL org.opencontainers.image.title="FinWar Data Ingestion User Code"
21+
LABEL org.opencontainers.image.description="Data pipeline to ingest stock market datasets into a Time Series Database."
22+
LABEL org.opencontainers.image.licenses="MIT"
1823

1924
RUN groupadd --system --gid 999 appuser \
2025
&& useradd --system --gid 999 --uid 999 --create-home appuser

data-ingestion/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "finwar_data_ingestion"
3-
version = "0.1.0"
3+
version = "0.2.0"
44
description = "Data pipeline to ingest stock market datasets into a Time Series Database"
55
authors = [
66
{ name = "Coding Kelps" }
@@ -10,8 +10,8 @@ requires-python = "==3.13.*"
1010
dependencies = [
1111
"dagster==1.11.10",
1212
"dagster-docker>=0.27.10",
13+
"massive>=2.0.1",
1314
"pandas>=2.3.2",
14-
"polygon>=1.2.8",
1515
"psycopg[binary]>=3.2.10",
1616
"python-dateutil>=2.9.0.post0",
1717
]

data-ingestion/src/finwar_data_ingestion/components/stocks_intraday_history.py

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44
from datetime import datetime
55
from typing import Literal, List
66

7-
from finwar_data_ingestion.resources import PostgreSQLResource, PolygonResource
7+
from finwar_data_ingestion.resources import PostgreSQLResource, MassiveResource
88

99

10-
MAX_POLYGON_REQ_RETRY = 3
10+
MAX_MASSIVE_REQ_RETRY = 3
1111

1212

1313
class StocksIntradayHistoryComponent(dg.Component, dg.Model, dg.Resolvable):
1414
"""Represents historical stock quotation for one or more ticker symbols.
1515
16-
The data is retrieved from the [Polygon.io](https://polygon.io) API and returned as a
16+
The data is retrieved from the [Massive](https://massive.com) API and returned as a
1717
:class:`pandas.DataFrame`. The data also contains the trading volume.
1818
It is then loaded as a TimescaleDB table.
1919
@@ -25,7 +25,7 @@ class StocksIntradayHistoryComponent(dg.Component, dg.Model, dg.Resolvable):
2525
time_partitioning (Literal["daily", "weekly", "monthly"]): The partitioning frequency
2626
for the data retrieval. The partitioning unit should be shorter or equal to the
2727
requested time window. Default is ``"monthly"``.
28-
polygon_api_key (str): API key for authenticating with the Polygon.io API.
28+
massive_api_key (str): API key for authenticating with the Massive API.
2929
precision (Literal["second", "minute", "day", "week", "month", "quarter", "year"]):
3030
The granularity of the bars. Must be finer than or equal to the requested
3131
time window. Default is ``"day"``.
@@ -42,7 +42,7 @@ class StocksIntradayHistoryComponent(dg.Component, dg.Model, dg.Resolvable):
4242
start_date: str
4343
end_date: str | None = None
4444
time_partitioning: Literal['daily', 'weekly', 'monthly'] = 'monthly'
45-
polygon_api_key: str
45+
massive_api_key: str
4646
precision: Literal['second', 'minute', 'day', 'week', 'month', 'quarter', 'year'] = 'day'
4747
adjusted: bool = False
4848
table_name: str = 'stocks_history'
@@ -57,7 +57,7 @@ def get_spec(cls) -> dg.ComponentTypeSpec:
5757
return dg.ComponentTypeSpec(
5858
description=cls.__doc__,
5959
owners=['contact@kelps.org', 'guilhem.sante@kelps.org'],
60-
tags=['stocks', 'intraday', 'polygon.io', 'timescaledb'],
60+
tags=['stocks', 'intraday', 'massive.com', 'timescaledb'],
6161
)
6262

6363
def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
@@ -84,57 +84,57 @@ def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
8484
group_name='ingestion',
8585
code_version='0.3.0',
8686
description="""
87-
Extract the stocks intraday value from the Polygon API.
87+
Extract the stocks intraday value from the Massive API.
8888
""",
8989
partitions_def=partitions_matrix,
9090
)
9191
def download_stocks_intraday_history(
9292
context: AssetExecutionContext,
93-
polygon: PolygonResource,
93+
massive: MassiveResource,
9494
) -> dg.MaterializeResult:
9595
partition_keys = context.partition_key.keys_by_dimension
9696
time_window = time_partitioning.time_window_for_partition_key(partition_keys['time'])
9797
symbol = partition_keys['symbol']
9898

99-
with polygon.get_client() as client:
100-
for attempt in range(0, MAX_POLYGON_REQ_RETRY):
101-
try:
102-
res = client.get_aggregate_bars(
103-
symbol=symbol,
104-
multiplier=1,
105-
adjusted=self.adjusted,
106-
timespan=self.precision,
107-
from_date=time_window.start,
108-
to_date=time_window.end,
109-
limit=50000, # API max limit
99+
client = massive.get_client()
100+
101+
for attempt in range(0, MAX_MASSIVE_REQ_RETRY):
102+
try:
103+
aggs = []
104+
for a in client.list_aggs(
105+
ticker=symbol,
106+
multiplier=1,
107+
adjusted=self.adjusted,
108+
timespan=self.precision,
109+
from_=time_window.start,
110+
to=time_window.end,
111+
limit=50000, # API max limit
112+
):
113+
aggs.append(a)
114+
115+
except Exception as e:
116+
if attempt == MAX_MASSIVE_REQ_RETRY:
117+
raise dg.Failure(
118+
description=f'failed to get intraday stock data from Massive API: {e}'
119+
)
120+
else:
121+
context.log.warning(
122+
f'Massive API request failed (attempt n°{attempt + 1}/{MAX_MASSIVE_REQ_RETRY}): {e}',
110123
)
111-
112-
if res['status'] == 'ERROR' or res['results'] is None:
113-
raise Exception(res['error'])
114-
break
115-
except Exception as e:
116-
if attempt == MAX_POLYGON_REQ_RETRY:
117-
raise dg.Failure(
118-
description=f'failed to get intraday stock data from Polygon API: {e}'
119-
)
120-
else:
121-
context.log.warning(
122-
f'polygon.io API request failed (attempt n°{attempt + 1}/{MAX_POLYGON_REQ_RETRY}): {e}',
123-
)
124124

125125
return dg.MaterializeResult(
126126
value=pd.DataFrame(
127127
[
128128
{
129-
'timestamp': datetime.fromtimestamp(bar['t'] / 1000),
129+
'timestamp': datetime.fromtimestamp(bar.timestamp / 1000),
130130
'symbol': symbol,
131-
'open': bar['o'],
132-
'high': bar['h'],
133-
'low': bar['l'],
134-
'close': bar['c'],
135-
'volume': bar['v'],
131+
'open': bar.open,
132+
'high': bar.high,
133+
'low': bar.low,
134+
'close': bar.close,
135+
'volume': bar.volume,
136136
}
137-
for bar in res['results']
137+
for bar in aggs
138138
]
139139
),
140140
metadata={
@@ -222,8 +222,8 @@ def load_stocks_intraday_history(
222222
load_stocks_intraday_history,
223223
],
224224
resources={
225-
'polygon': PolygonResource(
226-
api_key=self.polygon_api_key,
225+
'massive': MassiveResource(
226+
api_key=self.massive_api_key,
227227
),
228228
'postgresql': PostgreSQLResource(
229229
username=self.timescaledb_username,

data-ingestion/src/finwar_data_ingestion/defs/stocks_intraday_history/defs.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ attributes:
5757
- CAT
5858
- AMGN
5959
- NEE
60-
polygon_api_key: "{{ env.POLYGON_API_KEY }}"
60+
massive_api_key: "{{ env.MASSIVE_API_KEY }}"
6161
timescaledb_username: "{{ env.POSTGRES_USER }}"
6262
timescaledb_password: "{{ env.POSTGRES_PASSWORD }}"
6363
timescaledb_host: "{{ env.POSTGRES_HOST }}"
@@ -66,7 +66,7 @@ attributes:
6666

6767
requirements:
6868
env:
69-
- POLYGON_API_KEY
69+
- MASSIVE_API_KEY
7070
- POSTGRES_USER
7171
- POSTGRES_PASSWORD
7272
- POSTGRES_HOST
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
from .postgresql import PostgreSQLResource as PostgreSQLResource
2-
from .polygon import PolygonResource as PolygonResource
2+
from .massive import MassiveResource as MassiveResource
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from dagster import ConfigurableResource
2+
from pydantic import Field
3+
4+
from massive import RESTClient
5+
6+
7+
class MassiveResource(ConfigurableResource):
8+
"""
9+
A Dagster resource wrapper over the massive.com REST client.
10+
"""
11+
12+
api_key: str = Field(description='The required API key to authenticate.')
13+
14+
def get_client(self) -> RESTClient:
15+
return RESTClient(
16+
api_key=self.api_key,
17+
)

data-ingestion/src/finwar_data_ingestion/resources/polygon.py

Lines changed: 0 additions & 19 deletions
This file was deleted.

0 commit comments

Comments
 (0)