-
Notifications
You must be signed in to change notification settings - Fork 120
fix(parquet): return error instead of panicking on first-write failure #824
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,11 +65,38 @@ func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption { | |
| } | ||
| } | ||
|
|
||
| // NewParquetWriter returns a Writer that writes to the provided WriteSeeker with the given schema. | ||
| // NewParquetWriter returns a Writer that writes to the provided io.Writer with the given schema. | ||
| // | ||
| // If props is nil, then the default Writer Properties will be used. If the key value metadata is not nil, | ||
| // it will be added to the file. | ||
| // | ||
| // This constructor panics with the literal string "failed to write magic | ||
| // number" if the initial write of the parquet magic header to the | ||
| // underlying sink fails. The behavior is preserved for backward | ||
| // compatibility with callers that string-match the panic value in a | ||
| // recover() block; new code should prefer [NewParquetWriterWithError], | ||
| // which returns the failure as an error instead. | ||
| func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *Writer { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we deprecate this as well?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My preference would be to not deprecate this unless we provide a similar short-named function, The new |
||
| fw, err := NewParquetWriterWithError(w, sc, opts...) | ||
| if err != nil { | ||
| // Preserve the historical panic value verbatim so any consumer | ||
| // performing a string-match in a recover() block continues to work. | ||
| panic("failed to write magic number") | ||
| } | ||
| return fw | ||
| } | ||
|
|
||
| // NewParquetWriterWithError returns a Writer that writes to the provided | ||
| // io.Writer with the given schema. | ||
| // | ||
| // If props is nil, then the default Writer Properties will be used. If the | ||
| // key value metadata is not nil, it will be added to the file. | ||
| // | ||
| // An error is returned if the initial write of the parquet magic header to | ||
| // the underlying sink fails or short-writes, which can happen when the | ||
| // sink is a flaky or network-attached writer (for example, a cloud-storage | ||
| // upload writer). | ||
| func NewParquetWriterWithError(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) (*Writer, error) { | ||
| config := &writerConfig{} | ||
| for _, o := range opts { | ||
| o(config) | ||
|
|
@@ -87,8 +114,10 @@ func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *W | |
| } | ||
|
|
||
| fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, config.keyValueMetadata) | ||
| fw.startFile() | ||
| return fw | ||
| if err := fw.startFile(); err != nil { | ||
| return nil, err | ||
| } | ||
| return fw, nil | ||
| } | ||
|
|
||
| // NumColumns returns the number of columns to write as defined by the schema. | ||
|
|
@@ -167,7 +196,7 @@ func (fw *Writer) appendRowGroup(buffered bool) *rowGroupWriter { | |
| return fw.rowGroupWriter | ||
| } | ||
|
|
||
| func (fw *Writer) startFile() { | ||
| func (fw *Writer) startFile() error { | ||
| encryptionProps := fw.props.FileEncryptionProperties() | ||
| magic := magicBytes | ||
| if encryptionProps != nil { | ||
|
|
@@ -199,8 +228,11 @@ func (fw *Writer) startFile() { | |
| } | ||
|
|
||
| n, err := fw.sink.Write(magic) | ||
| if n != 4 || err != nil { | ||
| panic("failed to write magic number") | ||
| if err != nil { | ||
| return fmt.Errorf("parquet: failed to write magic number: %w", err) | ||
| } | ||
| if n != len(magic) { | ||
| return fmt.Errorf("parquet: short write of magic number: wrote %d of %d bytes", n, len(magic)) | ||
| } | ||
|
|
||
| if fw.props.PageIndexEnabled() { | ||
|
|
@@ -209,6 +241,7 @@ func (fw *Writer) startFile() { | |
| Encryptor: fw.fileEncryptor, | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (fw *Writer) writePageIndex() { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.