Skip to content

Commit bbbbfc0

Browse files
authored
Check tag format + return metadata + additional doc strings (#22)
Mainly, bring in tag format checks which verify they're shorter than 255 characters and don't contain any special characters (especially commas), like Go and Python do already. Start returning metadata in jobs, although notably, it's not possible to insert with it yet. A few additional docstrings brought over from my project to document River Python.
1 parent 7e36d7b commit bbbbfc0

File tree

9 files changed

+57
-8
lines changed

9 files changed

+57
-8
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Changed
11+
12+
- Tag format is now checked on insert. Tags should be no more than 255 characters and match the regex `/\A[\w][\w\-]+[\w]\z/`. [PR #22](https://github.com/riverqueue/riverqueue-ruby/pull/22).
13+
- Returned jobs now have a `metadata` property. [PR #21](https://github.com/riverqueue/riverqueue-ruby/pull/22).
14+
1015
## [0.4.0] - 2024-04-28
1116

1217
### Changed

docs/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ insert_res.unique_skipped_as_duplicated
8686

8787
### Custom advisory lock prefix
8888

89-
Unique job insertion takes a Postgres advisory lock to make sure that it's uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:
89+
Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:
9090

9191
```ruby
9292
client = River::Client.new(mock_driver, advisory_lock_prefix: 123456)

driver/riverqueue-activerecord/lib/driver.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ def transaction(&)
100100
finalized_at: river_job.finalized_at,
101101
kind: river_job.kind,
102102
max_attempts: river_job.max_attempts,
103+
metadata: river_job.metadata,
103104
priority: river_job.priority,
104105
queue: river_job.queue,
105106
scheduled_at: river_job.scheduled_at,

driver/riverqueue-sequel/lib/driver.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def transaction(&)
8686
finalized_at: river_job.finalized_at,
8787
kind: river_job.kind,
8888
max_attempts: river_job.max_attempts,
89+
metadata: river_job.metadata,
8990
priority: river_job.priority,
9091
queue: river_job.queue,
9192
scheduled_at: river_job.scheduled_at,

lib/client.rb

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22
require "time"
33

44
module River
5+
# Default number of maximum attempts for a job.
56
MAX_ATTEMPTS_DEFAULT = 25
7+
8+
# Default priority for a job.
69
PRIORITY_DEFAULT = 1
10+
11+
# Default queue for a job.
712
QUEUE_DEFAULT = "default"
813

914
# Provides a client for River that inserts jobs. Unlike the Go version of the
@@ -241,7 +246,7 @@ def insert_many(args)
241246
queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT,
242247
scheduled_at: scheduled_at&.utc, # database defaults to now
243248
state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE,
244-
tags: insert_opts.tags || args_insert_opts.tags
249+
tags: validate_tags(insert_opts.tags || args_insert_opts.tags)
245250
),
246251
unique_opts
247252
]
@@ -260,6 +265,16 @@ def insert_many(args)
260265
private def uint64_to_int64(int)
261266
[int].pack("Q").unpack1("q") #: Integer # rubocop:disable Layout/LeadingCommentSpace
262267
end
268+
269+
TAG_RE = /\A[\w][\w\-]+[\w]\z/
270+
private_constant :TAG_RE
271+
272+
private def validate_tags(tags)
273+
tags&.each do |tag|
274+
raise ArgumentError, "tags should be 255 characters or less" if tag.length > 255
275+
raise ArgumentError, "tag should match regex #{TAG_RE.inspect}" unless TAG_RE.match(tag)
276+
end
277+
end
263278
end
264279

265280
# A single job to insert that's part of an #insert_many batch insert. Unlike

lib/job.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,15 @@ class JobRow
5555
# The set of worker IDs that have worked this job. A worker ID differs
5656
# between different programs, but is shared by all executors within any
5757
# given one. (i.e. Different Go processes have different IDs, but IDs are
58-
# shared within any given process.) A process generates a new ULID (an
59-
# ordered UUID) worker ID when it starts up.
58+
# shared within any given process.) A process generates a new ID based on
59+
# host and current time when it starts up.
6060
attr_accessor :attempted_by
6161

6262
# When the job record was created.
6363
attr_accessor :created_at
6464

6565
# A set of errors that occurred when the job was worked, one for each
66-
# attempt. Ordered from earliest error to the latest error.
66+
# attempt. Ordered from earliest error to the latest error.
6767
attr_accessor :errors
6868

6969
# The time at which the job was "finalized", meaning it was either completed
@@ -79,6 +79,9 @@ class JobRow
7979
# for the last time and will no longer be worked.
8080
attr_accessor :max_attempts
8181

82+
# Arbitrary metadata associated with the job.
83+
attr_accessor :metadata
84+
8285
# The priority of the job, with 1 being the highest priority and 4 being the
8386
# lowest. When fetching available jobs to work, the highest priority jobs
8487
# will always be fetched before any lower priority jobs are fetched. Note
@@ -112,6 +115,7 @@ def initialize(
112115
created_at:,
113116
kind:,
114117
max_attempts:,
118+
metadata:,
115119
priority:,
116120
queue:,
117121
scheduled_at:,
@@ -134,6 +138,7 @@ def initialize(
134138
self.finalized_at = finalized_at
135139
self.kind = kind
136140
self.max_attempts = max_attempts
141+
self.metadata = metadata
137142
self.priority = priority
138143
self.queue = queue
139144
self.scheduled_at = scheduled_at
@@ -157,7 +162,7 @@ class AttemptError
157162
attr_accessor :error
158163

159164
# Contains a stack trace from a job that panicked. The trace is produced by
160-
# invoking `debug.Trace()`.
165+
# invoking `debug.Trace()` in Go.
161166
attr_accessor :trace
162167

163168
def initialize(

sig/client.rbs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@ module River
1616
def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer
1717

1818
private def check_unique_job: (Driver::JobInsertParams, UniqueOpts?) { () -> InsertResult } -> InsertResult
19-
private def uint64_to_int64: (Integer) -> Integer
2019
private def make_insert_params: (jobArgs, InsertOpts, ?is_insert_many: bool) -> [Driver::JobInsertParams, UniqueOpts?]
2120
private def truncate_time: (Time, Integer) -> Time
21+
private def uint64_to_int64: (Integer) -> Integer
22+
23+
TAG_RE: Regexp
24+
25+
private def validate_tags: (Array[String]?) -> Array[String]?
2226
end
2327

2428
class InsertManyParams

sig/job.rbs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,14 @@ module River
4545
attr_accessor finalized_at: Time?
4646
attr_accessor kind: String
4747
attr_accessor max_attempts: Integer
48+
attr_accessor metadata: Hash[String, untyped]
4849
attr_accessor priority: Integer
4950
attr_accessor queue: String
5051
attr_accessor scheduled_at: Time
5152
attr_accessor state: jobStateAll
5253
attr_accessor tags: Array[String]?
5354

54-
def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void
55+
def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void
5556
end
5657

5758
class AttemptError

spec/client_spec.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def transaction(&)
5151
finalized_at: nil,
5252
kind: insert_params.kind,
5353
max_attempts: insert_params.max_attempts,
54+
metadata: nil,
5455
priority: insert_params.priority,
5556
queue: insert_params.queue,
5657
scheduled_at: insert_params.scheduled_at || Time.now, # normally defaults from DB
@@ -194,6 +195,22 @@ def to_json = nil
194195
end.to raise_error(RuntimeError, "args should return non-nil from `#to_json`")
195196
end
196197

198+
it "raises error if tags are too long" do
199+
expect do
200+
client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(
201+
tags: ["a" * 256]
202+
))
203+
end.to raise_error(ArgumentError, "tags should be 255 characters or less")
204+
end
205+
206+
it "raises error if tags are misformatted" do
207+
expect do
208+
client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(
209+
tags: ["no,commas,allowed"]
210+
))
211+
end.to raise_error(ArgumentError, 'tag should match regex /\A[\w][\w\-]+[\w]\z/')
212+
end
213+
197214
def check_bigint_bounds(int)
198215
raise "lock key shouldn't be larger than Postgres bigint max (9223372036854775807); was: #{int}" if int > 9223372036854775807
199216
raise "lock key shouldn't be smaller than Postgres bigint min (-9223372036854775808); was: #{int}" if int < -9223372036854775808

0 commit comments

Comments
 (0)