@@ -59,22 +59,149 @@ Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions&
5959 return std::make_unique<AvroInputStream>(file, buffer_size);
6060}
6161
62- } // namespace
62+ // Abstract base class for Avro read backends.
63+ class AvroReadBackend {
64+ public:
65+ virtual ~AvroReadBackend () = default ;
66+ virtual Result<::avro::ValidSchema> Init (
67+ std::unique_ptr<AvroInputStream> input_stream) = 0;
68+ virtual Status InitWithSchema (const ::avro::ValidSchema& file_schema,
69+ const std::optional<Split>& split) = 0;
70+ virtual void InitReadContext (const ::avro::ValidSchema& reader_schema) = 0;
71+ virtual bool HasMore () = 0;
72+ virtual Status DecodeNext (const SchemaProjection& projection, const Schema& read_schema,
73+ ::arrow::ArrayBuilder* builder) = 0;
74+ virtual bool IsPastSync (int64_t split_end) const = 0;
75+ virtual const ::avro::Metadata& GetMetadata () const = 0;
76+ virtual const ::avro::ValidSchema& GetReaderSchema () const = 0;
77+ virtual void Close () = 0;
78+ virtual bool Closed () const = 0;
79+ };
80+
81+ // Backend implementation using direct Avro decoder.
82+ class DirectDecoderBackend : public AvroReadBackend {
83+ public:
84+ Result<::avro::ValidSchema> Init (
85+ std::unique_ptr<AvroInputStream> input_stream) override {
86+ reader_ = std::make_unique<::avro::DataFileReaderBase>(std::move (input_stream));
87+ return reader_->dataSchema ();
88+ }
89+
90+ Status InitWithSchema (const ::avro::ValidSchema& file_schema,
91+ const std::optional<Split>& split) override {
92+ reader_->init (file_schema);
93+ if (split) {
94+ reader_->sync (split->offset );
95+ }
96+ return {};
97+ }
98+
99+ void InitReadContext (const ::avro::ValidSchema&) override {}
100+
101+ bool HasMore () override { return reader_->hasMore (); }
102+
103+ Status DecodeNext (const SchemaProjection& projection, const Schema& read_schema,
104+ ::arrow::ArrayBuilder* builder) override {
105+ reader_->decr ();
106+ return DecodeAvroToBuilder (GetReaderSchema ().root (), reader_->decoder (), projection,
107+ read_schema, builder, decode_context_);
108+ }
109+
110+ bool IsPastSync (int64_t split_end) const override {
111+ return reader_->pastSync (split_end);
112+ }
113+
114+ const ::avro::Metadata& GetMetadata () const override { return reader_->metadata (); }
115+
116+ const ::avro::ValidSchema& GetReaderSchema () const override {
117+ return reader_->readerSchema ();
118+ }
119+
120+ void Close () override {
121+ if (reader_) {
122+ reader_->close ();
123+ reader_.reset ();
124+ }
125+ }
126+
127+ bool Closed () const override { return reader_ == nullptr ; }
128+
129+ private:
130+ std::unique_ptr<::avro::DataFileReaderBase> reader_;
131+ // Decode context for reusing scratch buffers
132+ DecodeContext decode_context_;
133+ };
134+
135+ // Backend implementation using avro::GenericDatum.
136+ class GenericDatumBackend : public AvroReadBackend {
137+ public:
138+ Result<::avro::ValidSchema> Init (
139+ std::unique_ptr<AvroInputStream> input_stream) override {
140+ reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
141+ std::move (input_stream));
142+ return reader_->dataSchema ();
143+ }
144+
145+ Status InitWithSchema (const ::avro::ValidSchema& /* file_schema*/ ,
146+ const std::optional<Split>& split) override {
147+ if (split) {
148+ reader_->sync (split->offset );
149+ }
150+ return {};
151+ }
152+
153+ void InitReadContext (const ::avro::ValidSchema& reader_schema) override {
154+ datum_ = std::make_unique<::avro::GenericDatum>(reader_schema);
155+ }
156+
157+ bool HasMore () override {
158+ has_more_ = reader_->read (*datum_);
159+ return has_more_;
160+ }
161+
162+ Status DecodeNext (const SchemaProjection& projection, const Schema& read_schema,
163+ ::arrow::ArrayBuilder* builder) override {
164+ return AppendDatumToBuilder (GetReaderSchema ().root (), *datum_, projection,
165+ read_schema, builder);
166+ }
167+
168+ bool IsPastSync (int64_t split_end) const override {
169+ return reader_->pastSync (split_end);
170+ }
171+
172+ const ::avro::Metadata& GetMetadata () const override { return reader_->metadata (); }
173+
174+ const ::avro::ValidSchema& GetReaderSchema () const override {
175+ return reader_->readerSchema ();
176+ }
177+
178+ void Close () override {
179+ if (reader_) {
180+ reader_->close ();
181+ reader_.reset ();
182+ }
183+ }
184+
185+ bool Closed () const override { return reader_ == nullptr ; }
186+
187+ private:
188+ std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
189+ // Reusable GenericDatum for reading records
190+ std::unique_ptr<::avro::GenericDatum> datum_;
191+ // Cached result from HasMore()
192+ bool has_more_ = false ;
193+ };
63194
64195// A stateful context to keep track of the reading progress.
65196struct ReadContext {
66197 // The arrow schema to build the record batch.
67198 std::shared_ptr<::arrow::Schema> arrow_schema_;
68199 // The builder to build the record batch.
69200 std::shared_ptr<::arrow::ArrayBuilder> builder_;
70- // GenericDatum for GenericDatum-based decoding (only used if direct decoder is
71- // disabled)
72- std::unique_ptr<::avro::GenericDatum> datum_;
73- // Decode context for reusing scratch buffers (only used if direct decoder is
74- // enabled)
75- DecodeContext decode_context_;
76201};
77202
203+ } // namespace
204+
78205// TODO(gang.wu): collect basic reader metrics
79206class AvroReader ::Impl {
80207 public:
@@ -85,7 +212,6 @@ class AvroReader::Impl {
85212 }
86213
87214 batch_size_ = options.properties ->Get (ReaderProperties::kBatchSize );
88- use_direct_decoder_ = options.properties ->Get (ReaderProperties::kAvroSkipDatum );
89215 read_schema_ = options.projection ;
90216
91217 // Open the input stream and adapt to the avro interface.
@@ -94,22 +220,15 @@ class AvroReader::Impl {
94220 CreateInputStream (options,
95221 options.properties ->Get (ReaderProperties::kAvroBufferSize )));
96222
97- ::avro::ValidSchema file_schema;
98-
99- if (use_direct_decoder_) {
100- // Create base reader for direct decoder access
101- auto base_reader =
102- std::make_unique<::avro::DataFileReaderBase>(std::move (input_stream));
103- file_schema = base_reader->dataSchema ();
104- base_reader_ = std::move (base_reader);
223+ // Create the appropriate backend based on configuration
224+ if (options.properties ->Get (ReaderProperties::kAvroSkipDatum )) {
225+ backend_ = std::make_unique<DirectDecoderBackend>();
105226 } else {
106- // Create DataFileReader<GenericDatum> for GenericDatum-based decoding
107- auto datum_reader = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
108- std::move (input_stream));
109- file_schema = datum_reader->dataSchema ();
110- datum_reader_ = std::move (datum_reader);
227+ backend_ = std::make_unique<GenericDatumBackend>();
111228 }
112229
230+ ICEBERG_ASSIGN_OR_RAISE (auto file_schema, backend_->Init (std::move (input_stream)));
231+
113232 // Validate field ids in the file schema.
114233 HasIdVisitor has_id_visitor;
115234 ICEBERG_RETURN_UNEXPECTED (has_id_visitor.Visit (file_schema));
@@ -132,23 +251,13 @@ class AvroReader::Impl {
132251 }
133252
134253 // Project the read schema on top of the file schema.
135- // TODO(gangwu): support pruning source fields
136254 ICEBERG_ASSIGN_OR_RAISE (projection_, Project (*read_schema_, file_schema.root (),
137255 /* prune_source=*/ false ));
138256
139- if (use_direct_decoder_) {
140- // Initialize the base reader with the file schema
141- base_reader_->init (file_schema);
142- if (options.split ) {
143- base_reader_->sync (options.split ->offset );
144- split_end_ = options.split ->offset + options.split ->length ;
145- }
146- } else {
147- // The datum reader is already initialized during construction
148- if (options.split ) {
149- datum_reader_->sync (options.split ->offset );
150- split_end_ = options.split ->offset + options.split ->length ;
151- }
257+ ICEBERG_RETURN_UNEXPECTED (backend_->InitWithSchema (file_schema, options.split ));
258+
259+ if (options.split ) {
260+ split_end_ = options.split ->offset + options.split ->length ;
152261 }
153262
154263 return {};
@@ -163,34 +272,18 @@ class AvroReader::Impl {
163272 if (IsPastSync ()) {
164273 break ;
165274 }
166-
167- if (use_direct_decoder_) {
168- // Direct decoder: decode Avro to Arrow without GenericDatum
169- if (!base_reader_->hasMore ()) {
170- break ;
171- }
172- base_reader_->decr ();
173-
174- ICEBERG_RETURN_UNEXPECTED (DecodeAvroToBuilder (
175- GetReaderSchema ().root (), base_reader_->decoder (), projection_, *read_schema_,
176- context_->builder_ .get (), context_->decode_context_ ));
177- } else {
178- // GenericDatum-based decoding: decode via GenericDatum intermediate
179- if (!datum_reader_->read (*context_->datum_ )) {
180- break ;
181- }
182-
183- ICEBERG_RETURN_UNEXPECTED (
184- AppendDatumToBuilder (GetReaderSchema ().root (), *context_->datum_ , projection_,
185- *read_schema_, context_->builder_ .get ()));
275+ if (!backend_->HasMore ()) {
276+ break ;
186277 }
278+ ICEBERG_RETURN_UNEXPECTED (
279+ backend_->DecodeNext (projection_, *read_schema_, context_->builder_ .get ()));
187280 }
188281
189282 return ConvertBuilderToArrowArray ();
190283 }
191284
192285 Status Close () {
193- CloseReader ();
286+ backend_-> Close ();
194287 context_.reset ();
195288 return {};
196289 }
@@ -209,12 +302,11 @@ class AvroReader::Impl {
209302 }
210303
211304 Result<std::unordered_map<std::string, std::string>> Metadata () {
212- if ((use_direct_decoder_ && !base_reader_) ||
213- (!use_direct_decoder_ && !datum_reader_)) {
305+ if (backend_->Closed ()) {
214306 return Invalid (" Reader is not opened" );
215307 }
216308
217- const auto & metadata = GetReaderMetadata ();
309+ const auto & metadata = backend_-> GetMetadata ();
218310 std::unordered_map<std::string, std::string> metadata_map;
219311 metadata_map.reserve (metadata.size ());
220312
@@ -247,11 +339,7 @@ class AvroReader::Impl {
247339 builder_result.status ().message ());
248340 }
249341 context_->builder_ = builder_result.MoveValueUnsafe ();
250-
251- // Initialize GenericDatum for GenericDatum-based decoding
252- if (!use_direct_decoder_) {
253- context_->datum_ = std::make_unique<::avro::GenericDatum>(GetReaderSchema ());
254- }
342+ backend_->InitReadContext (backend_->GetReaderSchema ());
255343
256344 return {};
257345 }
@@ -281,48 +369,20 @@ class AvroReader::Impl {
281369 if (!split_end_) {
282370 return false ;
283371 }
284- return use_direct_decoder_ ? base_reader_->pastSync (split_end_.value ())
285- : datum_reader_->pastSync (split_end_.value ());
286- }
287-
288- const ::avro::Metadata& GetReaderMetadata () const {
289- return use_direct_decoder_ ? base_reader_->metadata () : datum_reader_->metadata ();
290- }
291-
292- void CloseReader () {
293- if (use_direct_decoder_) {
294- if (base_reader_) {
295- base_reader_->close ();
296- base_reader_.reset ();
297- }
298- } else {
299- if (datum_reader_) {
300- datum_reader_->close ();
301- datum_reader_.reset ();
302- }
303- }
304- }
305-
306- const ::avro::ValidSchema& GetReaderSchema () const {
307- return use_direct_decoder_ ? base_reader_->readerSchema ()
308- : datum_reader_->readerSchema ();
372+ return backend_->IsPastSync (split_end_.value ());
309373 }
310374
311375 private:
312376 // Max number of rows in the record batch to read.
313377 int64_t batch_size_{};
314- // Whether to use direct decoder (true) or GenericDatum-based decoder (false).
315- bool use_direct_decoder_{true };
316378 // The end of the split to read and used to terminate the reading.
317379 std::optional<int64_t > split_end_;
318380 // The schema to read.
319381 std::shared_ptr<::iceberg::Schema> read_schema_;
320382 // The projection result to apply to the read schema.
321383 SchemaProjection projection_;
322- // The avro reader base - provides direct access to decoder for direct decoding.
323- std::unique_ptr<::avro::DataFileReaderBase> base_reader_;
324- // The datum reader for GenericDatum-based decoding.
325- std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_;
384+ // The read backend to read data into Arrow.
385+ std::unique_ptr<AvroReadBackend> backend_;
326386 // The context to keep track of the reading progress.
327387 std::unique_ptr<ReadContext> context_;
328388};
0 commit comments