Conversation
AndreaBozzo
left a comment
While I'm very much in favor of the idea behind this, there are a few concerns about the implementation.
```rust
    mut state: AvroReaderState<R>,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static {
    async_stream::try_stream! {
        if state.meta.range.start >= state.meta.range.end {
```
`try_stream!` is sequential? There's no actual prefetching or concurrent work happening; did I miss something?
This implementation is entirely pull-based. The reader API exposes the stream, which can be driven by a dedicated async task (with the granularity of a record batch at a time) if beneficial to the application.
There is a comment about not being able to use the inner spawn helper because of other constraints on the trait design.
(Responding because I had an input on this change.)
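The pull-based pattern described above can be sketched as follows. This is illustrative only: it uses std threads and a bounded channel in place of an async task and stream, and `drive_batches` plus the string "batches" are hypothetical stand-ins for the real reader and `RecordBatch` stream.

```rust
use std::sync::mpsc;
use std::thread;

// Illustrative stand-in for driving a pull-based reader from a dedicated
// worker: the worker pulls one "batch" at a time and forwards it to the
// consumer over a bounded channel (at most one batch in flight).
fn drive_batches(batches: Vec<&'static str>) -> Vec<&'static str> {
    let (tx, rx) = mpsc::sync_channel(1);
    let driver = thread::spawn(move || {
        for batch in batches {
            if tx.send(batch).is_err() {
                break; // consumer went away; stop driving the source
            }
        }
    });
    // The consumer only receives decoded batches; it never drives the
    // reader itself, mirroring the pull-based stream described above.
    let received: Vec<_> = rx.iter().collect();
    driver.join().unwrap();
    received
}

fn main() {
    let out = drive_batches(vec!["batch-0", "batch-1", "batch-2"]);
    assert_eq!(out, vec!["batch-0", "batch-1", "batch-2"]);
}
```

In an async setting the same shape would use a spawned task and a bounded async channel, which is what gives the "granularity of a record batch at a time" mentioned above.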
In my end-to-end benchmark, prefetching is done by the object-store implementation, specifically in the call to `get_opts`.
@jecsand838 can you please take a look at this?

Can we also please file a ticket explaining this need / let us discuss this feature a bit? Note we have also been trying to separate the IO from the decoding in Parquet -- see https://docs.rs/parquet/58.0.0/parquet/arrow/push_decoder/struct.ParquetPushDecoder.html Perhaps we could move Avro to that model too rather than implementing the async stuff first.
Which issue does this PR close?
Rationale for this change
Implement an async streaming reader for Avro, similar to how DataFusion handles JSON and CSV scanning.
There are two main advantages:
What changes are included in this PR?
Change the Avro reader implementation to use async streams.
Are these changes tested?
Yes, added tests.
Are there any user-facing changes?
Users can now implement `get_stream` as part of the `AsyncFileReader` trait.
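A rough sketch of what such a trait method might look like. This is a simplified, synchronous analogue: the real `AsyncFileReader` trait is async and would return a `Stream` of bytes or batches, and the `FileReader`/`InMemory` names and the `Iterator<Item = u8>` return type here are purely illustrative assumptions, not the actual API.

```rust
use std::ops::Range;

// Illustrative, synchronous analogue of a stream-returning reader method.
// The real trait would be async and return a Stream rather than an Iterator.
trait FileReader {
    // Hypothetical: yield the bytes of the requested range one at a time.
    fn get_stream(&self, range: Range<usize>) -> Box<dyn Iterator<Item = u8> + '_>;
}

// A trivial in-memory implementation, useful for tests.
struct InMemory(Vec<u8>);

impl FileReader for InMemory {
    fn get_stream(&self, range: Range<usize>) -> Box<dyn Iterator<Item = u8> + '_> {
        Box::new(self.0[range].iter().copied())
    }
}

fn main() {
    let reader = InMemory(vec![1, 2, 3, 4]);
    let got: Vec<u8> = reader.get_stream(1..3).collect();
    assert_eq!(got, vec![2, 3]);
}
```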