Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 7edbb0a

Browse files
committed
add lock for publisher and publish temp table creations
1 parent 0d0ad68 commit 7edbb0a

4 files changed

Lines changed: 68 additions & 19 deletions

File tree

bigframes/core/events.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import dataclasses
1818
import datetime
19+
import threading
1920
from typing import Any, Callable, Optional, Set
2021
import uuid
2122

@@ -30,7 +31,7 @@ class Subscriber:
3031
def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher):
3132
self._publisher = publisher
3233
self._callback = callback
33-
self._subscriber_id = str(uuid.uuid4())
34+
self._subscriber_id = uuid.uuid4()
3435

3536
def __call__(self, *args, **kwargs):
3637
return self._callback(*args, **kwargs)
@@ -65,21 +66,25 @@ def __exit__(self, exc_type, exc_value, traceback):
6566

6667
class Publisher:
    """Registry of subscriber callbacks that events are fanned out to.

    Access to the subscriber set is guarded by a lock so that ``subscribe``,
    ``unsubscribe`` and ``publish`` can be called from multiple threads.
    The lock is reentrant: a callback that is running inside ``publish`` may
    itself call ``subscribe``, ``unsubscribe`` or ``publish`` on the same
    publisher (from the same thread) without deadlocking.
    """

    def __init__(self):
        # RLock rather than Lock: callbacks are invoked while the lock is
        # held (see ``publish``), so a callback that touches this publisher
        # again on the same thread must be able to re-acquire it.
        self._subscribers_lock = threading.RLock()
        self._subscribers: Set[Subscriber] = set()

    def subscribe(self, callback: Callable[[Event], None]) -> Subscriber:
        """Register ``callback`` and return the Subscriber wrapping it.

        Args:
            callback: Callable invoked with each published event.

        Returns:
            The newly created Subscriber; pass it to ``unsubscribe`` to stop
            receiving events.
        """
        # TODO(b/448176657): figure out how to handle subscribers/publishers in
        # a background thread. Maybe subscribers should be thread-local?
        subscriber = Subscriber(callback, publisher=self)
        with self._subscribers_lock:
            self._subscribers.add(subscriber)
        return subscriber

    def unsubscribe(self, subscriber: Subscriber):
        """Remove a previously registered subscriber.

        Raises:
            KeyError: If ``subscriber`` is not currently registered.
        """
        with self._subscribers_lock:
            self._subscribers.remove(subscriber)

    def publish(self, event: Event):
        """Deliver ``event`` to every currently registered subscriber.

        Callbacks run while the lock is held, so publishes from different
        threads are delivered one at a time. Iteration is over a snapshot so
        that a callback may subscribe or unsubscribe during delivery; a
        subscriber removed by an earlier callback in the same publish is
        skipped, and one added mid-publish first sees the next event.
        """
        with self._subscribers_lock:
            # Snapshot: the live set may be mutated by callbacks below.
            for subscriber in tuple(self._subscribers):
                # Skip subscribers that were unsubscribed mid-publish.
                if subscriber in self._subscribers:
                    subscriber(event)
8388

8489

8590
class Event:

