Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/flink/command_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (c *command) newStatementCommand(cfg *config.Config) *cobra.Command {
cmd.AddCommand(c.newStatementWebUiForwardCommand())
}
cmd.AddCommand(c.newStatementExceptionCommand(cfg))
cmd.AddCommand(c.newStatementResultCommand(cfg))

return cmd
}
Expand Down
11 changes: 11 additions & 0 deletions internal/flink/command_statement_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ func (c *command) statementCreate(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}

// If the statement produces results, fetch and display them
traits := statement.Status.GetTraits()
schema := traits.GetSchema()
if columns := schema.GetColumns(); len(columns) > 0 {
statementResults, err := fetchAllResults(client, environmentId, name, c.Context.LastOrgId, schema, defaultMaxResultRows)
if err != nil {
return err
}
return printStatementResults(cmd, statementResults)
}
}

table := output.NewTable(cmd)
Expand Down
167 changes: 167 additions & 0 deletions internal/flink/command_statement_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package flink

import (
"fmt"
"net/url"

"github.com/olekukonko/tablewriter"
"github.com/spf13/cobra"

flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1"

"github.com/confluentinc/cli/v4/pkg/ccloudv2"
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/config"
"github.com/confluentinc/cli/v4/pkg/output"
)

const defaultMaxResultRows = 100

func (c *command) newStatementResultCommand(cfg *config.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "result",
Short: "Manage Flink SQL statement results.",
}

if cfg.IsCloudLogin() {
pcmd.AddCloudFlag(cmd)
pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
cmd.AddCommand(c.newStatementResultListCommand())
}

return cmd
}

type serializedResultOutput struct {
Columns []string `json:"columns" yaml:"columns"`
Rows []map[string]any `json:"rows" yaml:"rows"`
}

type statementResultData struct {
Headers []string
Rows [][]string
}

func printStatementResults(cmd *cobra.Command, data *statementResultData) error {
if data == nil || len(data.Rows) == 0 {
if output.GetFormat(cmd).IsSerialized() {
headers := []string{}
if data != nil {
headers = data.Headers
}
return output.SerializedOutput(cmd, &serializedResultOutput{
Columns: headers,
Rows: []map[string]any{},
})
}
fmt.Fprintln(cmd.OutOrStdout(), "No results found.")
return nil
}

if output.GetFormat(cmd).IsSerialized() {
rows := make([]map[string]any, len(data.Rows))
for i, row := range data.Rows {
rowMap := make(map[string]any)
for j, val := range row {
if j < len(data.Headers) {
rowMap[data.Headers[j]] = val
}
}
rows[i] = rowMap
}
return output.SerializedOutput(cmd, &serializedResultOutput{
Columns: data.Headers,
Rows: rows,
})
}

table := tablewriter.NewWriter(cmd.OutOrStdout())
table.SetAutoFormatHeaders(false)
table.SetHeader(data.Headers)
table.SetAutoWrapText(false)
table.SetBorder(false)

for _, row := range data.Rows {
table.Append(row)
}

table.Render()
return nil
}

func fetchAllResults(client ccloudv2.GatewayClientInterface, envId, name, orgId string, schema flinkgatewayv1.SqlV1ResultSchema, maxRows int) (*statementResultData, error) {
columns := schema.GetColumns()
headers := make([]string, len(columns))
for i, col := range columns {
headers[i] = col.GetName()
}

var allRows [][]string
pageToken := ""

for {
resp, err := client.GetStatementResults(envId, name, orgId, pageToken)
if err != nil {
return nil, err
}

resultSet := resp.GetResults()
rawData := resultSet.GetData()
for _, item := range rawData {
resultItem, ok := item.(map[string]any)
if !ok {
continue
}
rowFields, _ := resultItem["row"].([]any)
row := make([]string, len(headers))
for j, field := range rowFields {
if j < len(headers) {
row[j] = fieldToString(field)
}
}
allRows = append(allRows, row)
}

if maxRows > 0 && len(allRows) >= maxRows {
allRows = allRows[:maxRows]
break
}

nextUrl := resp.Metadata.GetNext()
nextToken, err := extractResultPageToken(nextUrl)
if err != nil {
return nil, err
}
if nextToken == "" {
break
}
pageToken = nextToken
}

return &statementResultData{
Headers: headers,
Rows: allRows,
}, nil
}

func fieldToString(field any) string {
if field == nil {
return "NULL"
}
return fmt.Sprintf("%v", field)
}

func extractResultPageToken(nextUrl string) (string, error) {
if nextUrl == "" {
return "", nil
}
parsed, err := url.Parse(nextUrl)
if err != nil {
return "", err
}
params, err := url.ParseQuery(parsed.RawQuery)
if err != nil {
return "", err
}
return params.Get("page_token"), nil
}
110 changes: 110 additions & 0 deletions internal/flink/command_statement_result_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package flink

import (
"fmt"
"time"

"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/retry"
)

func (c *command) newStatementResultListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list <statement-name>",
Short: "List results for a Flink SQL statement.",
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStatementArgs),
RunE: c.statementResultList,
}

