Merged
58 changes: 50 additions & 8 deletions README.md
@@ -1,22 +1,64 @@
![](https://github.com/ISISComputingGroup/saluki/blob/main/resources/logo.png)

Serialise/deserialise flatbuffers blobs from kafka.
This currently deserialises https://github.com/ess-dmsc/python-streaming-data-types, but I am working to make it agnostic. Python bindings for the respective schema will need to be generated.
ISIS-specific Kafka tools.
Deserialises [the ESS flatbuffers blobs](https://github.com/ess-dmsc/python-streaming-data-types) from Kafka.

Also allows replaying data in a topic.
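
Under the hood, each Kafka message value is a flatbuffer whose 4-byte file identifier (bytes 4–8) names the schema, and the matching deserialiser from `streaming_data_types` is applied. A minimal standalone sketch of that idea, assuming an `f142` log-data message (broker and topic names are placeholders, and this is not saluki's actual code):

```python
# Minimal sketch of deserialising one ESS flatbuffers blob from Kafka,
# assuming the message uses the f142 log-data schema.
from confluent_kafka import Consumer, TopicPartition
from streaming_data_types.logdata_f142 import deserialise_f142

c = Consumer({"bootstrap.servers": "mybroker:9092", "group.id": "example"})
c.assign([TopicPartition("mytopic", 0)])
msg = c.poll(10.0)
if msg is not None and msg.error() is None:
    blob = msg.value()
    schema_id = blob[4:8].decode()  # flatbuffers file identifier, e.g. "f142"
    if schema_id == "f142":
        print(deserialise_f142(blob))  # extracted source name, value, timestamp
c.close()
```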

# Usage

To run the latest version, install [uv](https://docs.astral.sh/uv/getting-started/installation/) and use `uvx saluki <args>`.

Alternatively, you can `pip install saluki` and run it from a `venv`.

See `saluki --help` for all options.

## Listen to a topic for updates
## `listen` - Listen to a topic for updates
`saluki listen mybroker:9092/mytopic` - This will listen for updates for `mytopic` on `mybroker`.

## Consume from a topic
### Filter to specific schemas

`saluki listen mybroker:9092/mytopic -f f144 -f f142` - This will listen for updates but only print messages whose schema ID is `f144` or `f142`.

## `consume` - Consume from a topic
`saluki consume mybroker:9092/mytopic -p 1 -o 123456 -m 10` - This will print 10 messages from partition 1 of `mytopic` on `mybroker`, ending at (and including) offset `123456`.

Use the `-g` flag to go the other way, i.e. in the above example to consume the 9 messages _after_ offset 123456.

# Install
`pip install saluki`
You can also filter to specific schema(s) with the `-f` flag, as in the `listen` example above.
Comment on lines 13 to +28
Member

Maybe this should also support timestamps - offsets are not very user friendly...


## `sniff` - List all topics with their high and low watermarks and number of messages
`saluki sniff mybroker:9092`
Comment on lines +30 to +31
Member

I intuitively typed saluki sniff livedata.isis.cclrc.ac.uk:31092/NDW2922_sampleEnv which sort of worked but actually listed all topics. Given how many topics exist, it would be nice for that to work and just give me the watermarks of that one topic?

Collaborator Author

nah just use grep m8

Collaborator Author

but yeh alright i'll do it


Output looks as follows:

```
$ saluki sniff mybroker:9092
INFO:saluki:Cluster ID: redpanda.0faa4595-7298-407e-9db7-7e2758d1af1f
INFO:saluki:Brokers:
INFO:saluki: 192.168.0.111:9092/1
INFO:saluki: 192.168.0.112:9092/2
INFO:saluki: 192.168.0.113:9092/0
INFO:saluki:Topics:
INFO:saluki: MERLIN_events:
INFO:saluki: 0 - low:262322729, high:302663378, num_messages:40340649
INFO:saluki: MERLIN_runInfo:
INFO:saluki: 0 - low:335, high:2516, num_messages:2181
INFO:saluki: MERLIN_monitorHistograms:
INFO:saluki: 0 - low:7515, high:7551, num_messages:36
```
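
For reference, the watermark numbers above can be reproduced directly with confluent-kafka. A rough sketch of the presumed approach (an assumption for illustration, not saluki's actual `sniff` implementation):

```python
# Rough sketch: list topics and print low/high watermarks per partition.
from confluent_kafka import Consumer, TopicPartition
from confluent_kafka.admin import AdminClient

broker = "mybroker:9092"
metadata = AdminClient({"bootstrap.servers": broker}).list_topics(timeout=10)
print(f"Cluster ID: {metadata.cluster_id}")

c = Consumer({"bootstrap.servers": broker, "group.id": "example"})
for name, topic in metadata.topics.items():
    for p in topic.partitions:  # keys are partition ids
        low, high = c.get_watermark_offsets(TopicPartition(name, p), timeout=10)
        print(f"{name}: {p} - low:{low}, high:{high}, num_messages:{high - low}")
c.close()
```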

## `play` - Replay data from one topic to another

### Between offsets

`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -o 123 125` - This will forward messages at offsets 123, 124 and 125 in the `source_topic` to the `dest_topic`.

### Between timestamps

`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps.
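
If you would rather compute the Unix timestamps than work them out by hand, here is a quick sketch (assuming, as in the example above, that bare numbers passed to `-t` are seconds since the epoch; `-t` also accepts date strings such as `"2025-11-17 07:00:00"`):

```python
# Turn human-readable UTC times into Unix timestamps for saluki play -t.
from datetime import datetime, timezone

start = int(datetime(2025, 11, 4, 7, 0, tzinfo=timezone.utc).timestamp())
end = int(datetime(2025, 11, 4, 7, 5, tzinfo=timezone.utc).timestamp())
print(start, end)  # pass as: saluki play ... -t <start> <end>
```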
Member

So I think this should have worked then?

```
(.venv) c:\Instrument\dev\saluki>saluki play livedata.isis.cclrc.ac.uk:31092/NDW2922_bluesky livedata.isis.cclrc.ac.uk:31092/TOMTEST_bluesky -t 1762972997 1762979997
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "c:\Instrument\dev\saluki\.venv\Scripts\saluki.exe\__main__.py", line 7, in <module>
  File "C:\Instrument\dev\saluki\src\saluki\main.py", line 124, in main
    play(
  File "C:\Instrument\dev\saluki\src\saluki\play.py", line 45, in play
    TopicPartition(src_topic, src_partition, timestamps[0]),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'str' object cannot be interpreted as an integer
```


## Developer setup
`pip install .[dev]`
# Developer setup
`pip install -e .[dev]`

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -7,7 +7,8 @@ name = "saluki"
dynamic = ["version"]
dependencies = [
"ess-streaming-data-types",
"confluent-kafka",
"confluent-kafka>=2.12.1", # for produce_batch in play()
"python-dateutil",
"tzdata"
]
readme = {file = "README.md", content-type = "text/markdown"}
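
The `produce_batch` comment above is the only hint here at how `play()` works; conceptually, a replay is just consuming an offset range from the source topic and re-producing each message to the destination. A rough illustration using the plain `produce()` API (an assumption for clarity, not the PR's batched implementation; broker and topic names are placeholders):

```python
# Illustration only: replay offsets 123-125 (inclusive) from one topic to another.
from confluent_kafka import Consumer, Producer, TopicPartition

src = Consumer({"bootstrap.servers": "mybroker:9092", "group.id": "example"})
dest = Producer({"bootstrap.servers": "mybroker:9092"})

first, last = 123, 125
src.assign([TopicPartition("source_topic", 0, first)])  # start at the first offset
while True:
    msg = src.poll(10.0)
    if msg is None or msg.error() is not None:
        continue
    dest.produce("dest_topic", value=msg.value(), key=msg.key())
    if msg.offset() >= last:
        break
dest.flush()
src.close()
```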
13 changes: 11 additions & 2 deletions src/saluki/consume.py
@@ -1,4 +1,5 @@
import logging
import uuid

from confluent_kafka import Consumer, TopicPartition

@@ -14,6 +15,8 @@ def consume(
num_messages: int = 1,
offset: int | None = None,
go_forwards: bool = False,
schemas_to_filter_to: list[str] | None = None,
timestamp: int | None = None,
) -> None:
"""
consume from a topic and deserialise each message
@@ -24,12 +27,14 @@
:param num_messages: number of messages to consume
:param offset: offset to consume from/to
:param go_forwards: whether to consume forwards or backwards
:param schemas_to_filter_to: schemas in messages to filter to
:param timestamp: optionally a timestamp as a starting point
:return: None
"""
c = Consumer(
{
"bootstrap.servers": broker,
"group.id": "saluki",
"group.id": f"saluki-consume-{uuid.uuid4()}",
"session.timeout.ms": 6000,
"auto.offset.reset": "latest",
"enable.auto.offset.store": False,
@@ -38,6 +43,10 @@
}
)

if timestamp is not None:
offset = c.offsets_for_times([TopicPartition(topic, partition, timestamp)])[0].offset
logger.debug(f"offset for timestamp {timestamp} is {offset}")

if go_forwards:
if offset is None:
raise ValueError("Can't go forwards without an offset")
@@ -57,7 +66,7 @@
try:
logger.info(f"Consuming {num_messages} messages")
msgs = c.consume(num_messages)
deserialise_and_print_messages(msgs, partition)
deserialise_and_print_messages(msgs, partition, schemas_to_filter_to)
except Exception:
logger.exception("Got exception while consuming:")
finally:
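
The timestamp handling added above relies on confluent-kafka's `offsets_for_times`, which interprets the offset field of each `TopicPartition` as a timestamp in milliseconds since the epoch and returns the first offset at or after that time. A standalone sketch (broker and topic names are placeholders):

```python
# Look up the earliest offset at or after a given timestamp (in milliseconds).
from confluent_kafka import Consumer, TopicPartition

c = Consumer({"bootstrap.servers": "mybroker:9092", "group.id": "example"})
timestamp_ms = 1762209990 * 1000  # seconds -> milliseconds
result = c.offsets_for_times([TopicPartition("mytopic", 0, timestamp_ms)], timeout=10)
print(result[0].offset)  # first offset whose message timestamp is >= timestamp_ms
c.close()
```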
15 changes: 12 additions & 3 deletions src/saluki/listen.py
@@ -1,4 +1,5 @@
import logging
import uuid

from confluent_kafka import Consumer, TopicPartition

@@ -7,18 +8,24 @@
logger = logging.getLogger("saluki")


def listen(broker: str, topic: str, partition: int | None = None) -> None:
def listen(
broker: str,
topic: str,
partition: int | None = None,
schemas_to_filter_to: list[str] | None = None,
) -> None:
"""
Listen to a topic and deserialise each message
:param broker: the broker address, including the port
:param topic: the topic to use
:param partition: the partition to listen to (default is all partitions in a given topic)
:param schemas_to_filter_to: schemas to filter when listening to messages
:return: None
"""
c = Consumer(
{
"bootstrap.servers": broker,
"group.id": "saluki",
"group.id": f"saluki-listen-{uuid.uuid4()}",
"auto.offset.reset": "latest",
"enable.auto.commit": False,
}
@@ -30,7 +37,9 @@ def listen(broker: str, topic: str, partition: int | None = None) -> None:
logger.info(f"listening to {broker}/{topic}")
while True:
msg = c.poll(1.0)
deserialise_and_print_messages([msg], partition)
deserialise_and_print_messages(
[msg], partition, schemas_to_filter_to=schemas_to_filter_to
)
except KeyboardInterrupt:
logger.debug("finished listening")
finally:
123 changes: 97 additions & 26 deletions src/saluki/main.py
@@ -4,42 +4,57 @@

from saluki.consume import consume
from saluki.listen import listen
from saluki.utils import parse_kafka_uri
from saluki.play import play
from saluki.sniff import sniff
from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri

logger = logging.getLogger("saluki")
logging.basicConfig(level=logging.INFO)

_LISTEN = "listen"
_CONSUME = "consume"
_PLAY = "play"
_SNIFF = "sniff"


def main() -> None:
parser = argparse.ArgumentParser(
prog="saluki",
description="serialise/de-serialise flatbuffers and consume/produce from/to kafka",
)
common_options = argparse.ArgumentParser(add_help=False)
common_options.add_argument("-v", "--verbose", help="show DEBUG logs", action="store_true")
common_options.add_argument(
"-l",
"--log-file",
help="filename to output all data to",
required=False,
default=None,
type=argparse.FileType("a"),
)

parent_parser = argparse.ArgumentParser(add_help=False)
parent_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")
topic_parser = argparse.ArgumentParser(add_help=False)
topic_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")

parent_parser.add_argument(
topic_parser.add_argument(
"-X",
"--kafka-config",
help="kafka options to pass through to librdkafka",
required=False,
default=None,
)
parent_parser.add_argument(
"-l",
"--log-file",
help="filename to output all data to",
required=False,
default=None,
type=argparse.FileType("a"),
)
topic_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
topic_parser.add_argument("-f", "--filter", required=False, action="append")

sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")

sniff_parser = sub_parsers.add_parser(
_SNIFF, help="sniff - broker metadata", parents=[common_options]
)
sniff_parser.add_argument(
"broker", type=str, help="broker, optionally suffixed with a topic name to filter to"
)

consumer_parser = argparse.ArgumentParser(add_help=False)
consumer_parser.add_argument(
"-e",
@@ -50,7 +65,7 @@ def main() -> None:
)

consumer_mode_parser = sub_parsers.add_parser(
_CONSUME, help="consumer mode", parents=[parent_parser, consumer_parser]
_CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser, common_options]
)
consumer_mode_parser.add_argument(
"-m",
@@ -60,44 +75,100 @@
required=False,
default=1,
)
consumer_mode_parser.add_argument(
"-o", "--offset", help="offset to consume from", type=int, required=False
)
consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str)

consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true")
consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
cg = consumer_mode_parser.add_mutually_exclusive_group(required=False)
cg.add_argument(
"-o",
"--offset",
help="offset to consume from",
type=int,
)
cg.add_argument(
"-t",
"--timestamp",
help="timestamp to consume from",
type=dateutil_parsable_or_unix_timestamp,
)

listen_parser = sub_parsers.add_parser(
listen_parser = sub_parsers.add_parser( # noqa: F841
_LISTEN,
help="listen mode - listen until KeyboardInterrupt",
parents=[parent_parser, consumer_parser],
parents=[topic_parser, consumer_parser, common_options],
)

play_parser = sub_parsers.add_parser(
_PLAY,
help="replay mode - replay data into another topic",
parents=[common_options],
)
play_parser.add_argument("topics", type=str, nargs=2, help="SRC topic DEST topic")
g = play_parser.add_mutually_exclusive_group(required=True)
g.add_argument(
"-o",
"--offsets",
help="offsets to replay between (inclusive)",
type=int,
nargs=2,
)
g.add_argument(
"-t",
"--timestamps",
help="timestamps to replay between in ISO8601 or RFC3339 format ie."
' "2025-11-17 07:00:00 or as a unix timestamp" ',
type=dateutil_parsable_or_unix_timestamp,
nargs=2,
)
listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None)

if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()

if args.kafka_config is not None:
raise NotImplementedError("-X is not implemented yet.")

broker, topic = parse_kafka_uri(args.topic)
if args.verbose:
logger.setLevel(logging.DEBUG)

if args.log_file:
logger.addHandler(logging.FileHandler(args.log_file.name))

if "kafka_config" in args and args.kafka_config is not None:
raise NotImplementedError("-X is not implemented yet.")

if args.command == _LISTEN:
listen(broker, topic, args.partition)
broker, topic = parse_kafka_uri(args.topic)
listen(broker, topic, args.partition, args.filter)
elif args.command == _CONSUME:
broker, topic = parse_kafka_uri(args.topic)
consume(
broker,
topic,
args.partition,
args.messages,
args.offset,
args.go_forwards,
args.filter,
args.timestamp,
)
elif args.command == _PLAY:
src_broker, src_topic = parse_kafka_uri(args.topics[0])
dest_broker, dest_topic = parse_kafka_uri(args.topics[1])

play(
src_broker,
src_topic,
dest_broker,
dest_topic,
args.offsets,
args.timestamps,
)
elif args.command == _SNIFF:
try:
broker, topic = parse_kafka_uri(args.broker)
logger.debug(f"Sniffing single topic {topic} on broker {broker}")
sniff(broker, topic)
except RuntimeError:
logger.debug(f"Sniffing whole broker {args.broker}")
sniff(args.broker)


if __name__ == "__main__":