Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 295 additions & 0 deletions THIRD_PARTY_NOTICES.txt

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions cbt.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ All values are optional and can be overridden at the command prompt.
// ` + "`" + `columns` + "`" + `
// : A mapping from column names to column objects.

// ` + "`" + `avro_schema_paths` + "`" + `
// : A mapping from Avro record type names to lists of .avsc
// : schema files that may be used to decode that record type.

// ` + "`" + `families` + "`" + `
// : A mapping from family names to family objects.

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.14.2 // indirect
github.com/linkedin/goavro/v2 v2.12.0
golang.org/x/net v0.41.0 // indirect
golang.org/x/text v0.26.0 // indirect
google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2 // indirect
Expand All @@ -47,6 +48,7 @@ require (
github.com/go-jose/go-jose/v4 v4.0.5 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cncf/xds/go v0.0.0-20250326154945-ae57f3c0d45f h1:C5bqEmzEPLsHm9Mv73lSE9e9bKV23aB1vxOsmZrkl3k=
github.com/cncf/xds/go v0.0.0-20250326154945-ae57f3c0d45f/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
Expand All @@ -43,6 +44,9 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
Expand All @@ -61,6 +65,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg=
github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
Expand All @@ -76,6 +82,10 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM=
Expand Down Expand Up @@ -127,6 +137,7 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE=
Expand Down
10 changes: 10 additions & 0 deletions testdata/TestParseValueFormatSettings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ protocol_buffer_paths:
- mycode/stuff
- /home/user/dev/othercode/

avro_schema_paths:
namespace.Record:
- file1.avsc
- file2.avsc
namespace.Banana:
- banana.avsc

families:
family1:
default_encoding: BigEndian
Expand Down Expand Up @@ -59,3 +66,6 @@ columns:
col4:
encoding: P
type: hobby
col5:
encoding: A
type: namespace.Record
9 changes: 9 additions & 0 deletions testdata/cat.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{

"type": "record",
"name": "Cat",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
8 changes: 8 additions & 0 deletions testdata/cat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ protocol_buffer_definitions:
protocol_buffer_paths:
- testdata/

avro_schema_paths:
io.animal.Cat:
- testdata/cat_with_color.avsc
- testdata/cat.avsc

columns:
cat:
encoding: ProtocolBuffer
type: Cat

avro:
encoding: a
type: io.animal.Cat

json:
encoding: json
10 changes: 10 additions & 0 deletions testdata/cat_with_color.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{

"type": "record",
"name": "Cat",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "color", "type": "int"}
]
}
84 changes: 78 additions & 6 deletions valueformatting.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"sort"
"strings"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
"github.com/jhump/protoreflect/dynamic"
"github.com/linkedin/goavro/v2"
"gopkg.in/yaml.v2"
)

Expand All @@ -49,10 +52,11 @@ func newValueFormatFamily() valueFormatFamily { // for tests :)
}

type valueFormatSettings struct {
ProtocolBufferDefinitions []string `yaml:"protocol_buffer_definitions"`
ProtocolBufferPaths []string `yaml:"protocol_buffer_paths"`
DefaultEncoding string `yaml:"default_encoding"`
DefaultType string `yaml:"default_type"`
ProtocolBufferDefinitions []string `yaml:"protocol_buffer_definitions"`
ProtocolBufferPaths []string `yaml:"protocol_buffer_paths"`
AvroSchemaPaths map[string][]string `yaml:"avro_schema_paths"`
DefaultEncoding string `yaml:"default_encoding"`
DefaultType string `yaml:"default_type"`
Columns map[string]valueFormatColumn
Families map[string]valueFormatFamily
}
Expand All @@ -62,6 +66,7 @@ type valueFormatter func([]byte) (string, error)
// valueFormatting bundles the value-formatting settings with the lookup
// tables that are built from them.
type valueFormatting struct {
// settings is the configuration decoded from the YAML format file.
settings valueFormatSettings
// pbMessageTypes maps a name to a protocol-buffer message descriptor.
// NOTE(review): presumably keyed by message type name — confirm against
// the code that populates it.
pbMessageTypes map[string]*desc.MessageDescriptor
// avroSchemas maps a type name from the avro_schema_paths setting to the
// codecs built from its schema files, in the order the files are listed.
avroSchemas map[string][]*goavro.Codec
// formatters maps a pair of strings to a formatter.
// NOTE(review): the pair looks like an (encoding, type) cache key — confirm.
formatters map[[2]string]valueFormatter
}

