Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ require (
github.com/charmbracelet/x/term v0.1.1 // indirect
github.com/charmbracelet/x/windows v0.1.0 // indirect
github.com/cloudflare/circl v1.6.1 // indirect
github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8 // indirect
github.com/confluentinc/proto-go-setter v0.3.0 // indirect
github.com/cyphar/filepath-securejoin v0.2.5 // indirect
github.com/distribution/reference v0.6.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ github.com/compose-spec/compose-go/v2 v2.1.3 h1:bD67uqLuL/XgkAK6ir3xZvNLFPxPScEi
github.com/compose-spec/compose-go/v2 v2.1.3/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52 h1:19qEGhkbZa5fopKCe0VPIV+Sasby4Pv10z9ZaktwWso=
github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52/go.mod h1:62EMf+5uFEt1BJ2q8WMrUoI9VUSxAbDnmZCGRt/MbA0=
github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8 h1:0t56uO8mzCIT7PLg/G7yKZc6U0ToR3Pqmi3aN1uPpiM=
github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8/go.mod h1:R2nAnRzw0ug4oUywRLO7AiyciE1dFS1Rf3TIfxj9Znk=
github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0 h1:zSF4OQUJXWH2JeAo9rsq13ibk+JFdzITGR8S7cFMpzw=
github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0/go.mod h1:DoxqzzF3JzvJr3fWkvCiOHFlE0GoYpozWxFZ1Ud9ntA=
github.com/confluentinc/ccloud-sdk-go-v2/apikeys v0.4.0 h1:8fWyLwMuy8ec0MVF5Avd54UvbIxhDFhZzanHBVwgxdw=
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command_consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type consumerGroupOut struct {
IsSimple bool `human:"Simple" serialized:"is_simple"`
PartitionAssignor string `human:"Partition Assignor" serialized:"partition_assignor"`
State string `human:"State" serialized:"state"`
ProtocolType string `human:"Type,omitempty" serialized:"type,omitempty"`
}

func (c *consumerCommand) newGroupCommand(cfg *config.Config) *cobra.Command {
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command_consumer_group_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (c *consumerCommand) groupDescribe(cmd *cobra.Command, args []string) error
IsSimple: group.GetIsSimple(),
PartitionAssignor: group.GetPartitionAssignor(),
State: group.GetState(),
ProtocolType: group.GetType(),
})
return table.Print()
}
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command_consumer_group_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (c *consumerCommand) groupList(cmd *cobra.Command, _ []string) error {
IsSimple: group.GetIsSimple(),
PartitionAssignor: group.GetPartitionAssignor(),
State: group.GetState(),
ProtocolType: group.GetType(),
})
}
return list.Print()
Expand Down
52 changes: 52 additions & 0 deletions internal/kafka/command_kafka_stream_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package kafka

import (
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/config"
"github.com/spf13/cobra"
)

type streamGroupOut struct {
Kind string `human:"Kind" serialized:"kind"`
ClusterId string `human:"Cluster Id" serialized:"cluster_id"`
GroupId string `human:"Group Id" serialized:"group_id"`
State string `human:"State" serialized:"state"`
MemberCount int32 `human:"Member Count" serialized:"member_count"`
SubtopologyCount int32 `human:"Subtopology Count" serialized:"subtopology_count"`
GroupEpoch int32 `human:"Group Epoch" serialized:"group_epoch"`
TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"`
TargetAssignmentEpoch int32 `human:"Target Assignment Epoch" serialized:"target_assignment_epoch"`
Members string `human:"Members" serialized:"members"`
Subtopologies string `human:"Subtopologies" serialized:"subtopologies"`
}

func (c *consumerCommand) newStreamGroupCommand(cfg *config.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "stream-group",
Short: "Manage Kafka stream groups.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogin},
}

