Skip to content

Conversation

@rerpha
Copy link
Collaborator

@rerpha rerpha commented Nov 4, 2025

closes #33

This adds sniff, play, and filtering to consume and listen as described in the README.md. When this is merged I'll create a new release.

@rerpha rerpha marked this pull request as ready for review November 7, 2025 10:06
@rerpha rerpha changed the title Filtering add play, sniff and --filter to consume&listen Nov 7, 2025
@rerpha rerpha requested a review from Tom-Willemsen November 7, 2025 10:08
@rerpha rerpha requested a review from Tom-Willemsen November 10, 2025 15:32
Comment on lines +30 to +31
## `sniff` - List all topics and their high, low watermarks and number of messages
`saluki sniff mybroker:9092`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intuitively typed saluki sniff livedata.isis.cclrc.ac.uk:31092/NDW2922_sampleEnv which sort of worked but actually listed all topics. Given how many topics exist, it would be nice for that to work and just give me the watermarks of that one topic?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nah just use grep m8

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but yeh alright i'll do it


### Between timestamps

`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think this should have worked then?

(.venv) c:\Instrument\dev\saluki>saluki play livedata.isis.cclrc.ac.uk:31092/NDW2922_bluesky livedata.isis.cclrc.ac.uk:31092/TOMTEST_bluesky -t 1762972997 1762979997
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "c:\Instrument\dev\saluki\.venv\Scripts\saluki.exe\__main__.py", line 7, in <module>
  File "C:\Instrument\dev\saluki\src\saluki\main.py", line 124, in main
    play(
  File "C:\Instrument\dev\saluki\src\saluki\play.py", line 45, in play
    TopicPartition(src_topic, src_partition, timestamps[0]),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'str' object cannot be interpreted as an integer

Copy link
Member

@Tom-Willemsen Tom-Willemsen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everywhere where we support timestamps, do we also want to support passing in a human-format time and parsing it to a timestamp?

I'd kind of like to be able to saluki consume some_topic -t "2025-11-11 09:00" -m 100 to consume 100 messages after 9 o'clock on 11/11... without typing 1762851600

Comment on lines 13 to +28
`saluki consume mybroker:9092/mytopic -p 1 -o 123456 -m 10` - This will print 9 messages before (and inclusively the offset specified) offset `123456` of `mytopic` on `mybroker`, in partition 1.

Use the `-g` flag to go the other way, ie. in the above example to consume the 9 messages _after_ offset 123456

# Install
`pip install saluki`
You can also filter out messages to specific schema(s) with the `-f` flag, like the example above for `listen`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should also support timestamps - offsets are not very user friendly...

@rerpha rerpha requested a review from Tom-Willemsen November 26, 2025 15:24
@rerpha rerpha moved this from In progress to In review in Data streaming for HRPD-X (and possibly SANDALS-II) Nov 26, 2025
Copy link
Member

@Tom-Willemsen Tom-Willemsen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks - this looks to be working nicely now.

@Tom-Willemsen Tom-Willemsen merged commit f98b8ee into main Nov 26, 2025
9 checks passed
@Tom-Willemsen Tom-Willemsen deleted the filtering branch November 26, 2025 16:00
This was referenced Dec 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

add command to take a topic and two offsets , consume everything between those offsets and produce everything to another topic

3 participants