Expand Down Expand Up @@ -242,6 +247,30 @@ func (f *valueFormatting) pbFormatter(ctype string) (valueFormatter, error) {
}, nil
}

// avroFormatter returns a formatter that renders Avro binary-encoded values
// of the configured type ctype as Avro JSON text.
//
// ctype is looked up in f.avroSchemas; an error is returned if no codec is
// registered under that name. The returned formatter tries the codecs in
// their configured order and uses the first one that decodes the whole
// value. A codec that fails, or that leaves undecoded bytes behind, is logged
// and skipped. If no codec matches, the formatter returns an error.
func (f *valueFormatting) avroFormatter(ctype string) (valueFormatter, error) {
	schemas := f.avroSchemas[ctype]

	// len covers both a missing entry (nil slice) and an entry that was
	// configured with an empty list of schema files.
	if len(schemas) == 0 {
		return nil, fmt.Errorf("no Avro Schema for: %v", ctype)
	}

	return func(in []byte) (string, error) {
		for i, schema := range schemas {
			native, rest, err := schema.NativeFromBinary(in)
			if err != nil {
				log.Printf("unable to read data with %d for %v: %v\n", i, ctype, err)
				continue
			}
			// NativeFromBinary succeeds as soon as a prefix of the input
			// satisfies the schema. Leftover bytes mean the value was written
			// with a different (longer) schema, so accepting this codec would
			// silently drop fields depending on schema order.
			if len(rest) > 0 {
				log.Printf("unable to read data with %d for %v: %d trailing bytes left undecoded\n", i, ctype, len(rest))
				continue
			}
			b, err := schema.TextualFromNative(nil, native)
			if err != nil {
				return "", err
			}
			return string(b), nil
		}
		return "", fmt.Errorf("unable to read the data for: %v", ctype)
	}, nil
}

type validEncodings int

