Skip to content

Commit 561cf00

Browse files
committed
use new unique jobs implementation
1 parent 54b4549 commit 561cf00

25 files changed

+398
-1252
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
.DS_Store
12
*.gem
23
coverage/

Gemfile.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ GEM
143143

144144
PLATFORMS
145145
arm64-darwin-22
146+
arm64-darwin-23
146147
x86_64-linux
147148

148149
DEPENDENCIES

docs/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ insert_res.job # inserted job row
4242

4343
Job args should:
4444

45-
* Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize.
46-
* Response to `#to_json` with a JSON serialization that'll be parseable as an object in Go.
45+
- Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize.
46+
- Respond to `#to_json` with a JSON serialization that'll be parseable as an object in Go.
4747

4848
They may also respond to `#insert_opts` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind.
4949

@@ -89,7 +89,7 @@ insert_res.unique_skipped_as_duplicated
8989
Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:
9090

9191
```ruby
92-
client = River::Client.new(mock_driver, advisory_lock_prefix: 123456)
92+
client = River::Client.new(mock_driver)
9393
```
9494

9595
Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.

driver/riverqueue-activerecord/Gemfile.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ GEM
120120

121121
PLATFORMS
122122
arm64-darwin-22
123+
arm64-darwin-23
123124
x86_64-linux
124125

125126
DEPENDENCIES

driver/riverqueue-activerecord/lib/driver.rb

Lines changed: 53 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -31,36 +31,18 @@ def errors = {}
3131
end
3232
end
3333

34-
def advisory_lock(key)
35-
::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})")
36-
nil
37-
end
38-
39-
def advisory_lock_try(key)
40-
::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"]
41-
end
42-
4334
def job_get_by_id(id)
4435
data_set = RiverJob.where(id: id)
4536
data_set.first ? to_job_row_from_model(data_set.first) : nil
4637
end
4738

48-
def job_get_by_kind_and_unique_properties(get_params)
49-
data_set = RiverJob.where(kind: get_params.kind)
50-
data_set = data_set.where("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1]) if get_params.created_at
51-
data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
52-
data_set = data_set.where(queue: get_params.queue) if get_params.queue
53-
data_set = data_set.where(state: get_params.state) if get_params.state
54-
data_set.first ? to_job_row_from_model(data_set.first) : nil
55-
end
56-
5739
def job_insert(insert_params)
58-
to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params)))
40+
job_insert_many([insert_params]).first
5941
end
6042

61-
def job_insert_unique(insert_params, unique_key)
62-
res = RiverJob.upsert(
63-
insert_params_to_hash(insert_params).merge(unique_key: unique_key),
43+
def job_insert_many(insert_params_many)
44+
res = RiverJob.upsert_all(
45+
insert_params_many.map { |param| insert_params_to_hash(param) },
6446
on_duplicate: Arel.sql("kind = EXCLUDED.kind"),
6547
returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"),
6648

@@ -69,15 +51,9 @@ def job_insert_unique(insert_params, unique_key)
6951
# ActiveRecord tries to look up a unique index instead of letting
7052
# Postgres handle that, and of course it doesn't support a `WHERE`
7153
# clause. The workaround is to target the index name instead of columns.
72-
unique_by: "river_job_kind_unique_key_idx"
54+
unique_by: "river_job_unique_idx"
7355
)
74-
75-
[to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
76-
end
77-
78-
def job_insert_many(insert_params_many)
79-
RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) })
80-
insert_params_many.count
56+
to_insert_results(res)
8157
end
8258

8359
def job_list
@@ -94,8 +70,6 @@ def transaction(&)
9470
end
9571

9672
private def insert_params_to_hash(insert_params)
97-
# the call to `#compact` is important so that we remove nils and table
98-
# default values get picked up instead
9973
{
10074
args: insert_params.encoded_args,
10175
kind: insert_params.kind,
@@ -104,8 +78,10 @@ def transaction(&)
10478
queue: insert_params.queue,
10579
state: insert_params.state,
10680
scheduled_at: insert_params.scheduled_at,
107-
tags: insert_params.tags
108-
}.compact
81+
tags: insert_params.tags || [],
82+
unique_key: insert_params.unique_key,
83+
unique_states: insert_params.unique_states
84+
}
10985
end
11086

