Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/examples/data_import/parquet/flat_dataset/.env-example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SIFT_API_URI=""
SIFT_API_KEY=""
ASSET_NAME=""
44 changes: 44 additions & 0 deletions python/examples/data_import/parquet/flat_dataset/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import os

from dotenv import load_dotenv
from sift_py.data_import.parquet import ParquetUploadService
from sift_py.data_import.status import DataImportService
from sift_py.data_import.time_format import TimeFormatType
from sift_py.rest import SiftRestConfig

if __name__ == "__main__":
    # Example usage for uploading a Parquet (flat dataset).
    #
    # The Sift endpoint, API key and target asset name are read from the
    # environment after `load_dotenv()` has run (see `.env-example` for the
    # expected variable names).

    def _require_env(name: str) -> str:
        """
        Return the value of the environment variable `name`.

        Raises:
            RuntimeError: if the variable is unset or empty. An explicit raise
                is used instead of `assert` because assertions are stripped
                when Python runs with `-O`, which would let a missing value
                slip through into the REST config.
        """
        value = os.getenv(name)
        if not value:
            raise RuntimeError(f"expected '{name}' environment variable to be set")
        return value

    load_dotenv()

    # Validate every required setting up front, before any service is built.
    sift_uri = _require_env("SIFT_API_URI")
    apikey = _require_env("SIFT_API_KEY")
    asset_name = _require_env("ASSET_NAME")

    rest_config: SiftRestConfig = {
        "uri": sift_uri,
        "apikey": apikey,
    }

    parquet_upload_service = ParquetUploadService(rest_config)

    # NOTE(review): `time_path` is presumably the name of the timestamp column
    # inside `sample_data.parquet` -- confirm against the sample file.
    import_service: DataImportService = parquet_upload_service.flat_dataset_upload(
        asset_name=asset_name,
        run_name="Example Parquet Upload",
        path="sample_data.parquet",
        time_path="timestamp",
        time_format=TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS,
    )

    # Print the data import record returned by the service as JSON.
    data_import = import_service.get_data_import()
    print(data_import.model_dump_json(indent=1))

    print("Waiting for upload to complete...")
    import_service.wait_until_complete()
    print("Upload example complete!")
Binary file not shown.
67 changes: 67 additions & 0 deletions python/lib/sift_py/data_import/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing_extensions import Self

from sift_py._internal.channel import channel_fqn
from sift_py.data_import.parquet_complex_types import ParquetComplexTypesImportModeType
from sift_py.data_import.time_format import TimeFormatType
from sift_py.error import _component_deprecation_warning
from sift_py.ingestion.channel import ChannelBitFieldElement, ChannelDataType, ChannelEnumType
Expand Down Expand Up @@ -239,3 +240,69 @@ class Hdf5DataCfg(ConfigDataModel):
time_column: int = 1
value_dataset: str
value_column: int = 1


class ParquetTimeColumn(ConfigTimeModel):
    """
    Time column entry of a Parquet config.

    Extends `ConfigTimeModel` with the `path` of the time column.
    """

    # NOTE(review): presumably the column name/path inside the Parquet file --
    # confirm against the upload service.
    path: str


class ParquetDataColumn(ConfigBaseModel):
    """
    Data column entry of a Parquet config.

    Pairs the `path` of a data column with the channel configuration
    (`ConfigDataModel`) that applies to it.
    """

    # NOTE(review): presumably the column name/path inside the Parquet file --
    # confirm against the upload service.
    path: str
    channel_config: ConfigDataModel


class ParquetFlatDatasetConfig(ConfigBaseModel):
    """
    Flat dataset section of a Parquet config.

    Holds exactly one time column and a list of data columns.
    """

    time_column: ParquetTimeColumn
    data_columns: List[ParquetDataColumn]


class ParquetConfigImpl(ConfigBaseModel):
    """
    Parquet config spec.

    Validation rules enforced by this model:
      - `run_name` and `run_id` must not both be non-empty.
      - `complex_types_import_mode` is normalised to the human-readable string
        of a `ParquetComplexTypesImportModeType`; values that cannot be
        resolved to one are rejected.
    """

    asset_name: str
    run_name: str = ""
    run_id: str = ""
    flat_dataset: Optional[ParquetFlatDatasetConfig] = None
    footer_offset: int
    footer_length: int
    complex_types_import_mode: Union[str, ParquetComplexTypesImportModeType]

    @model_validator(mode="after")
    def validate_config(self) -> Self:
        """
        Rejects configs that set both `run_name` and `run_id`.
        """
        # Nothing to reject unless both run identifiers are non-empty.
        if not self.run_name or not self.run_id:
            return self

        raise PydanticCustomError(
            "invalid_config_error", "Only specify run_name or run_id, not both."
        )

    @field_validator("complex_types_import_mode", mode="before")
    @classmethod
    def convert_complex_types_import_mode(cls, raw: Optional[str]) -> Optional[str]:
        """
        Normalises `complex_types_import_mode` to its human-readable string.

        Accepts a `ParquetComplexTypesImportModeType` member or a string that
        `ParquetComplexTypesImportModeType.from_str` can resolve. `None` is
        handed back unchanged; any other input raises `invalid_config_error`.
        """
        if raw is None:
            return None

        # Resolve the input to an enum member first, then convert once.
        resolved: Optional[ParquetComplexTypesImportModeType] = None
        if isinstance(raw, ParquetComplexTypesImportModeType):
            resolved = raw
        elif isinstance(raw, str):
            resolved = ParquetComplexTypesImportModeType.from_str(raw)

        if resolved is None:
            raise PydanticCustomError(
                "invalid_config_error", f"Invalid complex_types_import_mode: {raw}."
            )
        return resolved.as_human_str()
Loading
Loading