
Use arrow_scan instead of arrow_scan_dumb to enable predicate / filter pushdown#393

Closed
jonathanswenson wants to merge 1 commit into duckdb:main from jonathanswenson:swenson/arrow-scan-smart

Conversation

@jonathanswenson
Contributor

Fixes #392

@jonathanswenson
Contributor Author

Curious whether opinions have changed around writing / running tests that use Arrow here.

Not sure how best to test any of this without the arrow dependency.

However, building this manually and testing against it, I can see that a simple program produces the plan I expect.

Test program:
package com.acme;

import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.duckdb.DuckDBConnection;
import org.duckdb.DuckDBDriver;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;

public class DuckDBStreamIngestTest {

    private static byte[] createStream(BufferAllocator allocator) throws Exception {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        IntVector intVector = new IntVector("id", allocator);
        VarCharVector stringVector = new VarCharVector("value", allocator);

        try (
                VectorSchemaRoot vsr = new VectorSchemaRoot(List.of(intVector, stringVector));
                ArrowStreamWriter writer = new ArrowStreamWriter(vsr, null, outputStream)
        ) {
            for (int i = 0; i < 5; i++) {
                intVector.setSafe(i, i);
                stringVector.setSafe(i, ("v " + i).getBytes(StandardCharsets.UTF_8));
            }
            // Set the row count after populating the vectors so value counts are finalized.
            vsr.setRowCount(5);
            writer.writeBatch();
        }

        return outputStream.toByteArray();
    }

    public static void main(final String[] args) throws Exception {
        BufferAllocator allocator = new RootAllocator();
        byte[] bytes = createStream(allocator);

        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);

        ArrowStreamReader arrowReader = new ArrowStreamReader(inputStream, allocator);
        ArrowArrayStream arrowArrayStream = ArrowArrayStream.allocateNew(allocator);
        Data.exportArrayStream(allocator, arrowReader, arrowArrayStream);
        DuckDBDriver driver = new DuckDBDriver();
        try (Connection connection = driver.connect("jdbc:duckdb:", new Properties())) {
            DuckDBConnection conn = connection.unwrap(DuckDBConnection.class);
            conn.registerArrowStream("arrow_table", arrowArrayStream);

            try (
                    Statement statement = connection.createStatement();
                    ResultSet resultSet = statement.executeQuery("explain select id from arrow_table where id < 3");
            ) {
                resultSet.next();
                System.out.println(resultSet.getString(2));
            }

        }
    }
}

Before this change (using duckdb 1.3.2.0) I get the following output:

┌───────────────────────────┐
│         PROJECTION        │
│    ────────────────────   │
│             #0            │
│                           │
│          ~1 Rows          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│           FILTER          │
│    ────────────────────   │
│          (id < 3)         │
│                           │
│          ~1 Rows          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│      ARROW_SCAN_DUMB      │
│    ────────────────────   │
│         Function:         │
│      ARROW_SCAN_DUMB      │
│                           │
│          ~1 Rows          │
└───────────────────────────┘

After this change (with duckdb built from this branch) I get the following:

┌───────────────────────────┐
│        ARROW_SCAN         │
│    ────────────────────   │
│    Function: ARROW_SCAN   │
│      Projections: id      │
│       Filters: id<3       │
│                           │
│           ~1 row          │
└───────────────────────────┘

However, it looks like the filter is not actually being applied; I'm likely missing something here.

@jonathanswenson
Contributor Author

Closing for now while I figure out what I'm missing.

@jonathanswenson
Contributor Author

Turns out this is a lot more complicated than I thought 🤦🏻

It might still be possible, but I was definitely missing the nuance of how the Python implementation works. The implementation would likely have to mimic what it does when creating an Arrow scanner, taking the filters / projections into account: https://github.com/duckdb/duckdb-python/blob/main/src/duckdb_py/arrow/arrow_array_stream.cpp#L37-L66. It might be doable with Gandiva or the native Arrow code, but that's a bigger challenge than I anticipated (and should have).



Development

Successfully merging this pull request may close these issues.

No predicate / filter pushdown when querying arrow streams
