Skip to content

Commit 721579a

Browse files
committed
use new unique jobs implementation
1 parent 54b4549 commit 721579a

File tree

16 files changed

+243
-1167
lines changed

16 files changed

+243
-1167
lines changed

Gemfile.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ GEM
143143

144144
PLATFORMS
145145
arm64-darwin-22
146+
arm64-darwin-23
146147
x86_64-linux
147148

148149
DEPENDENCIES

docs/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ insert_res.job # inserted job row
4242

4343
Job args should:
4444

45-
* Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize.
46-
* Respond to `#to_json` with a JSON serialization that'll be parseable as an object in Go.
45+
- Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize.
46+
- Respond to `#to_json` with a JSON serialization that'll be parseable as an object in Go.
4747

4848
They may also respond to `#insert_opts` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind.
4949

@@ -89,7 +89,7 @@ insert_res.unique_skipped_as_duplicated
8989
Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:
9090

9191
```ruby
92-
client = River::Client.new(mock_driver, advisory_lock_prefix: 123456)
92+
client = River::Client.new(mock_driver)
9393
```
9494

9595
Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.

driver/riverqueue-activerecord/Gemfile.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ GEM
120120

121121
PLATFORMS
122122
arm64-darwin-22
123+
arm64-darwin-23
123124
x86_64-linux
124125

125126
DEPENDENCIES

driver/riverqueue-activerecord/lib/driver.rb

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,36 +31,14 @@ def errors = {}
3131
end
3232
end
3333

34-
def advisory_lock(key)
35-
::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})")
36-
nil
37-
end
38-
39-
def advisory_lock_try(key)
40-
::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"]
41-
end
42-
4334
def job_get_by_id(id)
4435
data_set = RiverJob.where(id: id)
4536
data_set.first ? to_job_row_from_model(data_set.first) : nil
4637
end
4738

48-
def job_get_by_kind_and_unique_properties(get_params)
49-
data_set = RiverJob.where(kind: get_params.kind)
50-
data_set = data_set.where("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1]) if get_params.created_at
51-
data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
52-
data_set = data_set.where(queue: get_params.queue) if get_params.queue
53-
data_set = data_set.where(state: get_params.state) if get_params.state
54-
data_set.first ? to_job_row_from_model(data_set.first) : nil
55-
end
56-
5739
def job_insert(insert_params)
58-
to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params)))
59-
end
60-
61-
def job_insert_unique(insert_params, unique_key)
6240
res = RiverJob.upsert(
63-
insert_params_to_hash(insert_params).merge(unique_key: unique_key),
41+
insert_params_to_hash(insert_params),
6442
on_duplicate: Arel.sql("kind = EXCLUDED.kind"),
6543
returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"),
6644

@@ -69,7 +47,7 @@ def job_insert_unique(insert_params, unique_key)
6947
# ActiveRecord tries to look up a unique index instead of letting
7048
# Postgres handle that, and of course it doesn't support a `WHERE`
7149
# clause. The workaround is to target the index name instead of columns.
72-
unique_by: "river_job_kind_unique_key_idx"
50+
unique_by: "river_job_unique_idx"
7351
)
7452

7553
[to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
@@ -104,7 +82,9 @@ def transaction(&)
10482
queue: insert_params.queue,
10583
state: insert_params.state,
10684
scheduled_at: insert_params.scheduled_at,
107-
tags: insert_params.tags
85+
tags: insert_params.tags,
86+
unique_key: insert_params.unique_key,
87+
unique_states: insert_params.unique_states
10888
}.compact
10989
end
11090

@@ -139,7 +119,8 @@ def transaction(&)
139119
scheduled_at: river_job.scheduled_at.getutc,
140120
state: river_job.state,
141121
tags: river_job.tags,
142-
unique_key: river_job.unique_key
122+
unique_key: river_job.unique_key,
123+
unique_states: river_job.unique_states
143124
)
144125
end
145126

@@ -182,7 +163,8 @@ def transaction(&)
182163
scheduled_at: river_job["scheduled_at"].getutc,
183164
state: river_job["state"],
184165
tags: river_job["tags"],
185-
unique_key: river_job["unique_key"]
166+
unique_key: river_job["unique_key"],
167+
unique_states: river_job["unique_states"]
186168
)
187169
end
188170
end

driver/riverqueue-sequel/Gemfile.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ GEM
7878