cmd.AddCommand(c.newStreamGroupDescribeCommand())
cmd.AddCommand(c.newStreamGroupListCommand())
cmd.AddCommand(c.newStreamGroupMemberCommand())
cmd.AddCommand(c.newStreamGroupMemberAssignmentCommand())
cmd.AddCommand(c.newStreamGroupMemberTaskPartitionsCommand())
cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentCommand())
cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentTaskPartitionsCommand())
cmd.AddCommand(c.newStreamGroupSubtopologyCommand())

return cmd
}

func (c *consumerCommand) validStreamGroupArgs(cmd *cobra.Command, args []string) []string {
if len(args) > 0 {
return nil
}

if err := c.PersistentPreRunE(cmd, args); err != nil {
return nil
}

return pcmd.AutocompleteStreamGroups(cmd, c.AuthenticatedCLICommand)
}
62 changes: 62 additions & 0 deletions internal/kafka/command_kafka_stream_group_describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package kafka

import (
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
"github.com/spf13/cobra"
)

func (c *consumerCommand) newStreamGroupDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <group>",
Short: "Describe stream group",
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs),
RunE: c.streamGroupDescribe,
}

cmd.Flags().String("group", "", "Group Id.")
pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("group"))

return cmd
}

func (c *consumerCommand) streamGroupDescribe(cmd *cobra.Command, args []string) error {
groupId, err := cmd.Flags().GetString("group")
if err != nil {
return err
}

kafkaREST, err := c.GetKafkaREST(cmd)
if err != nil {
return err
}

streamGroup, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroup(groupId)
if err != nil {
return err
}

table := output.NewTable(cmd)
table.Add(&streamGroupOut{
Kind: streamGroup.GetKind(),
ClusterId: streamGroup.GetClusterId(),
GroupId: streamGroup.GetGroupId(),
State: streamGroup.GetState(),
MemberCount: streamGroup.GetMemberCount(),
SubtopologyCount: streamGroup.GetSubtopologyCount(),
GroupEpoch: streamGroup.GetGroupEpoch(),
TopologyEpoch: streamGroup.GetTopologyEpoch(),
TargetAssignmentEpoch: streamGroup.GetTargetAssignmentEpoch(),
Members: streamGroup.Members.GetRelated(),
Subtopologies: streamGroup.Subtopologies.GetRelated(),
})

return table.Print()
}
65 changes: 65 additions & 0 deletions internal/kafka/command_kafka_stream_group_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package kafka

import (
kafkarestv3Internal "github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest/v3"
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
"github.com/spf13/cobra"
)

func (c *consumerCommand) newStreamGroupListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List kafka stream groups.",
Args: cobra.NoArgs,
RunE: c.listStreamGroup,
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
}

pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *consumerCommand) listStreamGroup(cmd *cobra.Command, _ []string) error {
groups, err := c.getStreamGroups(cmd)
if err != nil {
return err
}

list := output.NewList(cmd)
for _, streamGroup := range groups {
list.Add(&streamGroupOut{
Kind: streamGroup.GetKind(),
ClusterId: streamGroup.GetClusterId(),
GroupId: streamGroup.GetGroupId(),
State: streamGroup.GetState(),
MemberCount: streamGroup.GetMemberCount(),
SubtopologyCount: streamGroup.GetSubtopologyCount(),
GroupEpoch: streamGroup.GetGroupEpoch(),
TopologyEpoch: streamGroup.GetTopologyEpoch(),
TargetAssignmentEpoch: streamGroup.GetTargetAssignmentEpoch(),
Members: streamGroup.Members.GetRelated(),
Subtopologies: streamGroup.Subtopologies.GetRelated(),
})
}
return list.Print()
}

func (c *consumerCommand) getStreamGroups(cmd *cobra.Command) ([]kafkarestv3Internal.StreamsGroupData, error) {
kafkaREST, err := c.GetKafkaREST(cmd)
if err != nil {
return nil, err
}

topics, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup()
if err != nil {
return nil, err
}

return topics.Data, nil
}
32 changes: 32 additions & 0 deletions internal/kafka/command_kafka_stream_group_member.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kafka

