author Sean Owen <sowen@cloudera.com> 2015-02-16 19:32:31 +0000
committer Sean Owen <sowen@cloudera.com> 2015-02-16 19:32:31 +0000
commit 8e25373ce72061d3b6a353259ec627606afa4a5f (patch)
tree 11ccef85d99cd31fb3bfd5d526eef0ce2629c3cd /streaming
parent 9baac56ccd57d3890a9b6439d4e13bbe9381822b (diff)
SPARK-5795 [STREAMING] api.java.JavaPairDStream.saveAsNewAPIHadoopFiles may not be friendly to Java
Revise the JavaPairDStream API declarations for the saveAs Hadoop methods, so that they can be called directly from Java as intended. CC tdas for review

Author: Sean Owen <sowen@cloudera.com>

Closes #4608 from srowen/SPARK-5795 and squashes the following commits:

36f1ead [Sean Owen] Add code that shows compile problem and fix
036bd27 [Sean Owen] Revise JavaPairDStream API declaration on saveAs Hadoop methods, to allow it to be called directly as intended.
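A minimal Java sketch of the call these revised declarations permit, for illustration; the SaveAsFilesSketch class and its save method are assumptions, and only the saveAsNewAPIHadoopFiles invocation mirrors the test added in this patch:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class SaveAsFilesSketch {
  static void save(JavaPairDStream<LongWritable, Text> pairs) {
    // Before this patch, the parameter type Class<? extends OutputFormat<?, ?>>
    // rejected the raw class literal SequenceFileOutputFormat.class, so this
    // direct call did not compile. With the bounded type parameter F moved onto
    // the method, it compiles directly (possibly with an unchecked warning).
    pairs.saveAsNewAPIHadoopFiles(
        "prefix", "suffix",
        LongWritable.class, Text.class,
        SequenceFileOutputFormat.class);
  }
}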
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 20
-rw-r--r-- streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 18
2 files changed, 28 insertions, 10 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index de124cf40e..bd01789b61 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -726,7 +726,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
+ def saveAsHadoopFiles(prefix: String, suffix: String) {
dstream.saveAsHadoopFiles(prefix, suffix)
}
@@ -734,12 +734,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsHadoopFiles(
+ def saveAsHadoopFiles[F <: OutputFormat[_, _]](
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
- outputFormatClass: Class[_ <: OutputFormat[_, _]]) {
+ outputFormatClass: Class[F]) {
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
@@ -747,12 +747,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsHadoopFiles(
+ def saveAsHadoopFiles[F <: OutputFormat[_, _]](
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
- outputFormatClass: Class[_ <: OutputFormat[_, _]],
+ outputFormatClass: Class[F],
conf: JobConf) {
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
@@ -761,7 +761,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
+ def saveAsNewAPIHadoopFiles(prefix: String, suffix: String) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
}
@@ -769,12 +769,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsNewAPIHadoopFiles(
+ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]](
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
- outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+ outputFormatClass: Class[F]) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
@@ -782,12 +782,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsNewAPIHadoopFiles(
+ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]](
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
- outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+ outputFormatClass: Class[F],
conf: Configuration = new Configuration) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 2df8cf6a8a..57302ff407 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1828,4 +1828,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
return expected;
}
+
+ // SPARK-5795: no logic assertions, just testing that intended API invocations compile
+ private void compileSaveAsJavaAPI(JavaPairDStream<LongWritable,Text> pds) {
+ pds.saveAsNewAPIHadoopFiles(
+ "", "", LongWritable.class, Text.class,
+ org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+ pds.saveAsHadoopFiles(
+ "", "", LongWritable.class, Text.class,
+ org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+ // Checks that a previous common workaround for this API still compiles
+ pds.saveAsNewAPIHadoopFiles(
+ "", "", LongWritable.class, Text.class,
+ (Class) org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+ pds.saveAsHadoopFiles(
+ "", "", LongWritable.class, Text.class,
+ (Class) org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+ }
+
}
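For background on why the pre-patch declarations were unfriendly to Java: SequenceFileOutputFormat.class has type Class<SequenceFileOutputFormat> with a raw type argument, and a raw type does not satisfy the bounded wildcard in Class<? extends OutputFormat<?, ?>>, so the old overloads rejected the class literal outright. Callers had to erase the bound with a raw cast, which is the workaround the test above keeps compiling. A minimal sketch of that workaround against the old-API method (the RawCastWorkaroundSketch name is an assumption):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class RawCastWorkaroundSketch {
  @SuppressWarnings({"unchecked", "rawtypes"})
  static void save(JavaPairDStream<LongWritable, Text> pairs) {
    // The raw (Class) cast discards the generic bound that the old
    // Class<? extends OutputFormat<?, ?>> parameter could not match.
    pairs.saveAsHadoopFiles(
        "prefix", "suffix",
        LongWritable.class, Text.class,
        (Class) org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
  }
}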