Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
3d1f16a
fix: Update Docker images to use official sources for integration tes…
EXPEbdodla Jun 23, 2025
587a54e
fix: Only add sort key filter models to list if they meet the conditi…
piket Jun 23, 2025
f335e47
fix: Adding Snake Case to Include Metadata Check (#264)
Manisha4 Jun 26, 2025
5d52176
fix: Pack and unpack repeated list values into and out of arrow array…
piket Jun 26, 2025
e917042
feat: Adding Changes to Check if FV and SFV have Valid Updates (#254)
Manisha4 Jun 26, 2025
06b64f3
fix: Validate requested features exist in view. (#269)
piket Jun 27, 2025
c13e99d
feat: Separate entities from features in Range Query response (#265)
omirandadev Jun 30, 2025
0e8a853
fix: Keep duplicate requested features in response for range query. (…
piket Jun 30, 2025
f427234
fix: Http range timestamp values should return in rfc3339 format (#277)
piket Jul 3, 2025
7e7deee
fix: Exception Handling in Updates Feature (#278)
Manisha4 Jul 3, 2025
646fd3e
fix: Timestamps should be seconds percision. Return null status for f…
piket Jul 7, 2025
800091b
fix: Go server errors should be status errors with proper status code…
piket Jul 10, 2025
005f437
fix: Validation order for multiple missing end keys should pass (#283)
piket Jul 14, 2025
13b0cf3
fix: Handle null values in Arrow list conversion to Proto values (#280)
EXPEbdodla Jul 14, 2025
277731f
fix: Improve Arrow to proto conversion to handle null and empty array…
EXPEbdodla Jul 15, 2025
39aa252
feat: Add versioning information to the server and expose it via a ne…
EXPEbdodla Jul 18, 2025
ede1cc2
fix: Add http int tests and fix http error codes (#287)
piket Jul 18, 2025
9a9fec4
feat: Add search RPCs and sql.py functions
Jul 22, 2025
7ac9a31
fix: linting
Jul 22, 2025
4ff69ed
fix: default values if left empty in request in registry_server.py
Jul 22, 2025
19c61f6
fix: formatting
Jul 22, 2025
e03e5f9
chore: pass requests as python classes from protos
Jul 22, 2025
568ce22
feat: add tests for expediagroup/search.py objects
Jul 23, 2025
b825edc
fix: formatting
Jul 23, 2025
6b2e4ff
add log to note the grpc server has started
Jul 23, 2025
ab78a45
set log level to info
Jul 23, 2025
45f8434
fix: lint error
Jul 23, 2025
01cc153
feat: add option for SqlFallbackRegistry in feature_store.py and prev…
Aug 13, 2025
c28013b
fix: add sql-fallback to REGISTRY_CLASS_FOR_TYPE
Aug 13, 2025
fb5d634
sequential search projects
Aug 21, 2025
39467fb
use ProcessPoolExecutor
Aug 21, 2025
45aed78
fix: add back ThreadPoolExecutor import
Aug 21, 2025
ab8d197
sequential search projects again
Aug 21, 2025
934c312
use threadpoolexecutor again to see if that fixes serialization
Aug 22, 2025
63028ea
add project field to feature views
Aug 22, 2025
0a3c6c0
undo that feature view change
Aug 22, 2025
64ea0e8
fix: add project name to fv spec
Aug 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,20 +186,29 @@ func (fs *FeatureStore) GetOnlineFeatures(
fullFeatureNames bool) ([]*onlineserving.FeatureVector, error) {
var err error
var requestedFeatureViews []*onlineserving.FeatureViewAndRefs
var requestedSortedFeatureViews []*onlineserving.SortedFeatureViewAndRefs
var requestedOnDemandFeatureViews []*model.OnDemandFeatureView

if featureService != nil {
requestedFeatureViews, requestedOnDemandFeatureViews, err =
requestedFeatureViews, requestedSortedFeatureViews, requestedOnDemandFeatureViews, err =
onlineserving.GetFeatureViewsToUseByService(featureService, fs.registry, fs.config.Project)
if err != nil {
return nil, err
}
} else {
requestedFeatureViews, requestedOnDemandFeatureViews, err =
requestedFeatureViews, requestedSortedFeatureViews, requestedOnDemandFeatureViews, err =
onlineserving.GetFeatureViewsToUseByFeatureRefs(featureRefs, fs.registry, fs.config.Project)
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}

if len(requestedSortedFeatureViews) > 0 {
sfvNames := make([]string, len(requestedSortedFeatureViews))
for i, sfv := range requestedSortedFeatureViews {
sfvNames[i] = sfv.View.Base.Name
}
return nil, errors.GrpcInvalidArgumentErrorf("GetOnlineFeatures does not support sorted feature views %v", sfvNames)
}

if len(requestedFeatureViews) == 0 {
Expand Down Expand Up @@ -319,20 +328,24 @@ func (fs *FeatureStore) GetOnlineFeaturesRange(

var err error
var requestedSortedFeatureViews []*onlineserving.SortedFeatureViewAndRefs

var requestedFeatureViews []*onlineserving.FeatureViewAndRefs
if featureService != nil {
requestedSortedFeatureViews, err =
onlineserving.GetSortedFeatureViewsToUseByService(featureService, fs.registry, fs.config.Project)
if err != nil {
return nil, err
}

requestedFeatureViews, requestedSortedFeatureViews, _, err =
onlineserving.GetFeatureViewsToUseByService(featureService, fs.registry, fs.config.Project)
} else {
requestedSortedFeatureViews, err = onlineserving.GetSortedFeatureViewsToUseByFeatureRefs(
featureRefs, fs.registry, fs.config.Project)
if err != nil {
return nil, err
requestedFeatureViews, requestedSortedFeatureViews, _, err =
onlineserving.GetFeatureViewsToUseByFeatureRefs(featureRefs, fs.registry, fs.config.Project)
}
if err != nil {
return nil, err
}

if len(requestedFeatureViews) > 0 {
fvNames := make([]string, len(requestedFeatureViews))
for i, fv := range requestedFeatureViews {
fvNames[i] = fv.View.Base.Name
}
return nil, errors.GrpcInvalidArgumentErrorf("GetOnlineFeaturesRange does not support standard feature views %v", fvNames)
}

if len(requestedSortedFeatureViews) == 0 {
Expand Down Expand Up @@ -469,6 +482,12 @@ func (fs *FeatureStore) ParseFeatures(kind interface{}) (*Features, error) {
return nil, err
}
return &Features{FeaturesRefs: nil, FeatureService: featureService}, nil
featureServiceRequest := kind.(*serving.GetOnlineFeaturesRangeRequest_FeatureService)
featureService, err := fs.registry.GetFeatureService(fs.config.Project, featureServiceRequest.FeatureService)
if err != nil {
return nil, err
}
return &Features{FeaturesRefs: nil, FeatureService: featureService}, nil
default:
return nil, errors.GrpcInvalidArgumentErrorf("cannot parse 'kind' of either a Feature Service or list of Features from request")
}
Expand Down
8 changes: 4 additions & 4 deletions go/internal/feast/featurestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestGetOnlineFeaturesRange(t *testing.T) {
FeatureName: "conv_rate",
Values: []interface{}{0.85, 0.87, 0.89},
Statuses: []serving.FieldStatus{serving.FieldStatus_PRESENT, serving.FieldStatus_PRESENT, serving.FieldStatus_PRESENT},
EventTimestamps: []timestamppb.Timestamp{
EventTimestamps: []timestamp.Timestamp{
{Seconds: now.Unix() - 86400*3},
{Seconds: now.Unix() - 86400*2},
{Seconds: now.Unix() - 86400*1},
Expand All @@ -195,7 +195,7 @@ func TestGetOnlineFeaturesRange(t *testing.T) {
FeatureName: "acc_rate",
Values: []interface{}{0.91, 0.92, 0.94},
Statuses: []serving.FieldStatus{serving.FieldStatus_PRESENT, serving.FieldStatus_PRESENT, serving.FieldStatus_PRESENT},
EventTimestamps: []timestamppb.Timestamp{
EventTimestamps: []timestamp.Timestamp{
{Seconds: now.Unix() - 86400*3},
{Seconds: now.Unix() - 86400*2},
{Seconds: now.Unix() - 86400*1},
Expand All @@ -208,7 +208,7 @@ func TestGetOnlineFeaturesRange(t *testing.T) {
FeatureName: "conv_rate",
Values: []interface{}{0.78, 0.80},
Statuses: []serving.FieldStatus{serving.FieldStatus_PRESENT, serving.FieldStatus_PRESENT},
EventTimestamps: []timestamppb.Timestamp{
EventTimestamps: []timestamp.Timestamp{
{Seconds: now.Unix() - 86400*3},
{Seconds: now.Unix() - 86400*1},
},
Expand All @@ -218,7 +218,7 @@ func TestGetOnlineFeaturesRange(t *testing.T) {
FeatureName: "acc_rate",
Values: []interface{}{0.85, 0.88},
Statuses: []serving.FieldStatus{serving.FieldStatus_PRESENT, serving.FieldStatus_PRESENT},
EventTimestamps: []timestamppb.Timestamp{
EventTimestamps: []timestamp.Timestamp{
{Seconds: now.Unix() - 86400*3},
{Seconds: now.Unix() - 86400*1},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,5 @@

mlpfs_test_all_datatypes_service = FeatureService(
name="test_service",
features=[mlpfs_test_all_datatypes_view],
)

mlpfs_test_all_datatypes_sorted_service = FeatureService(
name="test_sorted_service",
features=[mlpfs_test_all_datatypes_sorted_view],
features=[mlpfs_test_all_datatypes_view, mlpfs_test_all_datatypes_sorted_view],
)
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,6 @@ func TestGetOnlineFeaturesRange_Http(t *testing.T) {
"all_dtypes_sorted:string_val",
"all_dtypes_sorted:timestamp_val",
"all_dtypes_sorted:boolean_val",
"all_dtypes_sorted:array_int_val",
"all_dtypes_sorted:array_long_val",
"all_dtypes_sorted:array_float_val",
"all_dtypes_sorted:array_double_val",
"all_dtypes_sorted:array_byte_val",
"all_dtypes_sorted:array_string_val",
"all_dtypes_sorted:array_timestamp_val",
"all_dtypes_sorted:array_boolean_val",
"all_dtypes_sorted:null_int_val",
"all_dtypes_sorted:null_long_val",
"all_dtypes_sorted:null_float_val",
Expand All @@ -89,8 +81,16 @@ func TestGetOnlineFeaturesRange_Http(t *testing.T) {
"all_dtypes_sorted:null_array_double_val",
"all_dtypes_sorted:null_array_byte_val",
"all_dtypes_sorted:null_array_string_val",
"all_dtypes_sorted:null_array_timestamp_val",
"all_dtypes_sorted:null_array_boolean_val",
"all_dtypes_sorted:array_int_val",
"all_dtypes_sorted:array_long_val",
"all_dtypes_sorted:array_float_val",
"all_dtypes_sorted:array_double_val",
"all_dtypes_sorted:array_string_val",
"all_dtypes_sorted:array_boolean_val",
"all_dtypes_sorted:array_byte_val",
"all_dtypes_sorted:array_timestamp_val",
"all_dtypes_sorted:null_array_timestamp_val",
"all_dtypes_sorted:event_timestamp"
],
"entities": {
Expand Down Expand Up @@ -128,14 +128,6 @@ func TestGetOnlineFeaturesRange_Http_withOnlyEqualsFilter(t *testing.T) {
"all_dtypes_sorted:string_val",
"all_dtypes_sorted:timestamp_val",
"all_dtypes_sorted:boolean_val",
"all_dtypes_sorted:array_int_val",
"all_dtypes_sorted:array_long_val",
"all_dtypes_sorted:array_float_val",
"all_dtypes_sorted:array_double_val",
"all_dtypes_sorted:array_byte_val",
"all_dtypes_sorted:array_string_val",
"all_dtypes_sorted:array_timestamp_val",
"all_dtypes_sorted:array_boolean_val",
"all_dtypes_sorted:null_int_val",
"all_dtypes_sorted:null_long_val",
"all_dtypes_sorted:null_float_val",
Expand All @@ -150,8 +142,16 @@ func TestGetOnlineFeaturesRange_Http_withOnlyEqualsFilter(t *testing.T) {
"all_dtypes_sorted:null_array_double_val",
"all_dtypes_sorted:null_array_byte_val",
"all_dtypes_sorted:null_array_string_val",
"all_dtypes_sorted:null_array_timestamp_val",
"all_dtypes_sorted:null_array_boolean_val",
"all_dtypes_sorted:array_int_val",
"all_dtypes_sorted:array_long_val",
"all_dtypes_sorted:array_float_val",
"all_dtypes_sorted:array_double_val",
"all_dtypes_sorted:array_string_val",
"all_dtypes_sorted:array_boolean_val",
"all_dtypes_sorted:array_byte_val",
"all_dtypes_sorted:array_timestamp_val",
"all_dtypes_sorted:null_array_timestamp_val",
"all_dtypes_sorted:event_timestamp"
],
"entities": {
Expand Down Expand Up @@ -187,14 +187,6 @@ func TestGetOnlineFeaturesRange_Http_forNonExistentEntityKey(t *testing.T) {
"all_dtypes_sorted:string_val",
"all_dtypes_sorted:timestamp_val",
"all_dtypes_sorted:boolean_val",
"all_dtypes_sorted:array_int_val",
"all_dtypes_sorted:array_long_val",
"all_dtypes_sorted:array_float_val",
"all_dtypes_sorted:array_double_val",
"all_dtypes_sorted:array_byte_val",
"all_dtypes_sorted:array_string_val",
"all_dtypes_sorted:array_timestamp_val",
"all_dtypes_sorted:array_boolean_val",
"all_dtypes_sorted:null_int_val",
"all_dtypes_sorted:null_long_val",
"all_dtypes_sorted:null_float_val",
Expand All @@ -209,8 +201,16 @@ func TestGetOnlineFeaturesRange_Http_forNonExistentEntityKey(t *testing.T) {
"all_dtypes_sorted:null_array_double_val",
"all_dtypes_sorted:null_array_byte_val",
"all_dtypes_sorted:null_array_string_val",
"all_dtypes_sorted:null_array_timestamp_val",
"all_dtypes_sorted:null_array_boolean_val",
"all_dtypes_sorted:array_int_val",
"all_dtypes_sorted:array_long_val",
"all_dtypes_sorted:array_float_val",
"all_dtypes_sorted:array_double_val",
"all_dtypes_sorted:array_string_val",
"all_dtypes_sorted:array_boolean_val",
"all_dtypes_sorted:array_byte_val",
"all_dtypes_sorted:array_timestamp_val",
"all_dtypes_sorted:null_array_timestamp_val",
"all_dtypes_sorted:event_timestamp"
],
"entities": {
Expand Down Expand Up @@ -278,14 +278,6 @@ func TestGetOnlineFeaturesRange_Http_withEmptySortKeyFilter(t *testing.T) {
"all_dtypes_sorted:string_val",
"all_dtypes_sorted:timestamp_val",
"all_dtypes_sorted:boolean_val",
"all_dtypes_sorted:array_int_val",
"all_dtypes_sorted:array_long_val",
"all_dtypes_sorted:array_float_val",
"all_dtypes_sorted:array_double_val",
"all_dtypes_sorted:array_byte_val",
"all_dtypes_sorted:array_string_val",
"all_dtypes_sorted:array_timestamp_val",
"all_dtypes_sorted:array_boolean_val",
"all_dtypes_sorted:null_int_val",
"all_dtypes_sorted:null_long_val",
"all_dtypes_sorted:null_float_val",
Expand All @@ -300,8 +292,16 @@ func TestGetOnlineFeaturesRange_Http_withEmptySortKeyFilter(t *testing.T) {
"all_dtypes_sorted:null_array_double_val",
"all_dtypes_sorted:null_array_byte_val",
"all_dtypes_sorted:null_array_string_val",
"all_dtypes_sorted:null_array_timestamp_val",
"all_dtypes_sorted:null_array_boolean_val",
"all_dtypes_sorted:array_int_val",
"all_dtypes_sorted:array_long_val",
"all_dtypes_sorted:array_float_val",
"all_dtypes_sorted:array_double_val",
"all_dtypes_sorted:array_string_val",
"all_dtypes_sorted:array_boolean_val",
"all_dtypes_sorted:array_byte_val",
"all_dtypes_sorted:array_timestamp_val",
"all_dtypes_sorted:null_array_timestamp_val",
"all_dtypes_sorted:event_timestamp"
],
"entities": {
Expand All @@ -323,7 +323,7 @@ func TestGetOnlineFeaturesRange_Http_withEmptySortKeyFilter(t *testing.T) {

func TestGetOnlineFeaturesRange_Http_withFeatureService(t *testing.T) {
requestJson := []byte(`{
"feature_service": "test_sorted_service",
"feature_service": "test_service",
"entities": {
"index_id": [1, 2, 3]
},
Expand All @@ -342,63 +342,36 @@ func TestGetOnlineFeaturesRange_Http_withFeatureService(t *testing.T) {
responseRecorder := httptest.NewRecorder()

getOnlineFeaturesRangeHandler.ServeHTTP(responseRecorder, request)
assert.Equal(t, responseRecorder.Code, http.StatusOK, "Expected HTTP status code 200 OK response body is: %s", responseRecorder.Body.String())
expectedResponse, err := loadResponse("valid_response.json")
require.NoError(t, err, "Failed to load expected response from file")
assert.JSONEq(t, string(expectedResponse), responseRecorder.Body.String(), "Response body does not match expected JSON")
}

func TestGetOnlineFeaturesRange_Http_withInvalidFeatureService(t *testing.T) {
requestJson := []byte(`{
"feature_service": "invalid_service",
"entities": {
"index_id": [1, 2, 3]
},
"sort_key_filters": [
{
"sort_key_name": "event_timestamp",
"range": {
"range_start": 0
}
}
],
"limit": 10
}`)

request := httptest.NewRequest(http.MethodPost, "/get-online-features-range", bytes.NewBuffer(requestJson))
responseRecorder := httptest.NewRecorder()

getOnlineFeaturesRangeHandler.ServeHTTP(responseRecorder, request)
assert.Equal(t, http.StatusNotFound, responseRecorder.Code)
assert.Contains(t, responseRecorder.Body.String(), "Error getting feature service from registry", "Response body does not contain expected error message")
assert.Equal(t, responseRecorder.Code, http.StatusBadRequest)
assert.Equal(t, `{"error":"GetOnlineFeaturesRange does not support standard feature views [all_dtypes]","status_code":400}`, responseRecorder.Body.String(), "Response body does not match expected error message")
}

func TestGetOnlineFeaturesRange_Http_withInvalidSortedFeatureView(t *testing.T) {
func TestGetOnlineFeaturesRange_Http_withInvalidFeatureView(t *testing.T) {
requestJson := []byte(`{
"features": ["invalid_sorted_view:some_feature"],
"entities": {
"index_id": [1, 2, 3]
},
"sort_key_filters": [
{
"sort_key_name": "event_timestamp",
"range": {
"range_start": {
"unix_timestamp_val": 0
}
}
}
],
"limit": 10
}`)
"features": [
"all_dtypes:int_val"
],
"entities": {
"index_id": [1, 2, 3]
},
"sort_key_filters": [
{
"sort_key_name": "event_timestamp",
"range": {
"range_start": 0
}
}
],
"limit": 10
}`)

request := httptest.NewRequest(http.MethodPost, "/get-online-features-range", bytes.NewBuffer(requestJson))
responseRecorder := httptest.NewRecorder()

getOnlineFeaturesRangeHandler.ServeHTTP(responseRecorder, request)
assert.Equal(t, http.StatusBadRequest, responseRecorder.Code)
expectedErrorMessage := `{"error":"sorted feature view invalid_sorted_view doesn't exist, please make sure that you have created the sorted feature view invalid_sorted_view and that you have registered it by running \"apply\"","status_code":400}`
assert.JSONEq(t, expectedErrorMessage, responseRecorder.Body.String(), "Response body does not match expected error message")
assert.Equal(t, responseRecorder.Code, http.StatusBadRequest, "Expected HTTP status code 400 BadRequest response body is: %s", responseRecorder.Body.String())
expectedErrorMessage := `{"error":"GetOnlineFeaturesRange does not support standard feature views [all_dtypes]","status_code":400}`
assert.Equal(t, expectedErrorMessage, responseRecorder.Body.String(), "Response body does not match expected error message")
}

func TestGetOnlineFeaturesRange_Http_withInvalidSortKeyFilter(t *testing.T) {
Expand Down
Loading
Loading