Skip to content
49 changes: 49 additions & 0 deletions go/internal/feast/model/sortedfeatureview.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package model

import (
"github.com/feast-dev/feast/go/protos/feast/core"
)

// SortKey is the model-layer description of a single sort key of a
// SortedFeatureView.
type SortKey struct {
	// FieldName is the name of the field the view is sorted by.
	FieldName string
	// Order is the textual form of the sort direction for FieldName.
	Order string
}

// NewSortKeyFromProto builds a SortKey from its protobuf message.
//
// FieldName is taken from the proto's name and Order from the String() form
// of the proto's default sort order. Only the proto getters are used, never
// direct field access.
func NewSortKeyFromProto(proto *core.SortKey) *SortKey {
	sortKey := new(SortKey)
	sortKey.FieldName = proto.GetName()
	sortKey.Order = proto.GetDefaultSortOrder().String()
	return sortKey
}

// SortedFeatureView is a FeatureView that additionally carries an ordered
// list of sort keys.
type SortedFeatureView struct {
	// FeatureView is embedded, so its fields and methods are promoted onto
	// SortedFeatureView.
	*FeatureView
	// SortKeys lists the view's sort keys in the order they were declared.
	SortKeys []*SortKey
}

// NewSortedFeatureViewFromProto converts a protobuf SortedFeatureView into
// its model representation.
//
// The embedded FeatureView is populated with the spec's name, features and
// TTL; every sort key in the spec is converted with NewSortKeyFromProto, in
// order. The returned SortKeys slice is always non-nil, even when the spec
// declares no sort keys.
//
// NOTE(review): only Base and Ttl are set on the embedded FeatureView here;
// confirm no other FeatureView fields need to be filled in from the spec.
func NewSortedFeatureViewFromProto(proto *core.SortedFeatureView) *SortedFeatureView {
	spec := proto.GetSpec()

	view := &SortedFeatureView{
		FeatureView: &FeatureView{
			Base: NewBaseFeatureView(spec.GetName(), spec.GetFeatures()),
			Ttl:  spec.GetTtl(),
		},
	}

	protoKeys := spec.GetSortKeys()
	view.SortKeys = make([]*SortKey, 0, len(protoKeys))
	for _, protoKey := range protoKeys {
		view.SortKeys = append(view.SortKeys, NewSortKeyFromProto(protoKey))
	}

	return view
}