const (
Expand All @@ -251,6 +280,7 @@ const (
protocolBuffer // for pretty-print
hex // formatting
jsonEncoded
avro
)

var validValueFormattingEncodings = map[string]validEncodings{
Expand All @@ -268,6 +298,8 @@ var validValueFormattingEncodings = map[string]validEncodings{
"protocol_buffer": protocolBuffer,
"proto": protocolBuffer,
"p": protocolBuffer,
"avro": avro,
"a": avro,
"": none,
}

Expand Down Expand Up @@ -365,7 +397,7 @@ func (f *valueFormatting) validateColumns() error {
}

func (f *valueFormatting) parse(path string) error {
data, err := ioutil.ReadFile(path)
data, err := os.ReadFile(path)
if err == nil {
err = yaml.UnmarshalStrict([]byte(data), &f.settings)
}
Expand Down Expand Up @@ -397,6 +429,36 @@ func (f *valueFormatting) setupPBMessages() error {
return nil
}

// setupAvroSchemas (re)initialises f.avroSchemas and fills it with the codecs
// for every entry of the avro_schema_paths setting, keyed by the configured
// type name. It stops at the first entry whose schema files cannot be loaded
// and returns that error; entries loaded before the failure stay in the map.
func (f *valueFormatting) setupAvroSchemas() error {
	loaded := make(map[string][]*goavro.Codec)
	f.avroSchemas = loaded

	// Ranging over a nil or empty map is a no-op, so no length guard is needed.
	for typeName, schemaFiles := range f.settings.AvroSchemaPaths {
		codecs, err := getAvroCodecs(schemaFiles)
		if err != nil {
			return err
		}
		loaded[typeName] = codecs
	}
	return nil
}

// getAvroCodecs reads every Avro schema (.avsc) file listed in paths and
// compiles it into a goavro codec.
//
// The returned slice holds one codec per path, in the same order as paths.
// The first file that cannot be read or parsed aborts the load; the returned
// error names the offending path and wraps the underlying cause so callers
// can inspect it with errors.Is / errors.As.
func getAvroCodecs(paths []string) ([]*goavro.Codec, error) {
	codecs := make([]*goavro.Codec, 0, len(paths))
	for _, path := range paths {
		s, err := os.ReadFile(filepath.Clean(path))
		if err != nil {
			return nil, fmt.Errorf("cannot read schema %q: %w", path, err)
		}
		codec, err := goavro.NewCodec(string(s))
		if err != nil {
			return nil, fmt.Errorf("cannot create avro schema %q: %w", path, err)
		}
		codecs = append(codecs, codec)
	}
	return codecs, nil
}

func (f *valueFormatting) setup(formatFilePath string) error {
var err error = nil

Expand All @@ -415,6 +477,11 @@ func (f *valueFormatting) setup(formatFilePath string) error {
return err
}

err = f.setupAvroSchemas()
if err != nil {
return err
}

err = f.validateColumns()
if err != nil {
return err
Expand Down Expand Up @@ -495,6 +562,11 @@ func (f *valueFormatting) format(
if err != nil {
return "", err
}
case avro:
formatter, err = f.avroFormatter(ctype)
if err != nil {
return "", err
}
case jsonEncoded:
formatter, err = f.jsonFormatter()
if err != nil {
Expand Down
53 changes: 51 additions & 2 deletions valueformatting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestParseValueFormatSettings(t *testing.T) {
DefaultEncoding: "HEX",
ProtocolBufferDefinitions: []string{"MyProto.proto", "MyOtherProto.proto"},
ProtocolBufferPaths: []string{"mycode/stuff", "/home/user/dev/othercode/"},
AvroSchemaPaths: map[string][]string{"namespace.Record": {"file1.avsc", "file2.avsc"}, "namespace.Banana": {"banana.avsc"}},
Columns: map[string]valueFormatColumn{
"col3": {
Encoding: "P",
Expand All @@ -48,6 +49,10 @@ func TestParseValueFormatSettings(t *testing.T) {
Encoding: "P",
Type: "hobby",
},
"col5": {
Encoding: "A",
Type: "namespace.Record",
},
},
Families: map[string]valueFormatFamily{
"family1": {
Expand Down Expand Up @@ -587,8 +592,10 @@ func TestJSONAndYAML(t *testing.T) {
func TestProtobufferAndYAML(t *testing.T) {

globalValueFormatting = newValueFormatting()
globalValueFormatting.setup(filepath.Join("testdata", "cat.yml"))

err := globalValueFormatting.setup(filepath.Join("testdata", "cat.yml"))
if err != nil {
t.Errorf("Error loading YAML:\n%v", err)
}
row := bigtable.Row{
"f1": {
bigtable.ReadItem{
Expand Down Expand Up @@ -620,6 +627,48 @@ func TestProtobufferAndYAML(t *testing.T) {
}
}

// TestAvroAndYAML loads testdata/cat.yml, prints a row whose "f1:avro" cell
// holds an Avro binary-encoded Cat record, and checks that the cell is
// rendered as Avro JSON. Either field order is accepted in the output.
func TestAvroAndYAML(t *testing.T) {

	globalValueFormatting = newValueFormatting()
	err := globalValueFormatting.setup(filepath.Join("testdata", "cat.yml"))
	if err != nil {
		// Without a loaded configuration the rest of the test cannot produce
		// a meaningful result, so stop here instead of reporting a second,
		// misleading formatting failure.
		t.Fatalf("Error loading YAML:\n%v", err)
	}

	row := bigtable.Row{
		"f1": {
			bigtable.ReadItem{
				Row:    "r1",
				Column: "f1:avro",
				// Avro binary: string "Brave" (length 5 as zig-zag 0x0a)
				// followed by int 2 (zig-zag 0x04).
				Value: []byte{0x0a, 0x42, 0x72, 0x61, 0x76, 0x65, 0x04},
			},
		},
	}
	var out bytes.Buffer

	printRow(row, &out)
	got := out.String()
	want := ("----------------------------------------\n" +
		"r1\n" +
		"  f1:avro\n" +
		"    {\"name\":\"Brave\",\"age\":2}")

	wantAlternative := ("----------------------------------------\n" +
		"r1\n" +
		"  f1:avro\n" +
		"    {\"age\":2,\"name\":\"Brave\"}")

	// Timestamps vary from run to run, so strip them before comparing.
	timestampsRE := regexp.MustCompile("[ ]+@ [^ \t\n]+")
	got = timestampsRE.ReplaceAllString(got, "")

	if !(strings.Contains(got, want) || strings.Contains(got, wantAlternative)) {
		t.Errorf("Formatting printed incorrectly: wanted\n%s\n,\ngot\n%s", want, got)
	}
}
func TestPrintRow(t *testing.T) {
row := bigtable.Row{
"f1": {
Expand Down