diff options
Diffstat (limited to 'streaming')
16 files changed, 29 insertions, 23 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index ebff9bdb51..83a43d15cb 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -10,7 +10,7 @@ import java.io.{InputStream, ObjectStreamClass, ObjectInputStream, ObjectOutputS class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) extends Serializable { val master = ssc.sc.master - val framework = ssc.sc.frameworkName + val framework = ssc.sc.jobName val sparkHome = ssc.sc.sparkHome val jars = ssc.sc.jars val graph = ssc.graph diff --git a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala index 5522e2ee21..61d088eddb 100644 --- a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala @@ -1,6 +1,7 @@ package spark.streaming -import spark.{CoGroupedRDD, RDD, Partitioner} +import spark.{RDD, Partitioner} +import spark.rdd.CoGroupedRDD class CoGroupedDStream[K : ClassManifest]( parents: Seq[DStream[(_, _)]], diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 4bc063719c..12d7ba97ea 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -4,6 +4,7 @@ import spark.streaming.StreamingContext._ import spark._ import spark.SparkContext._ +import spark.rdd._ import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer @@ -82,7 +83,7 @@ extends Serializable with Logging { // Set caching level for the RDDs created by this DStream def persist(newLevel: StorageLevel): DStream[T] = persist(newLevel, StorageLevel.NONE, null) - def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_DESER) + def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY) // Turn on the default caching level for this RDD def cache(): DStream[T] = persist() diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala index 69d3504c72..9d7361097b 100644 --- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala @@ -1,7 +1,7 @@ package spark.streaming import spark.RDD -import spark.UnionRDD +import spark.rdd.UnionRDD import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.conf.Configuration diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala index 5669d7fedf..f3f4c3ab13 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala @@ -1,6 +1,9 @@ package spark.streaming -import spark.{Logging, SparkEnv, RDD, BlockRDD} +import scala.collection.mutable.ArrayBuffer + +import spark.{Logging, SparkEnv, RDD} +import spark.rdd.BlockRDD import spark.storage.StorageLevel import java.nio.ByteBuffer @@ -107,7 +110,8 @@ abstract class NetworkReceiver[T: ClassManifest](streamId: Int) extends Serializ * This method pushes a block (as iterator of values) into the block manager. */ protected def pushBlock(blockId: String, iterator: Iterator[T], level: StorageLevel) { - env.blockManager.put(blockId, iterator, level) + val buffer = new ArrayBuffer[T] ++ iterator + env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level) actor ! ReportBlock(blockId) } diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala index b794159b09..bb86e51932 100644 --- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala @@ -1,7 +1,7 @@ package spark.streaming import spark.RDD -import spark.UnionRDD +import spark.rdd.UnionRDD import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala index fcf57aced7..1c57d5f855 100644 --- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala @@ -3,8 +3,8 @@ package spark.streaming import spark.streaming.StreamingContext._ import spark.RDD -import spark.UnionRDD -import spark.CoGroupedRDD +import spark.rdd.UnionRDD +import spark.rdd.CoGroupedRDD import spark.Partitioner import spark.SparkContext._ import spark.storage.StorageLevel diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala index 3ba8fb45fb..086752ac55 100644 --- a/streaming/src/main/scala/spark/streaming/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala @@ -1,9 +1,9 @@ package spark.streaming import spark.RDD -import spark.BlockRDD +import spark.rdd.BlockRDD import spark.Partitioner -import spark.MapPartitionsRDD +import spark.rdd.MapPartitionsRDD import spark.SparkContext._ import spark.storage.StorageLevel diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index e124b8cfa0..7c7b3afe47 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -89,7 +89,7 @@ class StreamingContext ( def networkTextStream( hostname: String, port: Int, - storageLevel: StorageLevel = StorageLevel.DISK_AND_MEMORY_2 + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[String] = { networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) } @@ -108,7 +108,7 @@ class StreamingContext ( def rawNetworkStream[T: ClassManifest]( hostname: String, port: Int, - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_2 + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 ): DStream[T] = { val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel) graph.addInputStream(inputStream) diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala index b90e22351b..ce89a3f99b 100644 --- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala @@ -1,7 +1,7 @@ package spark.streaming import spark.RDD -import spark.UnionRDD +import spark.rdd.UnionRDD class WindowedDStream[T: ClassManifest]( diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala index 9d1b0b9eb4..57fd10f0a5 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -41,7 +41,7 @@ object TopKWordCountRaw { val windowedCounts = union.mapPartitions(splitAndCountPartitions) .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces) - windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, + windowedCounts.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMs)) //windowedCounts.print() // TODO: something else? diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala index 8390f4af94..0d2e62b955 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala @@ -100,8 +100,8 @@ object WordCount2 { val windowedCounts = sentences .mapPartitions(splitAndCountPartitions) .reduceByKeyAndWindow(add _, subtract _, Seconds(30), batchDuration, reduceTasks.toInt) - windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, - StorageLevel.MEMORY_ONLY_DESER_2, + windowedCounts.persist(StorageLevel.MEMORY_ONLY, + StorageLevel.MEMORY_ONLY_2, //new StorageLevel(false, true, true, 3), Milliseconds(chkptMillis.toLong)) windowedCounts.foreachRDD(r => println("Element count: " + r.count())) diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala index d8a0664d7d..abfd12890f 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala @@ -41,7 +41,7 @@ object WordCountRaw { val windowedCounts = union.mapPartitions(splitAndCountPartitions) .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces) - windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, + windowedCounts.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMs)) //windowedCounts.print() // TODO: something else? windowedCounts.foreachRDD(r => println("Element count: " + r.count())) diff --git a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala b/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala index fc7567322b..9d44da2b11 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala @@ -57,10 +57,10 @@ object WordMax2 { val windowedCounts = sentences .mapPartitions(splitAndCountPartitions) .reduceByKey(add _, reduceTasks.toInt) - .persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, + .persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong)) .reduceByKeyAndWindow(max _, Seconds(10), batchDuration, reduceTasks.toInt) - //.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, + //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, // Milliseconds(chkptMillis.toLong)) windowedCounts.foreachRDD(r => println("Element count: " + r.count())) diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala index d00ae9cbca..80ad924dd8 100644 --- a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala +++ b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala @@ -171,11 +171,11 @@ extends Thread with Logging { logInfo("Pushing block") val startTime = System.currentTimeMillis - val bytes = blockManager.dataSerialize(block.data.toIterator) + val bytes = blockManager.dataSerialize("rdd_", block.data.toIterator) // TODO: Will this be an RDD block? val finishTime = System.currentTimeMillis logInfo(block + " serialization delay is " + (finishTime - startTime) / 1000.0 + " s") - blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_2) + blockManager.putBytes(block.id.toString, bytes, StorageLevel.MEMORY_AND_DISK_SER_2) /*blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_DESER_2)*/ /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY_DESER)*/ /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY)*/ diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index fcf5d22f5c..6f6b18a790 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -22,7 +22,7 @@ class InputStreamsSuite extends TestSuiteBase { // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework) ssc.setBatchDuration(batchDuration) - val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.DISK_AND_MEMORY) + val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) ssc.registerOutputStream(outputStream) |