// NewSortedFeatureViewFromBase returns a new SortedFeatureView whose embedded
// FeatureView is derived from the receiver's FeatureView and the given base
// via NewFeatureViewFromBase.
//
// The SortKeys slice is shared with the receiver, not copied.
func (sfv *SortedFeatureView) NewSortedFeatureViewFromBase(base *BaseFeatureView) *SortedFeatureView {
	derived := new(SortedFeatureView)
	derived.FeatureView = sfv.FeatureView.NewFeatureViewFromBase(base)
	derived.SortKeys = sfv.SortKeys
	return derived
}
10 changes: 9 additions & 1 deletion sdk/python/feast/base_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.SortedFeatureView_pb2 import (
SortedFeatureView as SortedFeatureViewProto,
)
from feast.protos.feast.core.StreamFeatureView_pb2 import (
StreamFeatureView as StreamFeatureViewProto,
)
Expand Down Expand Up @@ -98,7 +101,12 @@ def proto_class(self) -> Type[Message]:
@abstractmethod
def to_proto(
    self,
) -> Union[
    FeatureViewProto,
    OnDemandFeatureViewProto,
    StreamFeatureViewProto,
    SortedFeatureViewProto,
]:
    """
    Converts this feature view into its protobuf representation.

    Abstract: each concrete feature view class supplies the implementation.

    Returns:
        One of the feature view protobuf messages listed in the return
        annotation.
    """
    pass

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from datetime import datetime
from functools import partial
from queue import Queue
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple
from tokenize import Double
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import (
Expand All @@ -40,13 +41,26 @@
from cassandra.query import BatchStatement, BatchType, PreparedStatement
from pydantic import StrictFloat, StrictInt, StrictStr

from feast import Entity, FeatureView, RepoConfig
from feast import Entity, FeatureView, RepoConfig, ValueType
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.SortedFeatureView_pb2 import SortOrder
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.rate_limiter import SlidingWindowRateLimiter
from feast.repo_config import FeastConfigBaseModel
from feast.sorted_feature_view import SortedFeatureView
from feast.types import (
Bool,
Bytes,
Float32,
Float64,
Int32,
Int64,
String,
UnixTimestamp,
from_value_type,
)

# Error messages
E_CASSANDRA_UNEXPECTED_CONFIGURATION_CLASS = (
Expand Down Expand Up @@ -684,26 +698,72 @@ def _drop_table(
logger.info(f"Deleting table {fqtable}.")
session.execute(drop_cql)

def _create_table(
    self,
    config: RepoConfig,
    project: str,
    table: Union[FeatureView, SortedFeatureView],
):
    """
    Handle the CQL (low-level) creation of a table.

    Args:
        config: The repo configuration; used to obtain the session and the
            table name format version.
        project: The Feast project the table belongs to.
        table: The feature view to create a table for. A SortedFeatureView
            gets a dedicated CREATE statement built by
            `_build_sorted_table_cql`; any other view uses the standard
            "create" statement from `_get_cql_statement`.
    """
    session: Session = self._get_session(config)
    keyspace: str = self._keyspace
    table_name_version = config.online_store.table_name_format_version
    fqtable = CassandraOnlineStore._fq_table_name(
        keyspace, project, table, table_name_version
    )
    # The sorted-view check must be the branch condition: only these views
    # need per-feature columns and a clustering order.
    if isinstance(table, SortedFeatureView):
        create_cql = self._build_sorted_table_cql(project, table, fqtable)
    else:
        create_cql = self._get_cql_statement(
            config,
            "create",
            fqtable,
            project=project,
            feature_view=table.name,
        )
    logger.info(
        f"Creating table {fqtable} in keyspace {keyspace} if not exists using {create_cql}."
    )
    session.execute(create_cql)

def _build_sorted_table_cql(
    self, project: str, table: SortedFeatureView, fqtable: str
) -> str:
    """
    Build the CQL statement for creating a SortedFeatureView table with custom
    entity and sort key columns.

    The table has one column per feature plus `entity_key`, `event_ts` and
    `created_ts`. `entity_key` is the partition key and the sort keys are the
    clustering columns, each with its own default ordering.

    Args:
        project: The Feast project name, recorded in the table comment.
        table: The sorted feature view whose features and sort keys define
            the schema.
        fqtable: The fully qualified table name to create.

    Returns:
        The CREATE TABLE statement as a string.

    Raises:
        ValueError: If the view has no sort keys, or if a feature or sort key
            has a type that `_get_cql_type` does not support.
    """
    # Without clustering columns the statement below would be invalid CQL
    # ("PRIMARY KEY ((entity_key), )" / "CLUSTERING ORDER BY ()"), so fail
    # early with a clear message.
    if not table.sort_keys:
        raise ValueError(
            f"SortedFeatureView {table.name} must define at least one sort key."
        )

    feature_columns = [
        f"{feature.name} {self._get_cql_type(feature.dtype)}"
        for feature in table.features
    ]

    # Reject sort keys whose type has no CQL equivalent; the mapped type
    # itself is not needed here.
    for sk in table.sort_keys:
        self._get_cql_type(from_value_type(sk.value_type))

    # Take the names straight from the sort keys so PRIMARY KEY and
    # CLUSTERING ORDER BY always reference identical identifiers.
    sort_key_names = ", ".join(sk.name for sk in table.sort_keys)

    sort_key_orders = [
        f"{sk.name} {'ASC' if sk.default_sort_order == SortOrder.Enum.ASC else 'DESC'}"
        for sk in table.sort_keys
    ]

    feature_columns_str = ",".join(feature_columns)

    create_cql = (
        f"CREATE TABLE IF NOT EXISTS {fqtable} (\n"
        f" entity_key TEXT,\n"
        f" {feature_columns_str},\n"
        f" event_ts TIMESTAMP,\n"
        f" created_ts TIMESTAMP,\n"
        f" PRIMARY KEY ((entity_key), {sort_key_names})\n"
        f") WITH CLUSTERING ORDER BY ({', '.join(sort_key_orders)})\n"
        f"AND COMMENT='project={project}, feature_view={table.name}';"
    )
    return create_cql.strip()

def _get_cql_statement(
self, config: RepoConfig, op_name: str, fqtable: str, **kwargs
):
Expand Down Expand Up @@ -737,3 +797,23 @@ def _get_cql_statement(
return self._prepared_statements[cache_key]
else:
return statement

def _get_cql_type(self, value_type: ValueType) -> str:
    """
    Map a Feast scalar type to the Cassandra CQL data type used to store it.

    Args:
        value_type: The Feast type to map. Callers in this class pass
            `feature.dtype` or the result of `from_value_type(...)`.
            NOTE(review): the mapping keys are `feast.types` objects, so the
            `ValueType` annotation looks too narrow -- confirm and adjust.

    Returns:
        The CQL type name.

    Raises:
        ValueError: If the type has no scalar CQL mapping.
    """
    # Mapping for scalar types. Float64 needs the 64-bit CQL DOUBLE; CQL
    # FLOAT is 32-bit and would silently lose precision.
    scalar_mapping = {
        Bytes: "BLOB",
        String: "TEXT",
        Int32: "INT",
        Int64: "BIGINT",
        Float32: "FLOAT",
        Float64: "DOUBLE",
        Bool: "BOOLEAN",
        UnixTimestamp: "TIMESTAMP",
    }

    if value_type in scalar_mapping:
        return scalar_mapping[value_type]
    else:
        raise ValueError(f"Unsupported type: {value_type}")
89 changes: 89 additions & 0 deletions sdk/python/feast/sort_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import warnings
from typing import Dict, Optional, Union

from typeguard import typechecked

from feast.entity import Entity
from feast.protos.feast.core.SortedFeatureView_pb2 import (
SortKey as SortKeyProto,
)
from feast.protos.feast.core.SortedFeatureView_pb2 import (
SortOrder,
)
from feast.types import ComplexFeastType, PrimitiveFeastType
from feast.value_type import ValueType

# Installs an "ignore" filter for DeprecationWarning when this module is
# imported.
# NOTE(review): this filter is not scoped to this module -- confirm that
# silencing DeprecationWarning for everything imported afterwards is intended.
warnings.simplefilter("ignore", DeprecationWarning)

# DUMMY_ENTITY is a placeholder entity used in entityless FeatureViews
# NOTE(review): none of the DUMMY_* names are referenced by the SortKey class
# in this module -- confirm they are needed here.
DUMMY_ENTITY_ID = "__dummy_id"
DUMMY_ENTITY_NAME = "__dummy"
DUMMY_ENTITY = Entity(
name=DUMMY_ENTITY_NAME,
join_keys=[DUMMY_ENTITY_ID],
)


@typechecked
class SortKey:
    """
    Describes one key that a SortedFeatureView is ordered by.

    Attributes:
        name: Name of the sort key.
        value_type: The key's type, always stored as a ValueType.
        default_sort_order: Default ordering applied to the key.
        tags: Free-form string metadata attached to the key.
        description: Human-readable description of the key.
    """

    name: str
    value_type: ValueType
    default_sort_order: SortOrder.Enum.ValueType
    tags: Dict[str, str]
    description: str

    def __init__(
        self,
        name: str,
        value_type: Union[ValueType, PrimitiveFeastType, ComplexFeastType],
        default_sort_order: SortOrder.Enum.ValueType = SortOrder.ASC,
        tags: Optional[Dict[str, str]] = None,
        description: str = "",
    ):
        """
        Creates a SortKey.

        Args:
            name: Name of the sort key.
            value_type: Either a ValueType, or a Feast type that is converted
                with its `to_value_type()` method.
            default_sort_order: Default ordering; ascending when omitted.
            tags: Optional metadata; an empty dict is used when omitted.
            description: Optional description.

        Raises:
            ValueError: If `value_type` is none of the accepted kinds.
        """
        self.name = name

        # Normalise whatever type flavour was passed in to a ValueType.
        if isinstance(value_type, ValueType):
            normalized = value_type
        elif isinstance(value_type, (PrimitiveFeastType, ComplexFeastType)):
            normalized = value_type.to_value_type()
        else:
            raise ValueError(f"Unsupported value type: {value_type}")
        self.value_type = normalized

        self.default_sort_order = default_sort_order
        self.tags = tags if tags else {}
        self.description = description

    def ensure_valid(self):
        """
        Checks that the name, value type and sort order are all usable.

        Raises:
            ValueError: If the name is empty, the value type is not a
                ValueType, or the sort order is neither ASC nor DESC.
        """
        if not self.name:
            raise ValueError("SortKey must have a non-empty name.")

        if not isinstance(self.value_type, ValueType):
            raise ValueError("SortKey must have a valid value_type of type ValueType.")

        allowed_orders = (SortOrder.ASC, SortOrder.DESC)
        if self.default_sort_order not in allowed_orders:
            raise ValueError(
                "SortKey default_sort_order must be either SortOrder.ASC or SortOrder.DESC."
            )

    def to_proto(self) -> SortKeyProto:
        """Serialises this sort key into a SortKey protobuf message."""
        message = SortKeyProto(
            name=self.name,
            value_type=self.value_type.value,
            default_sort_order=self.default_sort_order,
            description=self.description,
        )
        message.tags.update(self.tags)
        return message

    @classmethod
    def from_proto(cls, proto: SortKeyProto) -> "SortKey":
        """Builds a SortKey from a SortKey protobuf message."""
        tags = dict(proto.tags)
        value_type = ValueType(proto.value_type)
        return cls(
            name=proto.name,
            value_type=value_type,
            default_sort_order=proto.default_sort_order,
            tags=tags,
            description=proto.description,
        )
Loading