README.md: 2 changes (1 addition, 1 deletion)
@@ -97,7 +97,7 @@ Prerequisites:

Build as described above. Optionally choose Scala and Spark versions appropriate for your distribution via `scala.version` and `spark.version`. The defaults are equivalent to:

-    $ mvn -Dscala.version=2.10 -Dspark.version=1.5.0
+    $ mvn -Dscala.version=2.11 -Dspark.version=2.4.0

Run a netcat instance generating data (the examples below presume this host is named _netcat.running.host.example.com_):

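For a concrete end-to-end run, a hedged sketch (the `package` goal, the port, and the `nc` flags are illustrative assumptions, not taken from this repository):

    $ mvn package -Dscala.version=2.11 -Dspark.version=2.4.0
    $ nc -lk 9999    # on netcat.running.host.example.com; type lines for the streaming job to count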
hbase-1/pom.xml: 4 changes (2 additions, 2 deletions)
@@ -11,8 +11,8 @@
<packaging>jar</packaging>
<name>Downstream HBase 1.y API</name>
<properties>
-    <scala.version>2.10</scala.version>
-    <spark.version>1.5.0</spark.version>
+    <scala.version>2.11</scala.version>
+    <spark.version>2.4.0</spark.version>
<hbase.version>${hbase.1.version}</hbase.version>
<hadoop.version>2.7.1</hadoop.version>
</properties>
JavaNetworkWordCountStoreInHBase.java (hbase-1 module)
@@ -1,5 +1,5 @@
/*
- * Changes Copyright 2016 hbase-downstreamer contributor(s).
+ * Changes Copyright 2016,2019 hbase-downstreamer contributor(s).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,13 +33,13 @@
*
* derived from:
- * https://raw.githubusercontent.com/apache/spark/v1.5.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+ * https://raw.githubusercontent.com/apache/spark/v2.4.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
*
*/

package org.hbase.downstreamer.spark;

import scala.Tuple2;
-import com.google.common.collect.Lists;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@@ -54,11 +54,7 @@
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.deploy.SparkHadoopUtil;
import org.apache.spark.streaming.Durations;
@@ -76,6 +72,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
+import java.util.Arrays;
import java.util.Iterator;

/**
@@ -214,7 +211,7 @@ public void call(Iterator<Tuple2<String, Integer>> iterator) throws IOException,

private static final Pattern SPACE = Pattern.compile(" ");

-  public static void main(String[] args) {
+  public static void main(String[] args) throws InterruptedException {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCountStoreInHBase <hostname> <port>");
System.exit(1);
@@ -233,34 +230,15 @@ public static void main(String[] args) {
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s,1))
+        .reduceByKey((i1, i2) -> i1 + i2);
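    // Editor's note (hedged): Spark 2.x changed FlatMapFunction.call to return
    // an Iterator rather than an Iterable, which is why the lambda ends in
    // .iterator(); the removed Spark 1.x anonymous class returned
    // Lists.newArrayList(SPACE.split(x)) directly.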

final StoreCountsToHBase store = new StoreCountsToHBase(sparkConf);

-    wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
-      @Override
-      public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
-        store.setTime(time);
-        rdd.foreachPartition(store);
-        return null;
-      }
+    wordCounts.foreachRDD((rdd, time) -> {
+      store.setTime(time);
+      rdd.foreachPartition(store);
});

ssc.start();
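For readers tracking the API shift, a minimal sketch of what the new lambdas desugar to under Spark 2.4; the fragment reuses `lines`, `wordCounts`, `store`, and `ssc` from the diff above, and the `ssc.awaitTermination()` tail is an assumption based on the upstream JavaNetworkWordCount example this file is derived from:

    // Spark 2.x functional interfaces behind the lambdas:
    //   FlatMapFunction<T, R>.call(T)   now returns Iterator<R> (Iterable<R> in 1.x)
    //   VoidFunction2<A, B>.call(A, B)  returns void, replacing Function2<A, B, Void>
    JavaDStream<String> words = lines.flatMap(
        (FlatMapFunction<String, String>) x -> Arrays.asList(SPACE.split(x)).iterator());
    wordCounts.foreachRDD(
        (VoidFunction2<JavaPairRDD<String, Integer>, Time>) (rdd, time) -> {
          store.setTime(time);
          rdd.foreachPartition(store);
        });
    ssc.start();
    // awaitTermination() declares InterruptedException in Spark 2.x, which is
    // why main(...) gained a throws clause; assumed tail per the upstream example:
    ssc.awaitTermination();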
hbase-2/pom.xml: 8 changes (4 additions, 4 deletions)
@@ -13,10 +13,10 @@
<properties>
<hbase.version>${hbase.2.version}</hbase.version>
<hadoop.version>2.8.5</hadoop.version>
-    <!-- These should match default from HBase 2 release -->
-    <scala.version>2.10.4</scala.version>
-    <scala.binary>2.10</scala.binary>
-    <spark.version>1.6.0</spark.version>
+    <!-- These should match default from HBase Connectors release -->
+    <scala.version>2.11.12</scala.version>
+    <scala.binary>2.11</scala.binary>
+    <spark.version>2.4.0</spark.version>
</properties>
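  <!-- Editor's note (hedged): scala.version pins the full Scala release used
       for compilation, while scala.binary supplies the suffix of Scala artifact
       ids (e.g. spark-streaming_${scala.binary}, an illustrative name);
       2.11.12 / 2.11 are the defaults Spark 2.4.0 ships with. -->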
<dependencies>
<!--START OF TEST SCOPE-->
JavaNetworkWordCountStoreInHBase.java (hbase-2 module)
@@ -1,5 +1,5 @@
/*
- * Changes Copyright 2016 hbase-downstreamer contributor(s).
+ * Changes Copyright 2016,2019 hbase-downstreamer contributor(s).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,13 +33,13 @@
*
* derived from:
- * https://raw.githubusercontent.com/apache/spark/v1.5.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+ * https://raw.githubusercontent.com/apache/spark/v2.4.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
*
*/

package org.hbase.downstreamer.spark;

import scala.Tuple2;
-import com.google.common.collect.Lists;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@@ -54,9 +54,6 @@
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.StorageLevels;
@@ -76,6 +73,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
+import java.util.Arrays;
import java.util.Iterator;

/**
@@ -214,7 +212,7 @@ public void call(Iterator<Tuple2<String, Integer>> iterator) throws IOException,

private static final Pattern SPACE = Pattern.compile(" ");

-  public static void main(String[] args) {
+  public static void main(String[] args) throws InterruptedException {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCountStoreInHBase <hostname> <port>");
System.exit(1);
@@ -233,34 +231,15 @@ public static void main(String[] args) {
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s,1))
+        .reduceByKey((i1, i2) -> i1 + i2);
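    // Editor's note (hedged): the reduce lambda implements
    // Function2<Integer, Integer, Integer>; an equivalent method reference
    // would be .reduceByKey(Integer::sum).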

final StoreCountsToHBase store = new StoreCountsToHBase(sparkConf);

-    wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
-      @Override
-      public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
-        store.setTime(time);
-        rdd.foreachPartition(store);
-        return null;
-      }
+    wordCounts.foreachRDD((rdd, time) -> {
+      store.setTime(time);
+      rdd.foreachPartition(store);
});

ssc.start();
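Since `rdd.foreachPartition(store)` takes a `VoidFunction<Iterator<T>>`, and the hunk header above shows `call(Iterator<Tuple2<String, Integer>> iterator)`, a hedged sketch of the class shape this implies (the constructor and HBase write body are not in this diff and are assumed):

    // Hedged sketch, not the repository's actual code: the shape
    // StoreCountsToHBase presumably has, inferred from its uses in the diff.
    static class StoreCountsToHBase
        implements VoidFunction<Iterator<Tuple2<String, Integer>>> {
      private Time time;                     // batch time, set from foreachRDD

      StoreCountsToHBase(SparkConf conf) { } // real body elided in this diff

      void setTime(Time time) { this.time = time; }

      @Override
      public void call(Iterator<Tuple2<String, Integer>> iterator) throws Exception {
        // The real class writes each (word, count) to HBase here (elided above).
      }
    }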