bigframes/session/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ def __init__(
240240
bigquery_session.SessionResourceManager(
241241
self.bqclient,
242242
self._location,
243+
publisher=self._publisher,
243244
)
244245
if (self._bq_kms_key_name is None)
245246
else None

bigframes/session/bigquery_session.py

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from __future__ import annotations
16+
1517
import datetime
1618
import logging
1719
import threading
@@ -23,7 +25,9 @@
2325
import google.cloud.bigquery as bigquery
2426

2527
from bigframes.core.compile import googlesql
28+
import bigframes.core.events
2629
from bigframes.session import temporary_storage
30+
import bigframes.session._io.bigquery as bfbqio
2731

2832
KEEPALIVE_QUERY_TIMEOUT_SECONDS = 5.0
2933

@@ -38,12 +42,19 @@ class SessionResourceManager(temporary_storage.TemporaryStorageManager):
3842
Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
3943
"""
4044

41-
def __init__(self, bqclient: bigquery.Client, location: str):
45+
def __init__(
46+
self,
47+
bqclient: bigquery.Client,
48+
location: str,
49+
*,
50+
publisher: bigframes.core.events.Publisher,
51+
):
4252
self.bqclient = bqclient
4353
self._location = location
4454
self._session_id: Optional[str] = None
4555
self._sessiondaemon: Optional[RecurringTaskDaemon] = None
4656
self._session_lock = threading.RLock()
57+
self._publisher = publisher
4758

4859
@property
4960
def location(self):
@@ -84,21 +95,38 @@ def create_temp_table(
8495

8596
ddl = f"CREATE TEMP TABLE `_SESSION`.{googlesql.identifier(table_ref.table_id)} ({fields_string}){cluster_string}"
8697

87-
job = self.bqclient.query(
88-
ddl, job_config=job_config, location=self.location
98+
_, job = bfbqio.start_query_with_client(
99+
self.bqclient,
100+
ddl,
101+
job_config=job_config,
102+
location=self.location,
103+
project=None,
104+
timeout=None,
105+
metrics=None,
106+
query_with_job=True,
107+
publisher=self._publisher,
89108
)
90109
job.result()
91110
# return the fully qualified table, so it can be used outside of the session
92-
return job.destination
111+
destination = job.destination
112+
assert destination is not None, "Failure to create temp table."
113+
return destination
93114

94115
def close(self):
95116
if self._sessiondaemon is not None:
96117
self._sessiondaemon.stop()
97118

98119
if self._session_id is not None and self.bqclient is not None:
99-
self.bqclient.query_and_wait(
120+
bfbqio.start_query_with_client(
121+
self.bqclient,
100122
f"CALL BQ.ABORT_SESSION('{self._session_id}')",
123+
job_config=bigquery.QueryJobConfig(),
101124
location=self.location,
125+
project=None,
126+
timeout=None,
127+
metrics=None,
128+
query_with_job=False,
129+
publisher=self._publisher,
102130
)
103131

104132
def _get_session_id(self) -> str:
@@ -109,8 +137,16 @@ def _get_session_id(self) -> str:
109137
job_config = bigquery.QueryJobConfig(create_session=True)
110138
# Make sure the session is a new one, not one associated with another query.
111139
job_config.use_query_cache = False
112-
query_job = self.bqclient.query(
113-
"SELECT 1", job_config=job_config, location=self.location
140+
_, query_job = bfbqio.start_query_with_client(
141+
self.bqclient,
142+
"SELECT 1",
143+
job_config=job_config,
144+
location=self.location,
145+
project=None,
146+
timeout=None,
147+
metrics=None,
148+
query_with_job=True,
149+
publisher=self._publisher,
114150
)
115151
query_job.result() # blocks until finished
116152
assert query_job.session_info is not None
@@ -133,11 +169,16 @@ def _keep_session_alive(self):
133169
]
134170
)
135171
try:
136-
self.bqclient.query_and_wait(
172+
bfbqio.start_query_with_client(
173+
self.bqclient,
137174
"SELECT 1",
138-
location=self.location,
139175
job_config=job_config,
140-
wait_timeout=KEEPALIVE_QUERY_TIMEOUT_SECONDS,
176+
location=self.location,
177+
project=None,
178+
timeout=KEEPALIVE_QUERY_TIMEOUT_SECONDS,
179+
metrics=None,
180+
query_with_job=False,
181+
publisher=self._publisher,
141182
)
142183
except Exception as e:
143184
logging.warning("BigQuery session keep-alive query errored : %s", e)

tests/system/small/test_bq_sessions.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
import google
1919
import google.api_core.exceptions
20-
import google.cloud
2120
from google.cloud import bigquery
2221
import pytest
2322

23+
import bigframes.core.events
2424
from bigframes.session import bigquery_session
2525

2626
TEST_SCHEMA = [
@@ -39,12 +39,14 @@
3939
def session_resource_manager(
4040
bigquery_client,
4141
) -> bigquery_session.SessionResourceManager:
42-
return bigquery_session.SessionResourceManager(bigquery_client, "US")
42+
return bigquery_session.SessionResourceManager(
43+
bigquery_client, "US", publisher=bigframes.core.events.Publisher()
44+
)
4345

4446

4547
def test_bq_session_create_temp_table_clustered(bigquery_client: bigquery.Client):
4648
session_resource_manager = bigquery_session.SessionResourceManager(
47-
bigquery_client, "US"
49+
bigquery_client, "US", publisher=bigframes.core.events.Publisher()
4850
)
4951
cluster_cols = ["string field", "bool field"]
5052

@@ -68,7 +70,7 @@ def test_bq_session_create_temp_table_clustered(bigquery_client: bigquery.Client
6870

6971
def test_bq_session_create_multi_temp_tables(bigquery_client: bigquery.Client):
7072
session_resource_manager = bigquery_session.SessionResourceManager(
71-
bigquery_client, "US"
73+
bigquery_client, "US", publisher=bigframes.core.events.Publisher()
7274
)
7375

7476
def create_table():

0 commit comments

Comments
 (0)