author    Tathagata Das <tathagata.das1565@gmail.com>  2013-01-14 23:06:40 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-01-14 23:06:40 -0800
commit    eded21925ab549330d0337138fa1f81ae192e3e9 (patch)
tree      c3cd6105aed3d77a8ae2c4dc64f2696aef9801ed
parent    82b0cc90cad4eb0e85a7ff4e14a90afec258a9a1 (diff)
parent    1638fcb0dce296da22ffc90127d5148a8fab745e (diff)
Merge pull request #375 from tdas/streaming
Important bug fixes
-rw-r--r--  core/src/main/scala/spark/rdd/CheckpointRDD.scala                     6
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala   8
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala   2
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala   6
4 files changed, 11 insertions, 11 deletions
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 86c63ca2f4..6f00f6ac73 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -80,12 +80,12 @@ private[spark] object CheckpointRDD extends Logging {
     val serializer = SparkEnv.get.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)
     serializeStream.writeAll(iterator)
-    fileOutputStream.close()
+    serializeStream.close()
 
     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.delete(finalOutputPath, true)) {
         throw new IOException("Checkpoint failed: failed to delete earlier output of task "
-          + context.attemptId);
+          + context.attemptId)
       }
       if (!fs.rename(tempOutputPath, finalOutputPath)) {
         throw new IOException("Checkpoint failed: failed to save output of task: "
@@ -119,7 +119,7 @@ private[spark] object CheckpointRDD extends Logging {
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
     val fs = path.getFileSystem(new Configuration())
-    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 10) _)
+    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.splits.length == rdd.splits.length, "Number of splits is not the same")
     assert(cpRDD.collect.toList == rdd.collect.toList, "Data of splits not the same")
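Note on the first hunk: the serializer stream wraps and buffers the raw file stream, so closing only fileOutputStream can leave the tail of the serialized data unflushed and produce a truncated checkpoint file; closing the outermost stream flushes the whole chain. The second hunk just raises the test's block-size argument from 10 to 1024. A minimal sketch of the same close-ordering pitfall using plain JVM streams (ByteArrayOutputStream and ObjectOutputStream here merely stand in for the HDFS file stream and Spark's SerializationStream):

// Sketch only: java.io stand-ins, not Spark's serializer API.
import java.io.{BufferedOutputStream, ByteArrayOutputStream, ObjectOutputStream}

val fileOutputStream = new ByteArrayOutputStream() // plays the HDFS file stream
val serializeStream =
  new ObjectOutputStream(new BufferedOutputStream(fileOutputStream))
serializeStream.writeObject("some record")
// Closing fileOutputStream at this point would discard whatever is still
// sitting in the wrappers' buffers. Closing the outermost stream flushes
// the chain first, then closes the underlying stream.
serializeStream.close()
assert(fileOutputStream.toByteArray.nonEmpty)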
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 3952457339..3dbef69868 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -377,7 +377,7 @@ extends Serializable {
    *                   corresponding state key-value pair will be eliminated.
    * @tparam S State type
    */
-  def updateStateByKey[S <: AnyRef : ClassManifest](
+  def updateStateByKey[S: ClassManifest](
       updateFunc: (Seq[V], Option[S]) => Option[S]
     ): DStream[(K, S)] = {
     updateStateByKey(updateFunc, defaultPartitioner())
@@ -392,7 +392,7 @@ extends Serializable {
    * @param numPartitions Number of partitions of each RDD in the new DStream.
    * @tparam S State type
    */
-  def updateStateByKey[S <: AnyRef : ClassManifest](
+  def updateStateByKey[S: ClassManifest](
       updateFunc: (Seq[V], Option[S]) => Option[S],
       numPartitions: Int
     ): DStream[(K, S)] = {
@@ -408,7 +408,7 @@ extends Serializable {
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
    * @tparam S State type
    */
-  def updateStateByKey[S <: AnyRef : ClassManifest](
+  def updateStateByKey[S: ClassManifest](
       updateFunc: (Seq[V], Option[S]) => Option[S],
       partitioner: Partitioner
     ): DStream[(K, S)] = {
@@ -431,7 +431,7 @@ extends Serializable {
    * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
    * @tparam S State type
    */
-  def updateStateByKey[S <: AnyRef : ClassManifest](
+  def updateStateByKey[S: ClassManifest](
       updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
       partitioner: Partitioner,
       rememberPartitioner: Boolean
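Dropping the `S <: AnyRef` upper bound is what lets callers keep primitive state such as Int; previously only reference types (e.g. a RichInt wrapper) compiled. A hedged sketch of a running word count with Int state, assuming a StreamingContext `ssc` (with a checkpoint directory set, as updateStateByKey requires) and an input DStream `lines` already exist:

// Sketch: per-word running counts with plain Int state, legal under the new bound.
// `ssc` and `lines` are assumed to exist; the import supplies the implicit
// conversion to PairDStreamFunctions in this era of the API.
import spark.streaming.StreamingContext._

val counts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .updateStateByKey[Int]((newValues: Seq[Int], running: Option[Int]) =>
    Some(newValues.foldLeft(0)(_ + _) + running.getOrElse(0))
  )
counts.print()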
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index a1ec2f5454..b4506c74aa 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -7,7 +7,7 @@ import spark.storage.StorageLevel
 import spark.streaming.{Duration, Time, DStream}
 
 private[streaming]
-class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
+class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest](
     parent: DStream[(K, V)],
     updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
     partitioner: Partitioner,
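Same relaxation as in PairDStreamFunctions. For reference, a context bound is only sugar for an implicit evidence parameter and carries no reference-type restriction, so value types now satisfy it; a tiny standalone illustration:

// `S: ClassManifest` desugars to an implicit evidence parameter and, unlike
// `S <: AnyRef`, places no reference-type restriction on S.
def manifestOf[S: ClassManifest](s: S) = implicitly[ClassManifest[S]]

manifestOf(42)      // Int: rejected by the old `S <: AnyRef` bound, fine now
manifestOf("state") // AnyRef types still work, of course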
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index f9e03c607d..f73f9b1823 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -151,10 +151,10 @@ class BasicOperationsSuite extends TestSuiteBase {
     )
 
     val updateStateOperation = (s: DStream[String]) => {
-      val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
-        Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+      val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+        Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
       }
-      s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
+      s.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
     }
 
     testOperation(inputData, updateStateOperation, outputData, true)
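The updated test exercises exactly that: the state is a bare Int, so the RichInt wrapper and the trailing .map(t => (t._1, t._2.self)) unwrapping step both disappear. The update function's arithmetic, checked standalone:

// Standalone check of the test's update function.
val updateFunc = (values: Seq[Int], state: Option[Int]) =>
  Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))

assert(updateFunc(Seq(1, 1, 1), Some(2)) == Some(5)) // 3 new occurrences + prior count of 2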