Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/adbc/driver/bigquery/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ const (
// OptionStringImpersonateLifetime instructs the driver to impersonate for the
// given duration (e.g. "3600s").
OptionStringImpersonateLifetime = "adbc.bigquery.sql.impersonate.lifetime"

// OptionBoolQueryLinkFailedJob instructs the driver to construct a link to the
// query job if it fails to run.
OptionBoolQueryLinkFailedJob = "adbc.bigquery.sql.query.link_failed_job"
)

var (
Expand Down
27 changes: 19 additions & 8 deletions go/adbc/driver/bigquery/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"log"
"sync/atomic"

Expand Down Expand Up @@ -55,7 +56,7 @@ func checkContext(ctx context.Context, maybeErr error) error {
return ctx.Err()
}

func runQuery(ctx context.Context, query *bigquery.Query, executeUpdate bool) (bigquery.ArrowIterator, int64, error) {
func runQuery(ctx context.Context, query *bigquery.Query, executeUpdate bool, linkFailedJob bool) (bigquery.ArrowIterator, int64, error) {
job, err := query.Run(ctx)
if err != nil {
return nil, -1, err
Expand All @@ -64,8 +65,15 @@ func runQuery(ctx context.Context, query *bigquery.Query, executeUpdate bool) (b
return nil, 0, nil
}

// The project id, location, and job id are all URL-safe:
// Project id and job id can only contain URL safe characters: https://cloud.google.com/bigquery/docs/reference/rest/v2/JobReference
// Locations are also URL-safe, listed here: https://cloud.google.com/bigquery/docs/locations
jobLink := fmt.Sprintf("https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s&page=queryresults", job.ProjectID(), job.Location(), job.ID())
iter, err := job.Read(ctx)
if err != nil {
if linkFailedJob {
return nil, -1, fmt.Errorf("%w (Query: %s)", err, jobLink)
}
return nil, -1, err
}

Expand All @@ -86,6 +94,9 @@ func runQuery(ctx context.Context, query *bigquery.Query, executeUpdate bool) (b
}
}
if arrowIterator, err = iter.ArrowIterator(); err != nil {
if linkFailedJob {
return nil, -1, fmt.Errorf("%w (Query: %s)", err, jobLink)
}
return nil, -1, err
}
} else {
Expand Down Expand Up @@ -117,8 +128,8 @@ func getQueryParameter(values arrow.RecordBatch, row int, parameterMode string)
return parameters, nil
}

func runPlainQuery(ctx context.Context, query *bigquery.Query, alloc memory.Allocator, resultRecordBufferSize int) (bigqueryRdr *reader, totalRows int64, err error) {
arrowIterator, totalRows, err := runQuery(ctx, query, false)
func runPlainQuery(ctx context.Context, query *bigquery.Query, alloc memory.Allocator, resultRecordBufferSize int, linkFailedJob bool) (bigqueryRdr *reader, totalRows int64, err error) {
arrowIterator, totalRows, err := runQuery(ctx, query, false, linkFailedJob)
if err != nil {
return nil, -1, err
}
Expand Down Expand Up @@ -162,7 +173,7 @@ func runPlainQuery(ctx context.Context, query *bigquery.Query, alloc memory.Allo
return bigqueryRdr, totalRows, nil
}

func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, query *bigquery.Query, rec arrow.RecordBatch, ch chan arrow.RecordBatch, parameterMode string, alloc memory.Allocator, rdrSchema func(schema *arrow.Schema)) (int64, error) {
func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, query *bigquery.Query, rec arrow.RecordBatch, ch chan arrow.RecordBatch, parameterMode string, alloc memory.Allocator, rdrSchema func(schema *arrow.Schema), linkFailedJob bool) (int64, error) {
totalRows := int64(-1)
for i := 0; i < int(rec.NumRows()); i++ {
parameters, err := getQueryParameter(rec, i, parameterMode)
Expand All @@ -173,7 +184,7 @@ func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, q
query.Parameters = parameters
}

arrowIterator, rows, err := runQuery(ctx, query, false)
arrowIterator, rows, err := runQuery(ctx, query, false, linkFailedJob)
if err != nil {
return -1, err
}
Expand All @@ -198,9 +209,9 @@ func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, q

// kicks off a goroutine for each endpoint and returns a reader which
// gathers all of the records as they come in.
func newRecordReader(ctx context.Context, query *bigquery.Query, boundParameters array.RecordReader, parameterMode string, alloc memory.Allocator, resultRecordBufferSize, prefetchConcurrency int) (bigqueryRdr *reader, totalRows int64, err error) {
func newRecordReader(ctx context.Context, query *bigquery.Query, boundParameters array.RecordReader, parameterMode string, alloc memory.Allocator, resultRecordBufferSize, prefetchConcurrency int, linkFailedJob bool) (bigqueryRdr *reader, totalRows int64, err error) {
if boundParameters == nil {
return runPlainQuery(ctx, query, alloc, resultRecordBufferSize)
return runPlainQuery(ctx, query, alloc, resultRecordBufferSize, linkFailedJob)
}
defer boundParameters.Release()

Expand Down Expand Up @@ -238,7 +249,7 @@ func newRecordReader(ctx context.Context, query *bigquery.Query, boundParameters
// we don't need to call rec.Retain() here and call call rec.Release() in queryRecordWithSchemaCallback
batchRows, err := queryRecordWithSchemaCallback(ctx, group, query, rec, ch, parameterMode, alloc, func(schema *arrow.Schema) {
bigqueryRdr.schema = schema
})
}, linkFailedJob)
if err != nil {
return nil, -1, err
}
Expand Down
18 changes: 15 additions & 3 deletions go/adbc/driver/bigquery/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type statement struct {
streamBinding array.RecordReader
resultRecordBufferSize int
prefetchConcurrency int

// Wrap errors with a link to failed job
linkFailedJob bool
}

func (st *statement) GetOptionBytes(key string) ([]byte, error) {
Expand Down Expand Up @@ -132,6 +135,8 @@ func (st *statement) GetOption(key string) (string, error) {
return strconv.FormatBool(st.queryConfig.DryRun), nil
case OptionBoolQueryCreateSession:
return strconv.FormatBool(st.queryConfig.CreateSession), nil
case OptionBoolQueryLinkFailedJob:
return strconv.FormatBool(st.linkFailedJob), nil
default:
val, err := st.cnxn.GetOption(key)
if err == nil {
Expand Down Expand Up @@ -248,6 +253,13 @@ func (st *statement) SetOption(key string, v string) error {
} else {
return err
}
case OptionBoolQueryLinkFailedJob:
val, err := strconv.ParseBool(v)
if err == nil {
st.linkFailedJob = val
} else {
return err
}

default:
return adbc.Error{
Expand Down Expand Up @@ -309,7 +321,7 @@ func (st *statement) ExecuteQuery(ctx context.Context) (array.RecordReader, int6
return nil, -1, err
}

return newRecordReader(ctx, st.query(), rdr, st.parameterMode, st.cnxn.Alloc, st.resultRecordBufferSize, st.prefetchConcurrency)
return newRecordReader(ctx, st.query(), rdr, st.parameterMode, st.cnxn.Alloc, st.resultRecordBufferSize, st.prefetchConcurrency, st.linkFailedJob)
}

// ExecuteUpdate executes a statement that does not generate a result
Expand All @@ -321,7 +333,7 @@ func (st *statement) ExecuteUpdate(ctx context.Context) (int64, error) {
}

if boundParameters == nil {
_, totalRows, err := runQuery(ctx, st.query(), true)
_, totalRows, err := runQuery(ctx, st.query(), true, st.linkFailedJob)
if err != nil {
return -1, err
}
Expand All @@ -339,7 +351,7 @@ func (st *statement) ExecuteUpdate(ctx context.Context) (int64, error) {
st.queryConfig.Parameters = parameters
}

_, currentRows, err := runQuery(ctx, st.query(), true)
_, currentRows, err := runQuery(ctx, st.query(), true, st.linkFailedJob)
if err != nil {
return -1, err
}
Expand Down
Loading