Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,9 @@ def list_projects(
return self._list_projects(tags)

def refresh(self, project: Optional[str] = None):
self.cached_registry_proto = self.proto()
self.cached_registry_proto_created = _utc_now()
with self._refresh_lock:
self.cached_registry_proto = self.proto()
self.cached_registry_proto_created = _utc_now()

def _refresh_cached_registry_if_necessary(self):
if self.cache_mode == "sync":
Expand Down
31 changes: 7 additions & 24 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import atexit
import logging
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
Expand Down Expand Up @@ -287,11 +285,6 @@ def __init__(
self.thread_pool_executor_worker_count = (
registry_config.thread_pool_executor_worker_count
)
if self.thread_pool_executor_worker_count > 0:
self._executor = ThreadPoolExecutor(
max_workers=self.thread_pool_executor_worker_count
)
atexit.register(self._exit_handler)
self.purge_feast_metadata = registry_config.purge_feast_metadata
# Sync feast_metadata to projects table
# when purge_feast_metadata is set to True, Delete data from
Expand Down Expand Up @@ -990,17 +983,14 @@ def process_project(project: Project):
r.infra.CopyFrom(self.get_infra(project_name).to_proto())

projects_list = self.list_projects(allow_cache=False)
if self._executor:
logger.info(
f"Thread count before executor.map: {len(threading.enumerate())}"
)
self._executor.map(process_project, projects_list)
logger.info(
f"Thread count after executor.map: {len(threading.enumerate())}"
)
if self.thread_pool_executor_worker_count == 0:
for project in projects_list:
process_project(project)
else:
for p in projects_list:
process_project(p)
with ThreadPoolExecutor(
max_workers=self.thread_pool_executor_worker_count
) as executor:
executor.map(process_project, projects_list)

if last_updated_timestamps:
r.last_updated.FromDatetime(max(last_updated_timestamps))
Expand Down Expand Up @@ -1427,10 +1417,3 @@ def get_project_metadata(
datetime.utcfromtimestamp(int(metadata_value))
)
return project_metadata_model

def _exit_handler(self):
if self._executor:
logger.info("Shutting down SqlRegistry's ThreadPoolExecutor...")
self._executor.shutdown(wait=False, cancel_futures=True)
logger.info("ThreadPoolExecutor shut down successfully.")
self._executor = None
Loading