import (
"github.com/spf13/cobra"
)

type streamGroupMemberOut struct {
Kind string `human:"Kind" serialized:"kind"`
ClusterId string `human:"Cluster Id" serialized:"cluster_id"`
GroupId string `human:"Group Id" serialized:"group_id"`
MemberId string `human:"Member Id" serialized:"member_id"`
ProcessId string `human:"Process Id" serialized:"process_id"`
ClientId string `human:"Client Id" serialized:"client_id"`
InstanceId string `human:"Instance Id" serialized:"instance_id"`
MemberEpoch int32 `human:"Member Epoch" serialized:"member_epoch"`
TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"`
IsClassic bool `human:"Is Classic" serialized:"is_classic"`
Assignments string `human:"Assignments" serialized:"assignments"`
TargetAssign string `human:"Target Assignment" serialized:"target_assignment"`
}

func (c *consumerCommand) newStreamGroupMemberCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "stream-group-member",
Short: "Manage Kafka stream groups members.",
}

cmd.AddCommand(c.newStreamGroupMemberDescribeCommand())
cmd.AddCommand(c.newStreamGroupMemberListCommand())

return cmd
}
27 changes: 27 additions & 0 deletions internal/kafka/command_kafka_stream_group_member_assignment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package kafka

import (
"github.com/spf13/cobra"
)

type streamGroupMemberAssignmentOut struct {
Kind string `human:"Kind" serialized:"kind"`
ClusterId string `human:"Cluster Id" serialized:"cluster_id"`
GroupId string `human:"Group Id" serialized:"group_id"`
MemberId string `human:"Member Id" serialized:"member_id"`
ActiveTasks string `human:"Active Tasks" serialized:"active_tasks"`
StandbyTasks string `human:"Standby Tasks" serialized:"standby_tasks"`
WarmupTasks string `human:"Warmup Tasks" serialized:"warmup_tasks"`
}

func (c *consumerCommand) newStreamGroupMemberAssignmentCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "stream-group-member-assignment",
Short: "Manage Kafka stream group member assignments.",
}

cmd.AddCommand(c.newStreamGroupMemberAssignmentDescribeCommand())
cmd.AddCommand(c.newStreamGroupMemberAssignmentListCommand())

return cmd
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package kafka

import (
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
"github.com/spf13/cobra"
)

func (c *consumerCommand) newStreamGroupMemberAssignmentDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <member>",
Short: "Describe stream group member assignment",
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs),
RunE: c.streamGroupMemberAssignmentDescribe,
}

cmd.Flags().String("group", "", "Group Id.")
cmd.Flags().String("member", "", "Member Id.")

pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("group"))
cobra.CheckErr(cmd.MarkFlagRequired("member"))

return cmd
}

func (c *consumerCommand) streamGroupMemberAssignmentDescribe(cmd *cobra.Command, args []string) error {
groupId, err := cmd.Flags().GetString("group")
if err != nil {
return err
}

memberId, err := cmd.Flags().GetString("member")
if err != nil {
return err
}

kafkaREST, err := c.GetKafkaREST(cmd)
if err != nil {
return err
}

assignment, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroupMemberAssignment(groupId, memberId)
if err != nil {
return err
}

table := output.NewTable(cmd)
table.Add(&streamGroupMemberAssignmentOut{
Kind: assignment.GetKind(),
ClusterId: assignment.GetClusterId(),
GroupId: assignment.GetGroupId(),
MemberId: assignment.GetMemberId(),
ActiveTasks: assignment.ActiveTasks.GetRelated(),
StandbyTasks: assignment.StandbyTasks.GetRelated(),
WarmupTasks: assignment.WarmupTasks.GetRelated(),
})

return table.Print()
}
Loading