11187
private def to_job_row_from_model(river_job)
@@ -139,51 +115,62 @@ def transaction(&)
139115
scheduled_at: river_job.scheduled_at.getutc,
140116
state: river_job.state,
141117
tags: river_job.tags,
142-
unique_key: river_job.unique_key
118+
unique_key: river_job.unique_key,
119+
unique_states: river_job.unique_states
143120
)
144121
end
145122

123+
private def to_insert_results(res)
124+
res.rows.map do |row|
125+
to_job_row_from_raw(row, res.columns, res.column_types)
126+
end
127+
end
128+
146129
# This is really awful, but some of ActiveRecord's methods (e.g. `.create`)
147130
# return a model, and others (e.g. `.upsert`) return raw values, and
148131
# therefore this second version from unmarshaling a job row exists. I
149132
# searched long and hard for a way to have the former type of method return
150133
# raw or the latter type of method return a model, but was unable to find
151134
# anything.
152-
private def to_job_row_from_raw(res)
135+
private def to_job_row_from_raw(row, columns, column_types)
153136
river_job = {}
154137

155-
res.rows[0].each_with_index do |val, i|
156-
river_job[res.columns[i]] = res.column_types[i].deserialize(val)
138+
row.each_with_index do |val, i|
139+
river_job[columns[i]] = column_types[i].deserialize(val)
157140
end
158141

159-
River::JobRow.new(
160-
id: river_job["id"],
161-
args: JSON.parse(river_job["args"]),
162-
attempt: river_job["attempt"],
163-
attempted_at: river_job["attempted_at"]&.getutc,
164-
attempted_by: river_job["attempted_by"],
165-
created_at: river_job["created_at"].getutc,
166-
errors: river_job["errors"]&.map { |e|
167-
deserialized_error = JSON.parse(e)
168-
169-
River::AttemptError.new(
170-
at: Time.parse(deserialized_error["at"]),
171-
attempt: deserialized_error["attempt"],
172-
error: deserialized_error["error"],
173-
trace: deserialized_error["trace"]
174-
)
175-
},
176-
finalized_at: river_job["finalized_at"]&.getutc,
177-
kind: river_job["kind"],
178-
max_attempts: river_job["max_attempts"],
179-
metadata: river_job["metadata"],
180-
priority: river_job["priority"],
181-
queue: river_job["queue"],
182-
scheduled_at: river_job["scheduled_at"].getutc,
183-
state: river_job["state"],
184-
tags: river_job["tags"],
185-
unique_key: river_job["unique_key"]
186-
)
142+
[
143+
River::JobRow.new(
144+
id: river_job["id"],
145+
args: JSON.parse(river_job["args"]),
146+
attempt: river_job["attempt"],
147+
attempted_at: river_job["attempted_at"]&.getutc,
148+
attempted_by: river_job["attempted_by"],
149+
created_at: river_job["created_at"].getutc,
150+
errors: river_job["errors"]&.map { |e|
151+
deserialized_error = JSON.parse(e)
152+
153+
River::AttemptError.new(
154+
at: Time.parse(deserialized_error["at"]),
155+
attempt: deserialized_error["attempt"],
156+
error: deserialized_error["error"],
157+
trace: deserialized_error["trace"]
158+
)
159+
},
160+
finalized_at: river_job["finalized_at"]&.getutc,
161+
kind: river_job["kind"],
162+
max_attempts: river_job["max_attempts"],
163+
metadata: river_job["metadata"],
164+
priority: river_job["priority"],
165+
queue: river_job["queue"],
166+
scheduled_at: river_job["scheduled_at"].getutc,
167+
state: river_job["state"],
168+
tags: river_job["tags"],
169+
unique_key: river_job["unique_key"],
170+
unique_states: ::River::UniqueBitmask.to_states(river_job["unique_states"]&.to_i(2))
171+
),
172+
river_job["unique_skipped_as_duplicate"]
173+
]
187174
end
188175
end
189176
end

