aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-07-12 14:49:16 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-07-12 14:49:16 +0530
commite86d5dbaad9de3c04fe080b8fb96a7ebbd20c7cd (patch)
tree2dcdbfa2daf5836357def8744447043ed2181c4e /streaming
parent6e6d94ffdfa885432408d6996bad7df2a641748a (diff)
parentbc19477efbe9ad18a27516a771dd14c497368516 (diff)
downloadspark-e86d5dbaad9de3c04fe080b8fb96a7ebbd20c7cd.tar.gz
spark-e86d5dbaad9de3c04fe080b8fb96a7ebbd20c7cd.tar.bz2
spark-e86d5dbaad9de3c04fe080b8fb96a7ebbd20c7cd.zip
Merge branch 'master' into master-merge
Conflicts: README.md core/pom.xml core/src/main/scala/spark/deploy/JsonProtocol.scala core/src/main/scala/spark/deploy/LocalSparkCluster.scala core/src/main/scala/spark/deploy/master/Master.scala core/src/main/scala/spark/deploy/master/MasterWebUI.scala core/src/main/scala/spark/deploy/worker/Worker.scala core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala core/src/main/scala/spark/storage/BlockManagerUI.scala core/src/main/scala/spark/util/AkkaUtils.scala pom.xml project/SparkBuild.scala streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
Diffstat (limited to 'streaming')
-rw-r--r--streaming/pom.xml4
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala9
-rw-r--r--streaming/src/main/scala/spark/streaming/Duration.scala6
-rw-r--r--streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala3
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala18
-rw-r--r--streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala7
6 files changed, 29 insertions, 18 deletions
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 1589e3202f..7b621c1239 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -59,6 +59,10 @@
<version>3.0.3</version>
</dependency>
<dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-zeromq_${scala.version}</artifactId>
<version>${akka.version}</version>
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 66e67cbfa1..450e48d66e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -8,7 +8,7 @@ import org.apache.hadoop.conf.Configuration
import java.io._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import java.util.concurrent.Executors
-
+import java.util.concurrent.RejectedExecutionException
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
@@ -91,7 +91,12 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
oos.writeObject(checkpoint)
oos.close()
bos.close()
- executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ try {
+ executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ } catch {
+ case rej: RejectedExecutionException =>
+ logError("Could not submit checkpoint task to the thread pool executor", rej)
+ }
}
def stop() {
diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala
index ee26206e24..c2135195d8 100644
--- a/streaming/src/main/scala/spark/streaming/Duration.scala
+++ b/streaming/src/main/scala/spark/streaming/Duration.scala
@@ -1,5 +1,7 @@
package spark.streaming
+import spark.Utils
+
case class Duration (private val millis: Long) {
def < (that: Duration): Boolean = (this.millis < that.millis)
@@ -32,8 +34,10 @@ case class Duration (private val millis: Long) {
def toFormattedString: String = millis.toString
def milliseconds: Long = millis
-}
+ def prettyPrint = Utils.msDurationToString(millis)
+
+}
/**
* Helper object that creates instance of [[spark.streaming.Duration]] representing
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index aa1a8b6ba2..b0ebdea01c 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -15,6 +15,7 @@ import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
@@ -471,7 +472,7 @@ extends Serializable {
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf
- ) {
+ ) {
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 3b9cd0e89b..71d2b9715c 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -143,12 +143,10 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
/**
- * Pushes a block (as iterator of values) into the block manager.
+ * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
*/
- def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) {
- val buffer = new ArrayBuffer[T] ++ iterator
- env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level)
-
+ def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+ env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
actor ! ReportBlock(blockId, metadata)
}
@@ -198,7 +196,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
class BlockGenerator(storageLevel: StorageLevel)
extends Serializable with Logging {
- case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+ case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
@@ -225,17 +223,13 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
currentBuffer += obj
}
- private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
- new Block(blockId, iterator)
- }
-
private def updateCurrentBuffer(time: Long) {
try {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[T]
if (newBlockBuffer.size > 0) {
val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
- val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+ val newBlock = new Block(blockId, newBlockBuffer)
blocksForPushing.add(newBlock)
}
} catch {
@@ -251,7 +245,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
try {
while(true) {
val block = blocksForPushing.take()
- NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+ NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
}
} catch {
case ie: InterruptedException =>
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index 42e885af5c..279e39d6d6 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -13,6 +13,8 @@ import spark.streaming.dstream.NetworkReceiver
import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.mutable.ArrayBuffer
+
/** A helper with set of defaults for supervisor strategy */
object ReceiverSupervisorStrategy {
@@ -137,8 +139,9 @@ private[streaming] class ActorReceiver[T: ClassTag](
}
protected def pushBlock(iter: Iterator[T]) {
- pushBlock("block-" + streamId + "-" + System.nanoTime(),
- iter, null, storageLevel)
+ val buffer = new ArrayBuffer[T]
+ buffer ++= iter
+ pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel)
}
protected def onStart() = {