From 131be5d62ef6b770de5106eb268a45bca385b599 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 14 Jan 2013 03:28:25 -0800
Subject: Fixed bug in RDD checkpointing.

---
 core/src/main/scala/spark/rdd/CheckpointRDD.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 86c63ca2f4..6f00f6ac73 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -80,12 +80,12 @@ private[spark] object CheckpointRDD extends Logging {
     val serializer = SparkEnv.get.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)
     serializeStream.writeAll(iterator)
-    fileOutputStream.close()
+    serializeStream.close()
 
     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.delete(finalOutputPath, true)) {
         throw new IOException("Checkpoint failed: failed to delete earlier output of task "
-          + context.attemptId);
+          + context.attemptId)
       }
       if (!fs.rename(tempOutputPath, finalOutputPath)) {
         throw new IOException("Checkpoint failed: failed to save output of task: "
@@ -119,7 +119,7 @@ private[spark] object CheckpointRDD extends Logging {
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
     val fs = path.getFileSystem(new Configuration())
-    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 10) _)
+    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
    val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.splits.length == rdd.splits.length, "Number of splits is not the same")
     assert(cpRDD.collect.toList == rdd.collect.toList, "Data of splits not the same")
-- 
cgit v1.2.3
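
The fix above closes serializeStream, the outermost stream, rather than the
underlying fileOutputStream. A serialization stream buffers bytes internally,
so closing only the raw file stream releases the file descriptor while the
tail of the checkpoint data may still be sitting in the serializer's buffer,
silently truncating the file on disk. A minimal standalone sketch of that
failure mode, using plain java.io streams as a stand-in for Spark's
serializer (all names here are illustrative; this is not Spark code):

    import java.io._

    object CloseOrderSketch {
      def main(args: Array[String]): Unit = {
        val file = File.createTempFile("checkpoint-sketch", ".bin")
        val fileOut = new FileOutputStream(file)
        // Plays the role of serializeStream: it keeps its own buffer of
        // serialized bytes in front of the file stream.
        val objOut = new ObjectOutputStream(new BufferedOutputStream(fileOut, 8192))
        (1 to 1000).foreach(i => objOut.writeObject(Integer.valueOf(i)))

        // Buggy order: fileOut.close() would release the descriptor with
        // bytes still buffered in objOut, truncating the output.
        // Fixed order: closing the outermost stream flushes the whole chain.
        objOut.close()

        println("wrote " + file.length() + " bytes")
      }
    }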

From 1638fcb0dce296da22ffc90127d5148a8fab745e Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 14 Jan 2013 17:18:39 -0800
Subject: Fixed updateStateByKey to work with primitive types.

---
 .../src/main/scala/spark/streaming/PairDStreamFunctions.scala | 8 ++++----
 .../src/main/scala/spark/streaming/dstream/StateDStream.scala | 2 +-
 .../src/test/scala/spark/streaming/BasicOperationsSuite.scala | 6 +++---
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 3952457339..3dbef69868 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -377,7 +377,7 @@ extends Serializable {
    * corresponding state key-value pair will be eliminated.
    * @tparam S State type
    */
-  def updateStateByKey[S <: AnyRef : ClassManifest](
+  def updateStateByKey[S: ClassManifest](
       updateFunc: (Seq[V], Option[S]) => Option[S]
     ): DStream[(K, S)] = {
     updateStateByKey(updateFunc, defaultPartitioner())
@@ -392,7 +392,7 @@ extends Serializable {
    * @param numPartitions Number of partitions of each RDD in the new DStream.
    * @tparam S State type
    */
-  def updateStateByKey[S <: AnyRef : ClassManifest](
+  def updateStateByKey[S: ClassManifest](
       updateFunc: (Seq[V], Option[S]) => Option[S],
       numPartitions: Int
     ): DStream[(K, S)] = {
@@ -408,7 +408,7 @@ extends Serializable {
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
    * @tparam S State type
    */
-  def updateStateByKey[S <: AnyRef : ClassManifest](
+  def updateStateByKey[S: ClassManifest](
       updateFunc: (Seq[V], Option[S]) => Option[S],
       partitioner: Partitioner
     ): DStream[(K, S)] = {
@@ -431,7 +431,7 @@ extends Serializable {
    * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
    * @tparam S State type
    */
-  def updateStateByKey[S <: AnyRef : ClassManifest](
+  def updateStateByKey[S: ClassManifest](
       updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
       partitioner: Partitioner,
       rememberPartitioner: Boolean
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index a1ec2f5454..b4506c74aa 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -7,7 +7,7 @@ import spark.storage.StorageLevel
 import spark.streaming.{Duration, Time, DStream}
 
 private[streaming]
-class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
+class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest](
     parent: DStream[(K, V)],
     updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
     partitioner: Partitioner,
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index f9e03c607d..f73f9b1823 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -151,10 +151,10 @@ class BasicOperationsSuite extends TestSuiteBase {
     )
 
     val updateStateOperation = (s: DStream[String]) => {
-      val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
-        Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+      val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+        Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
       }
-      s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
+      s.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
     }
 
     testOperation(inputData, updateStateOperation, outputData, true)
-- 
cgit v1.2.3
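
The change above relaxes the state type bound from S <: AnyRef : ClassManifest
to S: ClassManifest, so the running state can now be a primitive such as Int,
as the updated test demonstrates. A short usage sketch against this API
(assumptions: words is an existing DStream[String], and the implicit
conversion to PairDStreamFunctions provided by the StreamingContext companion
object is in scope):

    import spark.streaming.StreamingContext._

    // Running per-word counts with plain Int state -- before this patch,
    // S = Int did not satisfy the S <: AnyRef bound, so state had to be
    // wrapped in a reference type such as RichInt and unwrapped afterwards.
    val updateFunc = (values: Seq[Int], state: Option[Int]) =>
      Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))

    val runningCounts = words
      .map(word => (word, 1))
      .updateStateByKey[Int](updateFunc)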