driver/riverqueue-activerecord/spec/driver_spec.rb

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,14 @@
122122

123123
describe "#to_job_row_from_raw" do
124124
it "converts a database record to `River::JobRow` with minimal properties" do
125-
river_job = River::Driver::ActiveRecord::RiverJob.insert({
125+
res = River::Driver::ActiveRecord::RiverJob.insert({
126126
id: 1,
127127
args: %({"job_num":1}),
128128
kind: "simple",
129129
max_attempts: River::MAX_ATTEMPTS_DEFAULT
130-
}, returning: Arel.sql("*"))
130+
}, returning: Arel.sql("*, false AS unique_skipped_as_duplicate"))
131131

132-
job_row = driver.send(:to_job_row_from_raw, river_job)
132+
job_row, skipped_as_duplicate = driver.send(:to_job_row_from_raw, res.rows[0], res.columns, res.column_types)
133133

134134
expect(job_row).to be_an_instance_of(River::JobRow)
135135
expect(job_row).to have_attributes(
@@ -148,11 +148,12 @@
148148
state: River::JOB_STATE_AVAILABLE,
149149
tags: []
150150
)
151+
expect(skipped_as_duplicate).to be(false)
151152
end
152153

153154
it "converts a database record to `River::JobRow` with all properties" do
154155
now = Time.now
155-
river_job = River::Driver::ActiveRecord::RiverJob.insert({
156+
res = River::Driver::ActiveRecord::RiverJob.insert({
156157
id: 1,
157158
attempt: 1,
158159
attempted_at: now,
@@ -168,9 +169,9 @@
168169
state: River::JOB_STATE_COMPLETED,
169170
tags: ["tag1"],
170171
unique_key: Digest::SHA256.digest("unique_key_str")
171-
}, returning: Arel.sql("*"))
172+
}, returning: Arel.sql("*, true AS unique_skipped_as_duplicate"))
172173

173-
job_row = driver.send(:to_job_row_from_raw, river_job)
174+
job_row, skipped_as_duplicate = driver.send(:to_job_row_from_raw, res.rows[0], res.columns, res.column_types)
174175

175176
expect(job_row).to be_an_instance_of(River::JobRow)
176177
expect(job_row).to have_attributes(
@@ -190,11 +191,12 @@
190191
tags: ["tag1"],
191192
unique_key: Digest::SHA256.digest("unique_key_str")
192193
)
194+
expect(skipped_as_duplicate).to be(true)
193195
end
194196

