Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions parquet/file/file_writer.go
Comment thread
zeroshade marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,38 @@ func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption {
}
}

// NewParquetWriter returns a Writer that writes to the provided WriteSeeker with the given schema.
// NewParquetWriter returns a Writer that writes to the provided io.Writer with the given schema.
//
// If props is nil, then the default Writer Properties will be used. If the key value metadata is not nil,
// it will be added to the file.
//
// This constructor panics with the literal string "failed to write magic
// number" if the initial write of the parquet magic header to the
// underlying sink fails. The behavior is preserved for backward
// compatibility with callers that string-match the panic value in a
// recover() block; new code should prefer [NewParquetWriterWithError],
// which returns the failure as an error instead.
func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *Writer {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we deprecate this as well?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference would be to not deprecate this unless we provide a similarly short-named function. The new NewParquetWriterWithError function is a bit long to be the canonical function to use. Given that the only difference is an error return instead of a panic, we could either rename the new function or keep both.

fw, err := NewParquetWriterWithError(w, sc, opts...)
if err != nil {
// Preserve the historical panic value verbatim so any consumer
// performing a string-match in a recover() block continues to work.
panic("failed to write magic number")
}
return fw
}

// NewParquetWriterWithError returns a Writer that writes to the provided
// io.Writer with the given schema.
//
// If props is nil, then the default Writer Properties will be used. If the
// key value metadata is not nil, it will be added to the file.
//
// An error is returned if the initial write of the parquet magic header to
// the underlying sink fails or short-writes, which can happen when the
// sink is a flaky or network-attached writer (for example, a cloud-storage
// upload writer).
func NewParquetWriterWithError(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) (*Writer, error) {
config := &writerConfig{}
for _, o := range opts {
o(config)
Expand All @@ -87,8 +114,10 @@ func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *W
}

fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, config.keyValueMetadata)
fw.startFile()
return fw
if err := fw.startFile(); err != nil {
return nil, err
}
return fw, nil
}

// NumColumns returns the number of columns to write as defined by the schema.
Expand Down Expand Up @@ -167,7 +196,7 @@ func (fw *Writer) appendRowGroup(buffered bool) *rowGroupWriter {
return fw.rowGroupWriter
}

