author    Tathagata Das <tathagata.das1565@gmail.com>    2014-11-25 14:16:27 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2014-11-25 14:16:27 -0800
commit    8838ad7c135a585cde015dc38b5cb23314502dd9 (patch)
tree      f70a5e123e11a07d88e6fdf4e1bdbaffc0b6a989 /streaming
parent    bf1a6aaac577757a293a573fe8eae9669697310a (diff)
[SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles
Solves two JIRAs in one shot:
- Makes the ForeachDStream created by saveAsNewAPIHadoopFiles serializable for checkpoints
- Makes the default configuration object used by saveAsNewAPIHadoopFiles be Spark's Hadoop configuration

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #3457 from tdas/savefiles-fix and squashes the following commits:

bb4729a [Tathagata Das] Same treatment for saveAsHadoopFiles
b382ea9 [Tathagata Das] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles.
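The root cause: Hadoop's Configuration (and its subclass JobConf) implements Writable but not java.io.Serializable, so a save function that closes over it directly makes the ForeachDStream, and hence the checkpointed DStream graph, fail Java serialization. The patch instead captures a SerializableWritable wrapper and unwraps it inside the closure. A minimal sketch of that pattern, assuming Spark 1.x where org.apache.spark.SerializableWritable is public; the object and method names below are illustrative, not part of the patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

object ConfCaptureSketch {
  // Returns a closure that can be Java-serialized along with the DStream graph,
  // because it captures the Serializable wrapper rather than the raw Configuration.
  def serializableGetter(conf: Configuration, key: String): () => String = {
    val serializableConf = new SerializableWritable(conf) // wrap once, outside the closure
    () => serializableConf.value.get(key)                 // unwrap inside the closure
  }
}
```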
Diffstat (limited to 'streaming')
-rw-r--r--    streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala    30
-rw-r--r--    streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala                  56
2 files changed, 70 insertions, 16 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 3f03f42270..98539e06b4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -17,20 +17,17 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.StreamingContext._
-
-import org.apache.spark.{Partitioner, HashPartitioner}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.{Time, Duration}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+
+import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.StreamingContext._
/**
* Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -671,11 +668,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
- conf: JobConf = new JobConf
+ conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
) {
+ // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+ val serializableConf = new SerializableWritable(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
- rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+ rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
}
self.foreachRDD(saveFunc)
}
@@ -702,11 +701,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
- conf: Configuration = new Configuration
+ conf: Configuration = ssc.sparkContext.hadoopConfiguration
) {
+ // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+ val serializableConf = new SerializableWritable(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
- rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+ rdd.saveAsNewAPIHadoopFile(
+ file, keyClass, valueClass, outputFormatClass, serializableConf.value)
}
self.foreachRDD(saveFunc)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 77ff1ca780..c97998add8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -22,9 +22,14 @@ import java.nio.charset.Charset
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
+
import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
@@ -205,6 +210,51 @@ class CheckpointSuite extends TestSuiteBase {
testCheckpointedOperation(input, operation, output, 7)
}
+ test("recovery with saveAsHadoopFiles operation") {
+ val tempDir = Files.createTempDir()
+ try {
+ testCheckpointedOperation(
+ Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+ (s: DStream[String]) => {
+ val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+ output.saveAsHadoopFiles(
+ tempDir.toURI.toString,
+ "result",
+ classOf[Text],
+ classOf[IntWritable],
+ classOf[TextOutputFormat[Text, IntWritable]])
+ output
+ },
+ Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+ 3
+ )
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+
+ test("recovery with saveAsNewAPIHadoopFiles operation") {
+ val tempDir = Files.createTempDir()
+ try {
+ testCheckpointedOperation(
+ Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+ (s: DStream[String]) => {
+ val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+ output.saveAsNewAPIHadoopFiles(
+ tempDir.toURI.toString,
+ "result",
+ classOf[Text],
+ classOf[IntWritable],
+ classOf[NewTextOutputFormat[Text, IntWritable]])
+ output
+ },
+ Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+ 3
+ )
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
// This tests whether the StateDStream's RDD checkpoints works correctly such
// that the system can recover from a master failure. This assumes as reliable,
@@ -391,7 +441,9 @@ class CheckpointSuite extends TestSuiteBase {
logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds)
- val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+ val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+ dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
+ }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
outputStream.output.map(_.flatten)
}
}
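The recovery path these tests exercise corresponds, on the application side, to rebuilding the context with StreamingContext.getOrCreate: checkpointing Java-serializes the DStream graph, including the ForeachDStream set up by the save operation, and a restart deserializes it from the checkpoint directory instead of calling the setup function again. A minimal sketch using the older saveAsHadoopFiles API (which got the same treatment in this patch); the object name, paths, socket source, batch interval, and local master are placeholders:

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object RecoverySketch {
  val checkpointDir = "/tmp/recovery-sketch-checkpoint"

  // Builds the DStream graph; called only when no checkpoint exists yet.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("recovery-sketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    ssc.socketTextStream("localhost", 9999)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .saveAsHadoopFiles(
        "/tmp/recovery-sketch-output", "txt",
        classOf[Text], classOf[IntWritable],
        classOf[TextOutputFormat[Text, IntWritable]])
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart, the graph (including the ForeachDStream created by saveAsHadoopFiles)
    // is deserialized from the checkpoint instead of being rebuilt by createContext.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```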