195197
it "with errors" do
196198
now = Time.now.utc
197-
river_job = River::Driver::ActiveRecord::RiverJob.insert({
199+
res = River::Driver::ActiveRecord::RiverJob.insert({
198200
args: %({"job_num":1}),
199201
errors: [JSON.dump(
200202
{
@@ -207,9 +209,9 @@
207209
kind: "simple",
208210
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
209211
state: River::JOB_STATE_AVAILABLE
210-
}, returning: Arel.sql("*"))
212+
}, returning: Arel.sql("*, false AS unique_skipped_as_duplicate"))
211213

212-
job_row = driver.send(:to_job_row_from_raw, river_job)
214+
job_row, skipped_as_duplicate = driver.send(:to_job_row_from_raw, res.rows[0], res.columns, res.column_types)
213215

214216
expect(job_row.errors.count).to be(1)
215217
expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError)
@@ -219,6 +221,7 @@
219221
error: "job failure",
220222
trace: "error trace"
221223
)
224+
expect(skipped_as_duplicate).to be(false)
222225
end
223226
end
224227
end

driver/riverqueue-sequel/Gemfile.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ GEM
7878

7979
PLATFORMS
8080
arm64-darwin-22
81+
arm64-darwin-23
8182
x86_64-linux
8283

8384
DEPENDENCIES

driver/riverqueue-sequel/lib/driver.rb

Lines changed: 19 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,51 +13,27 @@ def initialize(db)
1313
@db.extension(:pg_json)
1414
end
1515

16-
def advisory_lock(key)
17-
@db.fetch("SELECT pg_advisory_xact_lock(?)", key).first
18-
nil
19-
end
20-
21-
def advisory_lock_try(key)
22-
@db.fetch("SELECT pg_try_advisory_xact_lock(?)", key).first[:pg_try_advisory_xact_lock]
23-
end
24-
2516
def job_get_by_id(id)
2617
data_set = @db[:river_job].where(id: id)
2718
data_set.first ? to_job_row(data_set.first) : nil
2819
end
2920

30-
def job_get_by_kind_and_unique_properties(get_params)
31-
data_set = @db[:river_job].where(kind: get_params.kind)
32-
data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at
33-
data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
34-
data_set = data_set.where(queue: get_params.queue) if get_params.queue
35-
data_set = data_set.where(state: get_params.state) if get_params.state
36-
data_set.first ? to_job_row(data_set.first) : nil
37-
end
38-
3921
def job_insert(insert_params)
40-
to_job_row(@db[:river_job].returning.insert_select(insert_params_to_hash(insert_params)))
22+
job_insert_many([insert_params]).first
4123
end
4224

43-
def job_insert_unique(insert_params, unique_key)
44-
values = @db[:river_job]
25+
def job_insert_many(insert_params_array)
26+
@db[:river_job]
4527
.insert_conflict(
46-
target: [:kind, :unique_key],
47-
conflict_where: ::Sequel.lit("unique_key IS NOT NULL"),
28+
target: [:unique_key],
29+
conflict_where: ::Sequel.lit(
30+
"unique_key IS NOT NULL AND unique_states IS NOT NULL AND river_job_state_in_bitmask(unique_states, state)"
31+
),
4832
update: {kind: ::Sequel[:excluded][:kind]}
4933
)
5034
.returning(::Sequel.lit("*, (xmax != 0) AS unique_skipped_as_duplicate"))
51-
.insert_select(
52-
insert_params_to_hash(insert_params).merge(unique_key: ::Sequel.blob(unique_key))
53-
)
54-
55-
[to_job_row(values), values[:unique_skipped_as_duplicate]]
56-
end
57-
58-
def job_insert_many(insert_params_many)
59-
@db[:river_job].multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) })
60-
insert_params_many.count
35+
.multi_insert(insert_params_array.map { |p| insert_params_to_hash(p) })
36+
.map { |row| to_insert_result(row) }
6137
end
6238

6339
def job_list
@@ -74,8 +50,6 @@ def transaction(&)
7450
end
7551

7652
private def insert_params_to_hash(insert_params)
77-
# the call to `#compact` is important so that we remove nils and table
78-
# default values get picked up instead
7953
{
8054
args: insert_params.encoded_args,
8155
kind: insert_params.kind,
@@ -84,8 +58,14 @@ def transaction(&)
8458
queue: insert_params.queue,
8559
state: insert_params.state,
8660
scheduled_at: insert_params.scheduled_at,
87-
tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil
88-
}.compact
61+
tags: ::Sequel.pg_array(insert_params.tags || [], :text),
62+
unique_key: insert_params.unique_key ? ::Sequel.blob(insert_params.unique_key) : nil,
63+
unique_states: insert_params.unique_states
64+
}
65+
end
66+
67+
private def to_insert_result(result)
68+
[to_job_row(result), result[:unique_skipped_as_duplicate]]
8969
end
9070

9171
private def to_job_row(river_job)
@@ -113,7 +93,8 @@ def transaction(&)
11393
scheduled_at: river_job[:scheduled_at].getutc,
11494
state: river_job[:state],
11595
tags: river_job[:tags].to_a,
116-
unique_key: river_job[:unique_key]&.to_s
96+
unique_key: river_job[:unique_key]&.to_s,
97+
unique_states: ::River::UniqueBitmask.to_states(river_job[:unique_states]&.to_i(2))
11798
)
11899
end
119100
end

0 commit comments

Comments
 (0)