func (fw *Writer) startFile() {
func (fw *Writer) startFile() error {
encryptionProps := fw.props.FileEncryptionProperties()
magic := magicBytes
if encryptionProps != nil {
Expand Down Expand Up @@ -199,8 +228,11 @@ func (fw *Writer) startFile() {
}

n, err := fw.sink.Write(magic)
if n != 4 || err != nil {
panic("failed to write magic number")
if err != nil {
return fmt.Errorf("parquet: failed to write magic number: %w", err)
}
if n != len(magic) {
return fmt.Errorf("parquet: short write of magic number: wrote %d of %d bytes", n, len(magic))
}

if fw.props.PageIndexEnabled() {
Expand All @@ -209,6 +241,7 @@ func (fw *Writer) startFile() {
Encryptor: fw.fileEncryptor,
}
}
return nil
}

func (fw *Writer) writePageIndex() {
Expand Down
98 changes: 98 additions & 0 deletions parquet/file/file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package file_test

import (
"bytes"
"errors"
"fmt"
"io"
"math"
"reflect"
"slices"
Expand Down Expand Up @@ -1324,3 +1326,99 @@ func TestBufferedStreamDictionaryCompressed(t *testing.T) {
assert.Equal(t, int32(i), readValues[i])
}
}

// flakyMagicSink is an io.Writer test double whose first Write call can be
// made to misbehave. It models the transient first-write I/O failure
// reported in apache/arrow-go#820 for cloud-storage upload writers.
//
// Set firstErr to make the first Write return (0, firstErr). Otherwise, set
// short to true to make the first Write of a non-empty slice report
// len(p)-1 bytes with a nil error, without storing anything. Every other
// call is forwarded to buf. firstErr takes precedence over short.
type flakyMagicSink struct {
	buf      bytes.Buffer // receives every write that is not intercepted
	writes   int          // number of Write calls seen so far
	firstErr error        // if non-nil, returned by the first Write
	short    bool         // if true, the first Write under-reports by one byte
}

// Write implements io.Writer. Only the first call can be intercepted (see
// the type documentation); all later calls append p to buf and return
// whatever bytes.Buffer.Write returns.
func (f *flakyMagicSink) Write(p []byte) (int, error) {
	f.writes++
	if f.writes == 1 {
		switch {
		case f.firstErr != nil:
			return 0, f.firstErr
		case f.short && len(p) > 0:
			// The len(p) > 0 guard keeps the reported count within
			// [0, len(p)]: without it an empty first write would return
			// n == -1, which no io.Writer may do.
			return len(p) - 1, nil
		}
	}
	return f.buf.Write(p)
}

// newSingleColumnSchema builds a group node named "schema" that contains a
// single Int32 node named "col"; both are declared with
// parquet.Repetitions.Required. The calling test fails immediately if the
// group node cannot be constructed.
func newSingleColumnSchema(t *testing.T) *schema.GroupNode {
	t.Helper()

	col := schema.NewInt32Node("col", parquet.Repetitions.Required, 1)
	root, err := schema.NewGroupNode(
		"schema",
		parquet.Repetitions.Required,
		schema.FieldList{col},
		0,
	)
	require.NoError(t, err)
	return root
}

// TestNewParquetWriterWithError_Success checks the happy path: constructing
// a writer over an in-memory buffer returns a non-nil writer and no error,
// and closing that writer also succeeds.
func TestNewParquetWriterWithError_Success(t *testing.T) {
	sink := new(bytes.Buffer)

	pw, err := file.NewParquetWriterWithError(sink, newSingleColumnSchema(t))
	require.NoError(t, err)
	require.NotNil(t, pw)

	closeErr := pw.Close()
	require.NoError(t, closeErr)
}

// TestNewParquetWriterWithError_FirstWriteFails checks that when the sink
// rejects the very first Write, the constructor returns a nil writer and an
// error that wraps the sink's error.
func TestNewParquetWriterWithError_FirstWriteFails(t *testing.T) {
	failing := &flakyMagicSink{firstErr: io.ErrUnexpectedEOF}

	pw, err := file.NewParquetWriterWithError(failing, newSingleColumnSchema(t))
	require.Error(t, err)
	require.Nil(t, pw)

	wrapsCause := errors.Is(err, io.ErrUnexpectedEOF)
	require.True(t, wrapsCause,
		"expected returned error to wrap io.ErrUnexpectedEOF, got %v", err)
}

// TestNewParquetWriterWithError_FirstWriteShortWrites checks that a sink
// which under-reports the first Write (with a nil error) makes the
// constructor return a nil writer and an error mentioning the short write.
func TestNewParquetWriterWithError_FirstWriteShortWrites(t *testing.T) {
	truncating := &flakyMagicSink{short: true}

	pw, err := file.NewParquetWriterWithError(truncating, newSingleColumnSchema(t))
	require.Error(t, err)
	require.Nil(t, pw)

	errText := err.Error()
	require.Contains(t, errText, "short write of magic number")
}

// TestNewParquetWriter_PreservesPanicMessage pins the back-compat contract
// of NewParquetWriter: a failing first write must surface as a panic whose
// value is exactly the string "failed to write magic number".
func TestNewParquetWriter_PreservesPanicMessage(t *testing.T) {
	failing := &flakyMagicSink{firstErr: io.ErrUnexpectedEOF}

	// verifyPanic is the deferred function itself, so its recover() call
	// observes a panic raised by the constructor below.
	verifyPanic := func() {
		recovered := recover()
		require.NotNil(t, recovered, "NewParquetWriter should panic on first-write failure")

		text, isString := recovered.(string)
		require.True(t, isString, "panic value should remain a string for back-compat (got %T)", recovered)
		require.Equal(t, "failed to write magic number", text)
	}
	defer verifyPanic()

	_ = file.NewParquetWriter(failing, newSingleColumnSchema(t))

	// Reaching this line means the constructor returned instead of panicking.
	t.Fatalf("expected NewParquetWriter to panic, but it returned normally")
}

// TestPqarrowNewFileWriter_PropagatesInitError checks that
// pqarrow.NewFileWriter returns a nil writer and an error wrapping the
// sink's error when the sink fails its first Write.
func TestPqarrowNewFileWriter_PropagatesInitError(t *testing.T) {
	fields := []arrow.Field{{Name: "f", Type: arrow.PrimitiveTypes.Int32}}
	sc := arrow.NewSchema(fields, nil)
	failing := &flakyMagicSink{firstErr: io.ErrUnexpectedEOF}

	parquetProps := parquet.NewWriterProperties()
	arrowProps := pqarrow.DefaultWriterProps()
	fw, err := pqarrow.NewFileWriter(sc, failing, parquetProps, arrowProps)
	require.Error(t, err)
	require.Nil(t, fw)

	wrapsCause := errors.Is(err, io.ErrUnexpectedEOF)
	require.True(t, wrapsCause,
		"expected returned error to wrap io.ErrUnexpectedEOF, got %v", err)
}

// TestPqarrowNewFileWriter_PropagatesShortWrite checks that
// pqarrow.NewFileWriter returns a nil writer and an error mentioning the
// short write when the sink under-reports its first Write.
func TestPqarrowNewFileWriter_PropagatesShortWrite(t *testing.T) {
	fields := []arrow.Field{{Name: "f", Type: arrow.PrimitiveTypes.Int32}}
	sc := arrow.NewSchema(fields, nil)
	truncating := &flakyMagicSink{short: true}

	parquetProps := parquet.NewWriterProperties()
	arrowProps := pqarrow.DefaultWriterProps()
	fw, err := pqarrow.NewFileWriter(sc, truncating, parquetProps, arrowProps)
	require.Error(t, err)
	require.Nil(t, fw)

	errText := err.Error()
	require.Contains(t, errText, "short write of magic number")
}
5 changes: 4 additions & 1 deletion parquet/pqarrow/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterPr
}

schemaNode := pqschema.Root()
baseWriter := file.NewParquetWriter(w, schemaNode, file.WithWriterProps(props), file.WithWriteMetadata(meta))
baseWriter, err := file.NewParquetWriterWithError(w, schemaNode, file.WithWriterProps(props), file.WithWriteMetadata(meta))
if err != nil {
return nil, err
}

manifest, err := NewSchemaManifest(pqschema, nil, &ArrowReadProperties{})
if err != nil {
Expand Down