-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathTaxiRidesExample.java
More file actions
80 lines (65 loc) · 3.31 KB
/
TaxiRidesExample.java
File metadata and controls
80 lines (65 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package streaming;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ToString;
import org.joda.time.Duration;
import util.AwsOptionsParser;
import java.util.UUID;
/**
 * Example pipeline that streams taxi ride events from a public Pub/Sub topic into Snowflake
 * through a Snowpipe.
 *
 * <p>See the main README for more information.
 */
public class TaxiRidesExample {
  /** Public Pub/Sub topic with taxi ride events; each message is parsed as a JSON object. */
  private static final String PUBSUB_TAXI_RIDES =
      "projects/pubsub-public-data/topics/taxirides-realtime";

  /**
   * Builds and launches the streaming pipeline.
   *
   * @param args pipeline options in {@code --name=value} form, parsed into {@link
   *     TaxiRidesOptions}
   */
  public static void main(String[] args) {
    TaxiRidesOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(TaxiRidesOptions.class);
    // NOTE(review): presumably normalizes AWS-related options in place — confirm in util.AwsOptionsParser.
    AwsOptionsParser.format(options);

    Pipeline p = Pipeline.create(options);
    SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
        createSnowflakeConfiguration(options);

    p.apply("Reading from PubSub", PubsubIO.readStrings().fromTopic(PUBSUB_TAXI_RIDES))
        // Identity for String elements; kept so the pipeline graph stays unchanged.
        .apply(ToString.elements())
        .apply(
            "Writing into Snowflake",
            SnowflakeIO.<String>write()
                .withStagingBucketName(options.getStagingBucketName())
                .withStorageIntegrationName(options.getStorageIntegrationName())
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withUserDataMapper(getStreamingCsvMapper())
                .withSnowPipe(options.getSnowPipe())
                // Evaluated once while the pipeline is constructed, not per element or per file.
                .withFileNameTemplate(UUID.randomUUID().toString())
                .withFlushTimeLimit(Duration.standardSeconds(3))
                .withFlushRowLimit(100)
                .withQuotationMark("")
                .withShardsNumber(1));

    p.run();
  }

  /**
   * Creates the Snowflake connection configuration from the pipeline options using key-pair
   * authentication.
   *
   * <p>Exactly one key source is used. If {@code privateKeyPath} is set, the key is read from
   * that path. Otherwise {@code rawPrivateKey} is used. Applying only one method keeps the two
   * auth settings from overriding each other and never hands SnowflakeIO an absent key source.
   *
   * @param options pipeline options carrying credentials and connection settings
   * @return the data source configuration for {@link SnowflakeIO}
   * @throws IllegalArgumentException if neither {@code privateKeyPath} nor {@code rawPrivateKey}
   *     is set
   */
  public static SnowflakeIO.DataSourceConfiguration createSnowflakeConfiguration(
      TaxiRidesOptions options) {
    SnowflakeIO.DataSourceConfiguration configuration =
        SnowflakeIO.DataSourceConfiguration.create();
    if (!isNullOrEmpty(options.getPrivateKeyPath())) {
      configuration =
          configuration.withKeyPairPathAuth(
              options.getUsername(),
              options.getPrivateKeyPath(),
              options.getPrivateKeyPassphrase());
    } else if (!isNullOrEmpty(options.getRawPrivateKey())) {
      configuration =
          configuration.withKeyPairRawAuth(
              options.getUsername(),
              options.getRawPrivateKey(),
              options.getPrivateKeyPassphrase());
    } else {
      throw new IllegalArgumentException(
          "Key-pair authentication requires either --privateKeyPath or --rawPrivateKey to be set");
    }
    return configuration
        .withDatabase(options.getDatabase())
        .withWarehouse(options.getWarehouse())
        .withServerName(options.getServerName())
        .withSchema(options.getSchema());
  }

  /**
   * Returns a mapper that turns one JSON taxi ride message into a CSV row of {@code ride_id},
   * {@code latitude} and {@code longitude}.
   *
   * <p>The coordinates are parsed as doubles and re-rendered with {@link String#valueOf(double)}.
   *
   * @return the row mapper used by the Snowflake write
   * @throws IllegalArgumentException (at mapping time) if a required field is missing or JSON null
   */
  public static SnowflakeIO.UserDataMapper<String> getStreamingCsvMapper() {
    return recordLine -> {
      // Parser is created per record instead of captured: the mapper is serialized with the
      // transform, and JsonParser is not Serializable.
      JsonObject ride = new JsonParser().parse(recordLine).getAsJsonObject();
      checkRequiredFields(ride, "ride_id", "latitude", "longitude");
      return new String[] {
        ride.get("ride_id").getAsString(),
        String.valueOf(ride.get("latitude").getAsDouble()),
        String.valueOf(ride.get("longitude").getAsDouble())
      };
    };
  }

  /** Fails with a descriptive message if any named field is absent or JSON null. */
  private static void checkRequiredFields(JsonObject ride, String... names) {
    for (String name : names) {
      if (!ride.has(name) || ride.get(name).isJsonNull()) {
        throw new IllegalArgumentException(
            "Taxi ride record is missing required field '" + name + "'");
      }
    }
  }

  /** Returns true when {@code value} is null or the empty string. */
  private static boolean isNullOrEmpty(String value) {
    return value == null || value.isEmpty();
  }
}