pcmd.AddCloudFlag(cmd)
pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddOutputFlag(cmd)
cmd.Flags().Bool("wait", false, "Block until the statement is no longer pending before fetching results.")
cmd.Flags().Int("max-rows", defaultMaxResultRows, "Maximum number of result rows to fetch.")

return cmd
}

func (c *command) statementResultList(cmd *cobra.Command, args []string) error {
environmentId, err := c.Context.EnvironmentId()
if err != nil {
return err
}

client, err := c.GetFlinkGatewayClient(false)
if err != nil {
return err
}

statement, err := client.GetStatement(environmentId, args[0], c.Context.GetCurrentOrganization())
if err != nil {
return err
}

phase := statement.Status.GetPhase()

if phase == "FAILED" {
return fmt.Errorf("statement %q has failed: %s", args[0], statement.Status.GetDetail())
}

if phase == "PENDING" {
wait, err := cmd.Flags().GetBool("wait")
if err != nil {
return err
}
if !wait {
return fmt.Errorf("statement %q is still pending, use --wait to wait for it to complete", args[0])
}

err = retry.Retry(time.Second, 2*time.Minute, func() error {
statement, err = client.GetStatement(environmentId, args[0], c.Context.GetCurrentOrganization())
if err != nil {
return err
}
if statement.Status.GetPhase() == "PENDING" {
return fmt.Errorf(`statement phase is "%s"`, statement.Status.GetPhase())
}
return nil
})
if err != nil {
return err
}

if statement.Status.GetPhase() == "FAILED" {
return fmt.Errorf("statement %q has failed: %s", args[0], statement.Status.GetDetail())
}
}

traits := statement.Status.GetTraits()
schema := traits.GetSchema()
columns := schema.GetColumns()
if len(columns) == 0 {
fmt.Fprintln(cmd.OutOrStdout(), "Statement has no results to display.")
return nil
}

maxRows, err := cmd.Flags().GetInt("max-rows")
if err != nil {
return err
}

statementResults, err := fetchAllResults(client, environmentId, args[0], c.Context.GetCurrentOrganization(), schema, maxRows)
if err != nil {
return err
}

if err := printStatementResults(cmd, statementResults); err != nil {
return err
}

if maxRows > 0 && len(statementResults.Rows) >= maxRows {
fmt.Fprintf(cmd.ErrOrStderr(), "Warning: results truncated at %d rows. Use --max-rows to adjust the limit.\n", maxRows)
}

return nil
}
12 changes: 4 additions & 8 deletions test/fixtures/output/flink/statement/create-wait.golden
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
+---------------+-------------------------------+
| Creation Date | 2022-01-01 00:00:00 +0000 UTC |
| Name | my-statement |
| Statement | CREATE TABLE test; |
| Compute Pool | lfcp-123456 |
| Status | COMPLETED |
| Status Detail | SQL statement is completed |
+---------------+-------------------------------+
database_name
-----------------
my-cluster
other-cluster
1 change: 1 addition & 0 deletions test/fixtures/output/flink/statement/help.golden
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Available Commands:
describe Describe a Flink SQL statement.
exception Manage Flink SQL statement exceptions.
list List Flink SQL statements.
result Manage Flink SQL statement results.
resume Resume a Flink SQL statement.
stop Stop a Flink SQL statement.
update Update a Flink SQL statement.
Expand Down
18 changes: 18 additions & 0 deletions test/fixtures/output/flink/statement/result/help.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Manage Flink SQL statement results.

Usage:
confluent flink statement result [command]

Available Commands:
list List results for a Flink SQL statement.

Flags:
--cloud string Specify the cloud provider as "aws", "azure", or "gcp".
--region string Cloud region for Flink (use "confluent flink region list" to see all).

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Use "confluent flink statement result [command] --help" for more information about a command.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026`
Error: statement "failed-statement" has failed: Syntax error in SQL statement
18 changes: 18 additions & 0 deletions test/fixtures/output/flink/statement/result/list-help.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
List results for a Flink SQL statement.

Usage:
confluent flink statement result list <statement-name> [flags]

Flags:
--cloud string Specify the cloud provider as "aws", "azure", or "gcp".
--region string Cloud region for Flink (use "confluent flink region list" to see all).
--environment string Environment ID.
--context string CLI context name.
-o, --output string Specify the output format as "human", "json", or "yaml". (default "human")
--wait Block until the statement is no longer pending before fetching results.
--max-rows int Maximum number of result rows to fetch. (default 100)

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
12 changes: 12 additions & 0 deletions test/fixtures/output/flink/statement/result/list-json.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026`
{
"columns": ["database_name"],
"rows": [
{
"database_name": "my-cluster"
},
{
"database_name": "other-cluster"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026`
Statement has no results to display.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026`
Error: statement "pending-statement" is still pending, use --wait to wait for it to complete
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026`
columns:
- database_name
rows:
- database_name: my-cluster
- database_name: other-cluster
5 changes: 5 additions & 0 deletions test/fixtures/output/flink/statement/result/list.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026`
database_name
-----------------
my-cluster
other-cluster
Loading