7979
PLATFORMS
8080
arm64-darwin-22
81+
arm64-darwin-23
8182
x86_64-linux
8283

8384
DEPENDENCIES

driver/riverqueue-sequel/lib/driver.rb

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,51 +13,34 @@ def initialize(db)
1313
@db.extension(:pg_json)
1414
end
1515

16-
def advisory_lock(key)
17-
@db.fetch("SELECT pg_advisory_xact_lock(?)", key).first
18-
nil
19-
end
20-
21-
def advisory_lock_try(key)
22-
@db.fetch("SELECT pg_try_advisory_xact_lock(?)", key).first[:pg_try_advisory_xact_lock]
23-
end
24-
2516
def job_get_by_id(id)
2617
data_set = @db[:river_job].where(id: id)
2718
data_set.first ? to_job_row(data_set.first) : nil
2819
end
2920

30-
def job_get_by_kind_and_unique_properties(get_params)
31-
data_set = @db[:river_job].where(kind: get_params.kind)
32-
data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at
33-
data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
34-
data_set = data_set.where(queue: get_params.queue) if get_params.queue
35-
data_set = data_set.where(state: get_params.state) if get_params.state
36-
data_set.first ? to_job_row(data_set.first) : nil
37-
end
38-
3921
def job_insert(insert_params)
40-
to_job_row(@db[:river_job].returning.insert_select(insert_params_to_hash(insert_params)))
22+
results = insert_jobs([insert_params])
23+
[to_job_row(results.first), results.first[:unique_skipped_as_duplicate]]
24+
end
25+
26+
def job_insert_many(insert_params_many)
27+
results = insert_jobs(insert_params_many)
28+
# Count the number of jobs actually inserted (not skipped)
29+
inserted_count = results.count { |row| !row[:unique_skipped_as_duplicate] }
30+
inserted_count
4131
end
4232

43-
def job_insert_unique(insert_params, unique_key)
44-
values = @db[:river_job]
33+
private def insert_jobs(insert_params_array)
34+
@db[:river_job]
4535
.insert_conflict(
46-
target: [:kind, :unique_key],
47-
conflict_where: ::Sequel.lit("unique_key IS NOT NULL"),
36+
target: [:unique_key],
37+
conflict_where: ::Sequel.lit(
38+
"unique_key IS NOT NULL AND unique_states IS NOT NULL AND river_job_state_in_bitmask(unique_states, state)"
39+
),
4840
update: {kind: ::Sequel[:excluded][:kind]}
4941
)
5042
.returning(::Sequel.lit("*, (xmax != 0) AS unique_skipped_as_duplicate"))
51-
.insert_select(
52-
insert_params_to_hash(insert_params).merge(unique_key: ::Sequel.blob(unique_key))
53-
)
54-
55-
[to_job_row(values), values[:unique_skipped_as_duplicate]]
56-
end
57-
58-
def job_insert_many(insert_params_many)
59-
@db[:river_job].multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) })
60-
insert_params_many.count
43+
.multi_insert(insert_params_array.map { |p| insert_params_to_hash(p) })
6144
end
6245

6346
def job_list
@@ -76,6 +59,7 @@ def transaction(&)
7659
private def insert_params_to_hash(insert_params)
7760
# the call to `#compact` is important so that we remove nils and table
7861
# default values get picked up instead
62+
# TODO: but I had to remove it for bulk unique inserts...
7963
{
8064
args: insert_params.encoded_args,
8165
kind: insert_params.kind,
@@ -84,8 +68,10 @@ def transaction(&)
8468
queue: insert_params.queue,
8569
state: insert_params.state,
8670
scheduled_at: insert_params.scheduled_at,
87-
tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil
88-
}.compact
71+
tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags, :text) : nil,
72+
unique_key: insert_params.unique_key ? ::Sequel.blob(insert_params.unique_key) : nil,
73+
unique_states: insert_params.unique_states
74+
}
8975
end
9076

9177
private def to_job_row(river_job)
@@ -113,7 +99,8 @@ def transaction(&)
11399
scheduled_at: river_job[:scheduled_at].getutc,
114100
state: river_job[:state],
115101
tags: river_job[:tags].to_a,
116-
unique_key: river_job[:unique_key]&.to_s
102+
unique_key: river_job[:unique_key]&.to_s,
103+
unique_states: river_job[:unique_states]&.to_i(2)
117104
)
118105
end
119106
end

0 commit comments

Comments
 (0)