author     Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-24 23:21:00 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-24 23:21:00 -0700
commit     863a55ae42c2b9c0583b77cf37ff13bd2459f82b (patch)
tree       f4b18ebe461343ffb864dabb6afefcdf88dfafaf /streaming
parent     ed71df46cddc9a4f1363b937c10bfa2a928e564c (diff)
parent     f63a40fd99bf907c03cd44585fd5979bf21b304d (diff)
Merge remote-tracking branch 'public/master' into dev
Conflicts:
	core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
	core/src/main/scala/spark/KryoSerializer.scala
	core/src/main/scala/spark/MapOutputTracker.scala
	core/src/main/scala/spark/RDD.scala
	core/src/main/scala/spark/SparkContext.scala
	core/src/main/scala/spark/executor/Executor.scala
	core/src/main/scala/spark/network/Connection.scala
	core/src/main/scala/spark/network/ConnectionManagerTest.scala
	core/src/main/scala/spark/rdd/BlockRDD.scala
	core/src/main/scala/spark/rdd/NewHadoopRDD.scala
	core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
	core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
	core/src/main/scala/spark/storage/BlockManager.scala
	core/src/main/scala/spark/storage/BlockMessage.scala
	core/src/main/scala/spark/storage/BlockStore.scala
	core/src/main/scala/spark/storage/StorageLevel.scala
	core/src/main/scala/spark/util/AkkaUtils.scala
	project/SparkBuild.scala
	run
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/spark/streaming/Checkpoint.scala                | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala          | 3
-rw-r--r-- streaming/src/main/scala/spark/streaming/DStream.scala                   | 3
-rw-r--r-- streaming/src/main/scala/spark/streaming/FileInputDStream.scala          | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala       | 8
-rw-r--r-- streaming/src/main/scala/spark/streaming/QueueInputDStream.scala         | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala    | 4
-rw-r--r-- streaming/src/main/scala/spark/streaming/StateDStream.scala              | 4
-rw-r--r-- streaming/src/main/scala/spark/streaming/StreamingContext.scala          | 4
-rw-r--r-- streaming/src/main/scala/spark/streaming/WindowedDStream.scala           | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordCount2.scala       | 4
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala     | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordMax2.scala         | 4
-rw-r--r-- streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala  | 4
-rw-r--r-- streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala         | 2
16 files changed, 29 insertions, 23 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index ebff9bdb51..83a43d15cb 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -10,7 +10,7 @@ import java.io.{InputStream, ObjectStreamClass, ObjectInputStream, ObjectOutputStream}
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) extends Serializable {
val master = ssc.sc.master
- val framework = ssc.sc.frameworkName
+ val framework = ssc.sc.jobName
val sparkHome = ssc.sc.sparkHome
val jars = ssc.sc.jars
val graph = ssc.graph
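
Note: the rename tracks SparkContext, where the frameworkName field became
jobName in this merge. The surrounding class also shows the pattern that makes
the checkpoint serializable at all: the context is marked @transient and only
plain values are copied out of it, so serializing the checkpoint never drags
the live context along. A minimal, self-contained sketch of that pattern
(Context and Snapshot are illustrative names, not Spark classes):

    class Context(val master: String, val jobName: String)

    class Snapshot(@transient ctx: Context) extends Serializable {
      // Fields are copied eagerly at construction time; the @transient
      // reference to ctx itself is dropped when the snapshot is serialized.
      val master  = ctx.master
      val jobName = ctx.jobName
    }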
diff --git a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala
index 5522e2ee21..61d088eddb 100644
--- a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala
@@ -1,6 +1,7 @@
package spark.streaming
-import spark.{CoGroupedRDD, RDD, Partitioner}
+import spark.{RDD, Partitioner}
+import spark.rdd.CoGroupedRDD
class CoGroupedDStream[K : ClassManifest](
parents: Seq[DStream[(_, _)]],
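
Note: this hunk is one instance of a refactor that runs through this merge:
concrete RDD implementations moved out of the top-level spark package into
spark.rdd. The same one-line import fix recurs below in DStream,
FileInputDStream, NetworkInputDStream, QueueInputDStream,
ReducedWindowedDStream, StateDStream, and WindowedDStream. Only the import
path changes, not the class names; the lines here mirror the hunks in this
diff:

    // old: implementations imported from the top-level package
    // import spark.{CoGroupedRDD, UnionRDD, BlockRDD}

    // new: abstractions stay in spark.*, implementations live in spark.rdd
    import spark.{RDD, Partitioner}
    import spark.rdd.{CoGroupedRDD, UnionRDD, BlockRDD}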
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 4bc063719c..12d7ba97ea 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -4,6 +4,7 @@ import spark.streaming.StreamingContext._
import spark._
import spark.SparkContext._
+import spark.rdd._
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
@@ -82,7 +83,7 @@ extends Serializable with Logging {
// Set caching level for the RDDs created by this DStream
def persist(newLevel: StorageLevel): DStream[T] = persist(newLevel, StorageLevel.NONE, null)
- def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_DESER)
+ def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY)
// Turn on the default caching level for this RDD
def cache(): DStream[T] = persist()
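
Note: the new default reflects the renamed storage levels: deserialized
in-memory storage is now the default and drops its _DESER suffix, while
serialized variants gain _SER. A small self-contained model of the scheme,
in terms of the (useDisk, useMemory, deserialized, replication) flags visible
in the commented-out constructor call in WordCount2.scala below; this is an
illustration of the renames in this diff, not Spark's actual StorageLevel
class:

    case class Level(useDisk: Boolean, useMemory: Boolean,
                     deserialized: Boolean, replication: Int)

    object Levels {
      val MEMORY_ONLY           = Level(false, true, true,  1) // was MEMORY_ONLY_DESER
      val MEMORY_ONLY_2         = Level(false, true, true,  2) // was MEMORY_ONLY_DESER_2
      val MEMORY_ONLY_SER_2     = Level(false, true, false, 2) // was MEMORY_ONLY_2
      val MEMORY_AND_DISK_SER_2 = Level(true,  true, false, 2) // was DISK_AND_MEMORY_2
    }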
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
index 78537b8794..537ec88047 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
@@ -1,7 +1,7 @@
package spark.streaming
import spark.RDD
-import spark.UnionRDD
+import spark.rdd.UnionRDD
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
index 5669d7fedf..f3f4c3ab13 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
@@ -1,6 +1,9 @@
package spark.streaming
-import spark.{Logging, SparkEnv, RDD, BlockRDD}
+import scala.collection.mutable.ArrayBuffer
+
+import spark.{Logging, SparkEnv, RDD}
+import spark.rdd.BlockRDD
import spark.storage.StorageLevel
import java.nio.ByteBuffer
@@ -107,7 +110,8 @@ abstract class NetworkReceiver[T: ClassManifest](streamId: Int) extends Serializable
* This method pushes a block (as iterator of values) into the block manager.
*/
protected def pushBlock(blockId: String, iterator: Iterator[T], level: StorageLevel) {
- env.blockManager.put(blockId, iterator, level)
+ val buffer = new ArrayBuffer[T] ++ iterator
+ env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level)
actor ! ReportBlock(blockId)
}
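
Note: the receiver now drains the iterator into an ArrayBuffer before handing
the block to the block manager, to match the put overload used here, which
takes a concrete ArrayBuffer. An Iterator is single-pass, so a collection is
needed wherever the block must be traversed more than once, e.g. sized and
then replicated. A plain-Scala illustration, independent of Spark:

    import scala.collection.mutable.ArrayBuffer

    object IteratorDemo extends App {
      val values: Iterator[String] = Iterator("a", "b", "c")

      // Drain the single-pass iterator into a reusable, indexable buffer.
      val buffer = new ArrayBuffer[String] ++ values

      assert(buffer.length == 3)
      assert(values.isEmpty) // the iterator is spent; only the buffer can be re-read
    }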
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
index b794159b09..bb86e51932 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
@@ -1,7 +1,7 @@
package spark.streaming
import spark.RDD
-import spark.UnionRDD
+import spark.rdd.UnionRDD
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index fcf57aced7..1c57d5f855 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -3,8 +3,8 @@ package spark.streaming
import spark.streaming.StreamingContext._
import spark.RDD
-import spark.UnionRDD
-import spark.CoGroupedRDD
+import spark.rdd.UnionRDD
+import spark.rdd.CoGroupedRDD
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index 3ba8fb45fb..086752ac55 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -1,9 +1,9 @@
package spark.streaming
import spark.RDD
-import spark.BlockRDD
+import spark.rdd.BlockRDD
import spark.Partitioner
-import spark.MapPartitionsRDD
+import spark.rdd.MapPartitionsRDD
import spark.SparkContext._
import spark.storage.StorageLevel
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index e124b8cfa0..7c7b3afe47 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -89,7 +89,7 @@ class StreamingContext (
def networkTextStream(
hostname: String,
port: Int,
- storageLevel: StorageLevel = StorageLevel.DISK_AND_MEMORY_2
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String] = {
networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
@@ -108,7 +108,7 @@ class StreamingContext (
def rawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_2
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
graph.addInputStream(inputStream)
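
Note: both socket-stream defaults move to replicated, serialized storage under
the new names (MEMORY_AND_DISK_SER_2 for text streams, MEMORY_ONLY_SER_2 for
raw streams). A minimal usage sketch against this era's API, mirroring the
calls visible in this diff and in InputStreamsSuite at the bottom; the master
string, job name, and ports are illustrative:

    import spark.streaming._
    import spark.storage.StorageLevel

    object NetworkStreamSketch extends App {
      val ssc = new StreamingContext("local[2]", "NetworkStreamSketch")
      ssc.setBatchDuration(Seconds(1))

      // Rely on the new default level (MEMORY_AND_DISK_SER_2) ...
      val lines = ssc.networkTextStream("localhost", 9999)

      // ... or pass an explicit level, as InputStreamsSuite does.
      val moreLines =
        ssc.networkTextStream("localhost", 9998, StorageLevel.MEMORY_AND_DISK)
    }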
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index b90e22351b..ce89a3f99b 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -1,7 +1,7 @@
package spark.streaming
import spark.RDD
-import spark.UnionRDD
+import spark.rdd.UnionRDD
class WindowedDStream[T: ClassManifest](
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index 9d1b0b9eb4..57fd10f0a5 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -41,7 +41,7 @@ object TopKWordCountRaw {
val windowedCounts = union.mapPartitions(splitAndCountPartitions)
.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
- windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2,
Milliseconds(chkptMs))
//windowedCounts.print() // TODO: something else?
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index 8390f4af94..0d2e62b955 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -100,8 +100,8 @@ object WordCount2 {
val windowedCounts = sentences
.mapPartitions(splitAndCountPartitions)
.reduceByKeyAndWindow(add _, subtract _, Seconds(30), batchDuration, reduceTasks.toInt)
- windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER,
- StorageLevel.MEMORY_ONLY_DESER_2,
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY,
+ StorageLevel.MEMORY_ONLY_2,
//new StorageLevel(false, true, true, 3),
Milliseconds(chkptMillis.toLong))
windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
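
Note: reduceByKeyAndWindow with both an add and a subtract function updates
each window incrementally rather than recomputing it from scratch: the new
value is the old value, plus the batch sliding into the window, minus the
batch sliding out, which is why persisting windowedCounts pays off. The
invariant in plain Scala, independent of Spark:

    object WindowInvariant extends App {
      def add(a: Long, b: Long)      = a + b
      def subtract(a: Long, b: Long) = a - b

      val oldWindow = Seq(1L, 2L, 3L) // per-batch counts in the previous window
      val entering  = 4L              // count from the batch sliding in
      val leaving   = oldWindow.head  // count from the batch sliding out

      // Incremental update ...
      val newSum = subtract(add(oldWindow.sum, entering), leaving)

      // ... equals recomputing the new window from scratch.
      assert(newSum == (oldWindow.drop(1) :+ entering).sum)
    }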
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index d8a0664d7d..abfd12890f 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -41,7 +41,7 @@ object WordCountRaw {
val windowedCounts = union.mapPartitions(splitAndCountPartitions)
.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
- windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2,
Milliseconds(chkptMs))
//windowedCounts.print() // TODO: something else?
windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala b/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
index fc7567322b..9d44da2b11 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
@@ -57,10 +57,10 @@ object WordMax2 {
val windowedCounts = sentences
.mapPartitions(splitAndCountPartitions)
.reduceByKey(add _, reduceTasks.toInt)
- .persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+ .persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2,
Milliseconds(chkptMillis.toLong))
.reduceByKeyAndWindow(max _, Seconds(10), batchDuration, reduceTasks.toInt)
- //.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+ //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2,
// Milliseconds(chkptMillis.toLong))
windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala
index d00ae9cbca..80ad924dd8 100644
--- a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala
+++ b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala
@@ -171,11 +171,11 @@ extends Thread with Logging {
logInfo("Pushing block")
val startTime = System.currentTimeMillis
- val bytes = blockManager.dataSerialize(block.data.toIterator)
+ val bytes = blockManager.dataSerialize("rdd_", block.data.toIterator) // TODO: Will this be an RDD block?
val finishTime = System.currentTimeMillis
logInfo(block + " serialization delay is " + (finishTime - startTime) / 1000.0 + " s")
- blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_2)
+ blockManager.putBytes(block.id.toString, bytes, StorageLevel.MEMORY_AND_DISK_SER_2)
/*blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_DESER_2)*/
/*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY_DESER)*/
/*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY)*/
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index a3f213ebd0..f81ab2607f 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -18,7 +18,7 @@ class InputStreamsSuite extends TestSuiteBase {
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
- val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.DISK_AND_MEMORY)
+ val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
ssc.registerOutputStream(outputStream)