11package co .clflushopt .glint ;
22
33import java .io .FileNotFoundException ;
4+ import java .util .Arrays ;
45import java .util .Iterator ;
6+ import java .util .List ;
7+ import java .util .Optional ;
58
69import org .apache .arrow .vector .types .pojo .ArrowType ;
710
11+ import co .clflushopt .glint .core .CsvReaderOptions ;
812import co .clflushopt .glint .core .ExecutionContext ;
913import co .clflushopt .glint .dataframe .DataFrame ;
1014import co .clflushopt .glint .query .logical .expr .AggregateExpr ;
1317import co .clflushopt .glint .query .logical .expr .LogicalExpr ;
1418import co .clflushopt .glint .query .logical .plan .LogicalPlan ;
1519import co .clflushopt .glint .query .optimizer .QueryOptimizer ;
20+ import co .clflushopt .glint .types .ArrowTypes ;
21+ import co .clflushopt .glint .types .Field ;
1622import co .clflushopt .glint .types .RecordBatch ;
23+ import co .clflushopt .glint .types .Schema ;
1724
1825/**
1926 * Hello world!
@@ -35,9 +42,32 @@ public static void nycTripsBenchmark(String[] args) throws FileNotFoundException
3542
3643 long startTime = System .currentTimeMillis ();
3744 try {
38-
45+ // Define the schema for NYC Taxi dataset
46+ Schema schema = new Schema (Arrays .asList (new Field ("VendorID" , ArrowTypes .Int32Type ),
47+ new Field ("tpep_pickup_datetime" , ArrowTypes .StringType ), // Could be Timestamp
48+ new Field ("tpep_dropoff_datetime" , ArrowTypes .StringType ), // Could be Timestamp
49+ new Field ("passenger_count" , ArrowTypes .Int32Type ),
50+ new Field ("trip_distance" , ArrowTypes .DoubleType ),
51+ new Field ("pickup_longitude" , ArrowTypes .DoubleType ),
52+ new Field ("pickup_latitude" , ArrowTypes .DoubleType ),
53+ new Field ("RatecodeID" , ArrowTypes .Int32Type ),
54+ new Field ("store_and_fwd_flag" , ArrowTypes .StringType ),
55+ new Field ("dropoff_longitude" , ArrowTypes .DoubleType ),
56+ new Field ("dropoff_latitude" , ArrowTypes .DoubleType ),
57+ new Field ("payment_type" , ArrowTypes .Int32Type ),
58+ new Field ("fare_amount" , ArrowTypes .DoubleType ),
59+ new Field ("extra" , ArrowTypes .DoubleType ),
60+ new Field ("mta_tax" , ArrowTypes .DoubleType ),
61+ new Field ("tip_amount" , ArrowTypes .DoubleType ),
62+ new Field ("tolls_amount" , ArrowTypes .DoubleType ),
63+ new Field ("improvement_surcharge" , ArrowTypes .DoubleType ),
64+ new Field ("total_amount" , ArrowTypes .DoubleType )));
3965 // Create DataFrame and apply transformations
40- DataFrame df = ctx .readParquet ("./datasets/yellow_tripdata_2019-01.parquet" , null );
66+ DataFrame df = ctx
67+ .readCsv ("./datasets/yellow_tripdata_example.csv" , Optional .of (schema ),
68+ CsvReaderOptions .builder ().delimiter (',' ).hasHeader (true ).build ())
69+ .aggregate (List .of (col ("passenger_count" )),
70+ List .of (max (cast (col ("fare_amount" ), ArrowTypes .FloatType ))));
4171
4272 System .out .println ("Logical Plan:\t " + LogicalPlan .format (df .getLogicalPlan ()));
4373 System .out .println ("Schema:\t " + df .getSchema ());
0 commit comments