author    Tathagata Das <tathagata.das1565@gmail.com>    2013-02-24 14:46:19 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2013-02-24 14:46:19 -0800
commit    32f1a2d540796dc8e0c9ba3a32cb1cf098362288 (patch)
tree      a8b14c0c32a294595f31bc679cc45337fc0b7e02
parent    24c0cd616827b7d7ecdd2c7d770aedd020c006a1 (diff)
parent    28f8b721f65fc8e699f208c5dc64d90822a85d91 (diff)
Merge pull request #493 from tdas/streaming
Streaming bug fixes
-rw-r--r--  .gitignore                                                                    |   2
-rw-r--r--  core/src/main/scala/spark/RDD.scala                                           |  21
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                                  |   2
-rw-r--r--  core/src/main/scala/spark/api/java/JavaDoubleRDD.scala                        |  23
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala                          |  27
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDD.scala                              |  20
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala                          |  13
-rw-r--r--  core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java                 |  20
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala                          |  13
-rw-r--r--  core/src/main/scala/spark/network/Connection.scala                            |  24
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala                                 |  17
-rw-r--r--  core/src/main/scala/spark/rdd/NewHadoopRDD.scala                              |  15
-rw-r--r--  core/src/main/scala/spark/rdd/SubtractedRDD.scala                             | 108
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala            |  26
-rw-r--r--  core/src/main/scala/spark/util/MetadataCleaner.scala                          |   4
-rw-r--r--  core/src/main/scala/spark/util/Vector.scala                                   |  10
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala                              |  21
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala                                  |  26
-rw-r--r--  docs/configuration.md                                                         |   6
-rw-r--r--  docs/streaming-programming-guide.md                                           |  87
-rw-r--r--  examples/pom.xml                                                              |   2
-rw-r--r--  examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java     |   2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala         |   3
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala       |   2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala         |   2
-rw-r--r--  project/SparkBuild.scala                                                      |   2
-rwxr-xr-x  run                                                                           |  20
-rw-r--r--  streaming/pom.xml                                                             |  18
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala            |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala               |  16
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala           |   4
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala  | 201
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala           |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala      |   1
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java                     |  37
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala              |   2
36 files changed, 696 insertions, 105 deletions
diff --git a/.gitignore b/.gitignore
index 88d7b56181..155e785b01 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,5 @@ log/
spark-tests.log
streaming-tests.log
dependency-reduced-pom.xml
+.ensime
+.ensime_lucene
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index da82dfd10f..9e8eaee756 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -30,6 +30,7 @@ import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithIndexRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
+import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
import spark.storage.StorageLevel
@@ -394,6 +395,26 @@ abstract class RDD[T: ClassManifest](
}
/**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: RDD[T]): RDD[T] =
+ subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
+ subtract(other, new HashPartitioner(numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: RDD[T], p: Partitioner): RDD[T] = new SubtractedRDD[T](this, other, p)
+
+ /**
* Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/
def reduce(f: (T, T) => T): T = {
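
For context, here is a minimal usage sketch of the new `subtract` overloads added above (hypothetical data; assumes an existing SparkContext `sc`):

// Subtract uses `this` RDD's partitioner/partition count by default,
// so the result is no larger than the left-hand side.
val xs = sc.parallelize(Seq(1, 2, 3, 4), 2)
val ys = sc.parallelize(Seq(3, 4, 5), 4)

xs.subtract(ys).collect()                        // Array(1, 2), keeps xs's 2 partitions
xs.subtract(ys, 8)                               // explicit partition count
xs.subtract(ys, new spark.HashPartitioner(8))    // explicit Partitioner
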
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index d39767c3b3..f40bb7935f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -439,7 +439,7 @@ class SparkContext(
}
/**
- * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
+ * Broadcast a read-only variable to the cluster, returning a [[spark.broadcast.Broadcast]] object for
* reading it in distributed functions. The variable will be sent to each cluster only once.
*/
def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index da3cb2cd31..ba00b6a844 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -6,8 +6,8 @@ import spark.api.java.function.{Function => JFunction}
import spark.util.StatCounter
import spark.partial.{BoundedDouble, PartialResult}
import spark.storage.StorageLevel
-
import java.lang.Double
+import spark.Partitioner
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
@@ -58,6 +58,27 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
/**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaDoubleRDD): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaDoubleRDD, numPartitions: Int): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other, numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaDoubleRDD, p: Partitioner): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other, p))
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD =
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index df3af3817d..cfbdda88c0 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -59,7 +59,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
- def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
+ def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
/**
@@ -102,7 +102,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
- def combineByKey[C](createCombiner: Function[V, C],
+ def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
@@ -182,6 +182,27 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
/**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
+ fromRDD(rdd.subtract(other))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaPairRDD[K, V], numPartitions: Int): JavaPairRDD[K, V] =
+ fromRDD(rdd.subtract(other, numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] =
+ fromRDD(rdd.subtract(other, p))
+
+ /**
* Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
* is true, Spark will group values of the same key together on the map side before the
* repartitioning, to only send each key over the network once. If a large number of
@@ -309,7 +330,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
- def mapValues[U](f: Function[V, U]): JavaPairRDD[K, U] = {
+ def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
fromRDD(rdd.mapValues(f))
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 3ccd6f055e..3016888898 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -55,6 +55,26 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd))
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.subtract(other))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaRDD[T], numPartitions: Int): JavaRDD[T] =
+ wrapRDD(rdd.subtract(other, numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
+ wrapRDD(rdd.subtract(other, p))
+
}
object JavaRDD {
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 90b45cf875..d884529d7a 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
import com.google.common.base.Optional
-trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] {
+trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This
implicit val classManifest: ClassManifest[T]
@@ -82,12 +82,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
}
/**
- * Part of the workaround for SPARK-668; called in PairFlatMapWorkaround.java.
+ * Return a new RDD by first applying a function to all elements of this
+ * RDD, and then flattening the results.
*/
- private[spark] def doFlatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
+ def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -110,8 +111,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]):
- JavaPairRDD[K, V] = {
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+ JavaPairRDD[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
}
diff --git a/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java b/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java
deleted file mode 100644
index 68b6fd6622..0000000000
--- a/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package spark.api.java;
-
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaRDDLike;
-import spark.api.java.function.PairFlatMapFunction;
-
-import java.io.Serializable;
-
-/**
- * Workaround for SPARK-668.
- */
-class PairFlatMapWorkaround<T> implements Serializable {
- /**
- * Return a new RDD by first applying a function to all elements of this
- * RDD, and then flattening the results.
- */
- public <K, V> JavaPairRDD<K, V> flatMap(PairFlatMapFunction<T, K, V> f) {
- return ((JavaRDDLike <T, ?>) this).doFlatMap(f);
- }
-}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 1cd68a2aa6..b7f167425f 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -33,6 +33,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
+ var firstApp: Option[ApplicationInfo] = None
+
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else ip
@@ -167,7 +169,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
// Try to spread out each app among all the nodes, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+ .filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -190,7 +192,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
} else {
// Pack each app into as few nodes as possible until we've assigned all its cores
- for (worker <- workers if worker.coresFree > 0) {
+ for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
if (canUse(app, worker)) {
val coresToUse = math.min(worker.coresFree, app.coresLeft)
@@ -245,6 +247,13 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
idToApp(app.id) = app
actorToApp(driver) = app
addressToApp(driver.path.address) = app
+ if (firstApp == None) {
+ firstApp = Some(app)
+ }
+ val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
+ if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
+ logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
+ }
return app
}
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index cd5b7d57f3..d1451bc212 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -198,7 +198,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
outbox.synchronized {
outbox.addMessage(message)
if (channel.isConnected) {
- changeConnectionKeyInterest(SelectionKey.OP_WRITE)
+ changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ)
}
}
}
@@ -219,7 +219,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
def finishConnect() {
try {
channel.finishConnect
- changeConnectionKeyInterest(SelectionKey.OP_WRITE)
+ changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ)
logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
} catch {
case e: Exception => {
@@ -239,8 +239,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
currentBuffers ++= chunk.buffers
}
case None => {
- changeConnectionKeyInterest(0)
- /*key.interestOps(0)*/
+ changeConnectionKeyInterest(SelectionKey.OP_READ)
return
}
}
@@ -267,6 +266,23 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
}
}
}
+
+ override def read() {
+ // We don't expect the other side to send anything; so, we just read to detect an error or EOF.
+ try {
+ val length = channel.read(ByteBuffer.allocate(1))
+ if (length == -1) { // EOF
+ close()
+ } else if (length > 0) {
+ logWarning("Unexpected data read from SendingConnection to " + remoteConnectionManagerId)
+ }
+ } catch {
+ case e: Exception =>
+ logError("Exception while reading SendingConnection to " + remoteConnectionManagerId, e)
+ callOnExceptionCallback(e)
+ close()
+ }
+ }
}
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 8139a2a40c..78097502bc 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,7 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
/**
@@ -42,7 +42,7 @@ class HadoopRDD[K, V](
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
- extends RDD[(K, V)](sc, Nil) {
+ extends RDD[(K, V)](sc, Nil) with Logging {
// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -71,7 +71,7 @@ class HadoopRDD[K, V](
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback(() => reader.close())
+ context.addOnCompleteCallback{ () => close() }
val key: K = reader.createKey()
val value: V = reader.createValue()
@@ -88,9 +88,6 @@ class HadoopRDD[K, V](
}
gotNext = true
}
- if (finished) {
- reader.close()
- }
!finished
}
@@ -104,6 +101,14 @@ class HadoopRDD[K, V](
gotNext = false
(key, value)
}
+
+ private def close() {
+ try {
+ reader.close()
+ } catch {
+ case e: Exception => logWarning("Exception in RecordReader.close()", e)
+ }
+ }
}
override def getPreferredLocations(split: Partition): Seq[String] = {
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index ebd4c3f0e2..df2361025c 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -7,7 +7,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
private[spark]
@@ -26,7 +26,8 @@ class NewHadoopRDD[K, V](
valueClass: Class[V],
@transient conf: Configuration)
extends RDD[(K, V)](sc, Nil)
- with HadoopMapReduceUtil {
+ with HadoopMapReduceUtil
+ with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -61,7 +62,7 @@ class NewHadoopRDD[K, V](
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
// Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback(() => reader.close())
+ context.addOnCompleteCallback(() => close())
var havePair = false
var finished = false
@@ -81,6 +82,14 @@ class NewHadoopRDD[K, V](
havePair = false
return (reader.getCurrentKey, reader.getCurrentValue)
}
+
+ private def close() {
+ try {
+ reader.close()
+ } catch {
+ case e: Exception => logWarning("Exception in RecordReader.close()", e)
+ }
+ }
}
override def getPreferredLocations(split: Partition): Seq[String] = {
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
new file mode 100644
index 0000000000..daf9cc993c
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -0,0 +1,108 @@
+package spark.rdd
+
+import java.util.{HashSet => JHashSet}
+import scala.collection.JavaConversions._
+import spark.RDD
+import spark.Partitioner
+import spark.Dependency
+import spark.TaskContext
+import spark.Partition
+import spark.SparkEnv
+import spark.ShuffleDependency
+import spark.OneToOneDependency
+
+/**
+ * An optimized version of cogroup for set difference/subtraction.
+ *
+ * It is possible to implement this operation with just `cogroup`, but
+ * that is less efficient because all of the entries from `rdd2`, for
+ * both matching and non-matching values in `rdd1`, are kept in the
+ * JHashMap until the end.
+ *
+ * With this implementation, only the entries from `rdd1` are kept in-memory,
+ * and the entries from `rdd2` are essentially streamed, as we only need to
+ * touch each once to decide if the value needs to be removed.
+ *
+ * This is particularly helpful when `rdd1` is much smaller than `rdd2`, as
+ * you can use `rdd1`'s partitioner/partition size and not worry about running
+ * out of memory because of the size of `rdd2`.
+ */
+private[spark] class SubtractedRDD[T: ClassManifest](
+ @transient var rdd1: RDD[T],
+ @transient var rdd2: RDD[T],
+ part: Partitioner) extends RDD[T](rdd1.context, Nil) {
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ Seq(rdd1, rdd2).map { rdd =>
+ if (rdd.partitioner == Some(part)) {
+ logInfo("Adding one-to-one dependency with " + rdd)
+ new OneToOneDependency(rdd)
+ } else {
+ logInfo("Adding shuffle dependency with " + rdd)
+ val mapSideCombinedRDD = rdd.mapPartitions(i => {
+ val set = new JHashSet[T]()
+ while (i.hasNext) {
+ set.add(i.next)
+ }
+ set.iterator
+ }, true)
+ // ShuffleDependency requires a tuple (k, v), which it will partition by k.
+ // We need this to partition to map to the same place as the k for
+ // OneToOneDependency, which means:
+ // - for already-tupled RDD[(A, B)], into getPartition(a)
+ // - for non-tupled RDD[C], into getPartition(c)
+ val part2 = new Partitioner() {
+ def numPartitions = part.numPartitions
+ def getPartition(key: Any) = key match {
+ case (k, v) => part.getPartition(k)
+ case k => part.getPartition(k)
+ }
+ }
+ new ShuffleDependency(mapSideCombinedRDD.map((_, null)), part2)
+ }
+ }
+ }
+
+ override def getPartitions: Array[Partition] = {
+ val array = new Array[Partition](part.numPartitions)
+ for (i <- 0 until array.size) {
+ // Each CoGroupPartition will depend on rdd1 and rdd2
+ array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
+ dependencies(j) match {
+ case s: ShuffleDependency[_, _] =>
+ new ShuffleCoGroupSplitDep(s.shuffleId)
+ case _ =>
+ new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
+ }
+ }.toList)
+ }
+ array
+ }
+
+ override val partitioner = Some(part)
+
+ override def compute(p: Partition, context: TaskContext): Iterator[T] = {
+ val partition = p.asInstanceOf[CoGroupPartition]
+ val set = new JHashSet[T]
+ def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match {
+ case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
+ for (k <- rdd.iterator(itsSplit, context))
+ op(k.asInstanceOf[T])
+ case ShuffleCoGroupSplitDep(shuffleId) =>
+ for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index))
+ op(k.asInstanceOf[T])
+ }
+ // the first dep is rdd1; add all keys to the set
+ integrate(partition.deps(0), set.add)
+ // the second dep is rdd2; remove all of its keys from the set
+ integrate(partition.deps(1), set.remove)
+ set.iterator
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdd1 = null
+ rdd2 = null
+ }
+
+} \ No newline at end of file
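
A hedged illustration of the dependency logic in `getDependencies` above: when both inputs are already partitioned by the partitioner passed to `subtract`, no shuffle is needed (names and data are hypothetical; assumes an existing SparkContext `sc`):

import spark.SparkContext._   // for partitionBy on pair RDDs

val p = new spark.HashPartitioner(4)
val left  = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
val right = sc.parallelize(Seq((2, "b"), (4, "d"))).partitionBy(p)

// Both parents match `p`, so SubtractedRDD uses one-to-one (narrow) dependencies
// and preserves the partitioner.
val result = left.subtract(right)   // keeps (1, "a") and (3, "c")
// result.partitioner == Some(p)
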
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1e4fbdb874..d9c2f9517b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -11,6 +11,7 @@ import spark.TaskState.TaskState
import spark.scheduler._
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -22,6 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// How often to check for speculative tasks
val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+ // Threshold above which we warn user initial TaskSet may be starved
+ val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
val activeTaskSets = new HashMap[String, TaskSetManager]
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -30,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val taskIdToExecutorId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
+ var hasReceivedTask = false
+ var hasLaunchedTask = false
+ val starvationTimer = new Timer(true)
+
// Incrementing Mesos task IDs
val nextTaskId = new AtomicLong(0)
@@ -94,6 +101,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
activeTaskSets(taskSet.id) = manager
activeTaskSetsQueue += manager
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+ if (hasReceivedTask == false) {
+ starvationTimer.scheduleAtFixedRate(new TimerTask() {
+ override def run() {
+ if (!hasLaunchedTask) {
+ logWarning("Initial job has not accepted any resources; " +
+ "check your cluster UI to ensure that workers are registered")
+ } else {
+ this.cancel()
+ }
+ }
+ }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+ }
+ hasReceivedTask = true;
}
backend.reviveOffers()
}
@@ -150,6 +171,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
} while (launchedTask)
}
+ if (tasks.size > 0) {
+ hasLaunchedTask = true
+ }
return tasks
}
}
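
A small configuration sketch for the starvation warning added above; the timeout value and master URL are illustrative, and the property must be set before the SparkContext is created:

// Raise the starvation-warning interval from the 15s default to 30s.
System.setProperty("spark.starvation.timeout", "30000")
val sc = new spark.SparkContext("spark://master:7077", "starvation-demo")
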
@@ -235,7 +259,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
override def defaultParallelism() = backend.defaultParallelism()
-
+
// Check for speculatable tasks in all our active jobs.
def checkSpeculatableTasks() {
var shouldRevive = false
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index a342d378ff..dafa906712 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -38,7 +38,7 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
object MetadataCleaner {
- def getDelaySeconds = System.getProperty("spark.cleaner.delay", "-1").toInt
- def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.delay", delay.toString) }
+ def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
+ def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) }
}
diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
index 03559751bc..835822edb2 100644
--- a/core/src/main/scala/spark/util/Vector.scala
+++ b/core/src/main/scala/spark/util/Vector.scala
@@ -11,12 +11,16 @@ class Vector(val elements: Array[Double]) extends Serializable {
return Vector(length, i => this(i) + other(i))
}
+ def add(other: Vector) = this + other
+
def - (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
return Vector(length, i => this(i) - other(i))
}
+ def subtract(other: Vector) = this - other
+
def dot(other: Vector): Double = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
@@ -61,10 +65,16 @@ class Vector(val elements: Array[Double]) extends Serializable {
this
}
+ def addInPlace(other: Vector) = this +=other
+
def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
+ def multiply (d: Double) = this * d
+
def / (d: Double): Vector = this * (1 / d)
+ def divide (d: Double) = this / d
+
def unary_- = this * -1
def sum = elements.reduceLeft(_ + _)
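
A quick sketch of the Java-friendly aliases added above; they simply delegate to the symbolic operators (assumes the varargs `Vector(...)` factory in the companion object):

val v1 = spark.util.Vector(1.0, 2.0, 3.0)
val v2 = spark.util.Vector(0.5, 0.5, 0.5)

v1.add(v2)         // same as v1 + v2
v1.subtract(v2)    // same as v1 - v2
v1.multiply(2.0)   // same as v1 * 2.0
v1.divide(2.0)     // same as v1 / 2.0
v1.addInPlace(v2)  // same as v1 += v2 (mutates v1)
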
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 0e2585daa4..caa4ba3a37 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -217,6 +217,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(grouped.collect.size === 1)
}
}
+
+ test("recover from node failures with replication") {
+ import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+ DistributedSuite.amMaster = true
+ // Using more than two nodes so we don't have a symmetric communication pattern and might
+ // cache a partially correct list of peers.
+ sc = new SparkContext("local-cluster[3,1,512]", "test")
+ for (i <- 1 to 3) {
+ val data = sc.parallelize(Seq(true, false, false, false), 4)
+ data.persist(StorageLevel.MEMORY_ONLY_2)
+
+ assert(data.count === 4)
+ assert(data.map(markNodeIfIdentity).collect.size === 4)
+ assert(data.map(failOnMarkedIdentity).collect.size === 4)
+
+ // Create a new replicated RDD to make sure that cached peer information doesn't cause
+ // problems.
+ val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
+ assert(data2.count === 2)
+ }
+ }
}
object DistributedSuite {
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 92c3f67416..77e0eab829 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -234,6 +234,32 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
assert(rdd.keys.collect().toList === List(1, 2))
assert(rdd.values.collect().toList === List("a", "b"))
}
+
+ test("subtract") {
+ sc = new SparkContext("local", "test")
+ val a = sc.parallelize(Array(1, 2, 3), 2)
+ val b = sc.parallelize(Array(2, 3, 4), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set(1))
+ assert(c.partitions.size === a.partitions.size)
+ }
+
+ test("subtract with narrow dependency") {
+ sc = new SparkContext("local", "test")
+ // use a deterministic partitioner
+ val p = new Partitioner() {
+ def numPartitions = 5
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ // partitionBy so we have a narrow dependency
+ val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+ println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList)
+ // more splits/no partitioner so a shuffle dependency
+ val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set((1, "a"), (3, "c")))
+ assert(c.partitioner.get === p)
+ }
}
object ShuffleSuite {
diff --git a/docs/configuration.md b/docs/configuration.md
index f1ca77aa78..04eb6daaa5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -183,7 +183,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td>spark.broadcast.factory</td>
- <td>spark.broadcast. HttpBroadcastFactory</td>
+ <td>spark.broadcast.HttpBroadcastFactory</td>
<td>
Which broadcast implementation to use.
</td>
@@ -244,10 +244,10 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td>spark.cleaner.delay</td>
+ <td>spark.cleaner.ttl</td>
<td>(disable)</td>
<td>
- Duration (minutes) of how long Spark will remember any metadata (stages generated, tasks generated, etc.).
+ Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.).
Periodic cleanups will ensure that metadata older than this duration will be forgetten. This is
useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming
applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 71e1bd4aab..ded43e67cd 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -207,7 +207,7 @@ ssc.stop()
{% endhighlight %}
# Example
-A simple example to start off is the [NetworkWordCount](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala). This example counts the words received from a network server every second. Given below is the relevant sections of the source code. You can find the full source code in `<Spark repo>/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala` .
+A simple example to start off is the [NetworkWordCount](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala). This example counts the words received from a network server every second. Given below is the relevant sections of the source code. You can find the full source code in `<Spark repo>/streaming/src/main/scala/spark/streaming/examples/NetworkWordCount.scala` .
{% highlight scala %}
import spark.streaming.{Seconds, StreamingContext}
@@ -216,7 +216,7 @@ import spark.streaming.StreamingContext._
// Create the context and set up a network input stream to receive from a host:port
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
-val lines = ssc.networkTextStream(args(1), args(2).toInt)
+val lines = ssc.socketTextStream(args(1), args(2).toInt)
// Split the lines into words, count them, and print some of the counts on the master
val words = lines.flatMap(_.split(" "))
@@ -227,6 +227,8 @@ wordCounts.print()
ssc.start()
{% endhighlight %}
+The `socketTextStream` call returns a DStream of lines received from a TCP socket. The `lines` DStream is _transformed_ into a DStream of words using the `flatMap` operation, where each line is split into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second.
+
To run this example on your local machine, you need to first run a Netcat server by using
{% highlight bash %}
@@ -335,7 +337,7 @@ For a Spark Streaming application running on a cluster to be stable, the process
A good approach to figure out the right batch size for your application is to test it with a conservative batch size (say, 5-10 seconds) and a low data rate. To verify whether the system is able to keep up with data rate, you can check the value of the end-to-end delay experienced by each processed batch (in the Spark master logs, find the line having the phrase "Total delay"). If the delay is maintained to be less than the batch size, then system is stable. Otherwise, if the delay is continuously increasing, it means that the system is unable to keep up and it therefore unstable. Once you have an idea of a stable configuration, you can try increasing the data rate and/or reducing the batch size. Note that momentary increase in the delay due to temporary data rate increases maybe fine as long as the delay reduces back to a low value (i.e., less than batch size).
## 24/7 Operation
-By default, Spark does not forget any of the metadata (RDDs generated, stages processed, etc.). But for a Spark Streaming application to operate 24/7, it is necessary for Spark to do periodic cleanup of it metadata. This can be enabled by setting the Java system property `spark.cleaner.delay` to the number of seconds you want any metadata to persist. For example, setting `spark.cleaner.delay` to 600 would cause Spark periodically cleanup all metadata and persisted RDDs that are older than 10 minutes. Note, that this property needs to be set before the SparkContext is created.
+By default, Spark does not forget any of the metadata (RDDs generated, stages processed, etc.). But for a Spark Streaming application to operate 24/7, it is necessary for Spark to do periodic cleanup of it metadata. This can be enabled by setting the Java system property `spark.cleaner.ttl` to the number of seconds you want any metadata to persist. For example, setting `spark.cleaner.ttl` to 600 would cause Spark periodically cleanup all metadata and persisted RDDs that are older than 10 minutes. Note, that this property needs to be set before the SparkContext is created.
This value is closely tied with any window operation that is being used. Any window operation would require the input data to be persisted in memory for at least the duration of the window. Hence it is necessary to set the delay to at least the value of the largest window operation used in the Spark Streaming application. If this delay is set too low, the application will throw an exception saying so.
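
For example, a minimal sketch of enabling the renamed cleaner property before the contexts are created (the TTL value and master string are illustrative):

import spark.streaming.{Seconds, StreamingContext}

// Must be set before the (Streaming)Context is constructed.
System.setProperty("spark.cleaner.ttl", "3600")   // keep metadata for one hour
val ssc = new StreamingContext("local[2]", "cleanup-demo", Seconds(1))
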
@@ -347,15 +349,19 @@ Tuning the memory usage and GC behavior of Spark applications have been discusse
* **Concurrent garbage collector**: Using the concurrent mark-and-sweep GC further minimizes the variability of GC pauses. Even though concurrent GC is known to reduce the overall processing throughput of the system, its use is still recommended to achieve more consistent batch processing times.
# Fault-tolerance Properties
-There are two aspects to fault-tolerance - failure of a worker node and that of a driver node. In this section, we are going to discuss the fault-tolerance behavior and the semantics of the processed data.
+In this section, we are going to discuss the behavior of a Spark Streaming application in the event of a node failure. To understand this, let us recall the basic fault-tolerance properties of Spark's RDDs.
+
+ 1. An RDD is an immutable, and deterministically re-computable, distributed dataset. Each RDD remembers the lineage of deterministic operations that were used on a fault-tolerant input dataset to create it.
+ 1. If any partition of an RDD is lost due to a worker node failure, then that partition can be re-computed from the original fault-tolerant dataset using the lineage of operations.
+
+Since all data transformations in Spark Streaming are based on RDD operations, as long as the input dataset is present, all intermediate data can be recomputed. Keeping these properties in mind, we are going to discuss the failure semantics in more detail.
## Failure of a Worker Node
-In case of the worker node failure, none of the processed data will be lost because
-1. All the input data is fault-tolerant (either the data is on HDFS, or it replicated Spark Streaming if received from the network)
-1. All intermediate data is expressed as RDDs with their lineage to the input data, which allows Spark to recompute any part of the intermediate data is lost to worker node failure.
+There are two failure behaviors, depending on which input source is used.
-If the worker node where a network data receiver is running fails, then the receiver will be restarted on a different node and it will continue to receive data. However, data that was accepted by the receiver but not yet replicated to other Spark nodes may be lost, which is a fraction of a second of data.
+1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can be re-computed and therefore no data will be lost due to any failure.
+1. _Using any input source that receives data through a network_ - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost data from the leftover copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be restarted on a different node and it will continue to receive data.
Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreach`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations.
@@ -372,11 +378,19 @@ All this is periodically saved in the file `<checkpoint directory>/graph` where
val ssc = new StreamingContext(checkpointDirectory)
{% endhighlight %}
-Calling `ssc.start()` on this new context will restart the receivers and the stream computations.
+On calling `ssc.start()` on this new context, the following steps are taken by the system
+
+1. Schedule the transformations and output operations for all the time steps between the time when the driver failed and when it was restarted. This is also done for those time steps that were scheduled but not processed due to the failure. This will make the system recompute all the intermediate data from the checkpointed RDD files, etc.
+1. Restart the network receivers, if any, and continue receiving new data.
+
+In the current _alpha_ release, there are two different failure behaviors based on which input sources are used.
+
+1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can be re-computed and therefore no data will be lost due to any failure.
+1. _Using any input source that receives data through a network_ - As aforesaid, the received input data is replicated in memory to multiple nodes. Since all the data in the Spark worker's memory is lost when the Spark driver fails, the past input data will not be accessible when the driver recovers. Hence, if stateful and window-based operations are used (like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state will not be recovered completely.
-In case of stateful operations (that is, `updateStateByKey` and `reduceByKeyAndWindow` with inverse function), the intermediate data at the time of failure also needs to be recomputed.This requires two things - (i) the RDD checkpoints and (ii) the data received since the checkpoints. In the current _alpha_ release, the input data received from the network is not saved durably across driver failures (the data is only replicated in memory of the worker processes and gets lost when the driver fails). Only with file input streams (where the data is already durably stored) is the recovery from driver failure complete and all intermediate data is recomputed. In a future release, this will be true for all input streams. Note that for non-stateful operations, with _all_ input streams, the system will recover and continue receiving and processing new data.
+In future releases, this behavior will be fixed for all input sources, that is, all data will be recovered irrespective of which input sources are used. Note that for non-stateful transformations like `map`, `count`, and `reduceByKey`, with _all_ input streams, the system, upon restarting, will continue to receive and process new data.
-To understand the behavior of the system under driver failure, lets consider what will happen with a file input stream Specifically, in the case of the file input stream, it will correctly identify new files that were created while the driver was down and process them in the same way as it would have if the driver had not failed. To explain further in the case of file input stream, we shall use an example. Lets say, files are being generated every second, and a Spark Streaming program reads every new file and output the number of lines in the file. This is what the sequence of outputs would be with and without a driver failure.
+To better understand the behavior of the system under driver failure with an HDFS source, let's consider what will happen with a file input stream. Specifically, in the case of the file input stream, it will correctly identify new files that were created while the driver was down and process them in the same way as it would have if the driver had not failed. To explain further, we shall use an example. Let's say files are being generated every second, and a Spark Streaming program reads every new file and outputs the number of lines in the file. This is what the sequence of outputs would be with and without a driver failure.
<table class="table">
<!-- Results table headers -->
@@ -450,6 +464,55 @@ To understand the behavior of the system under driver failure, lets consider wha
If the driver had crashed in the middle of the processing of time 3, then it will process time 3 and output 30 after recovery.
+# Java API
+
+Similar to [Spark's Java API](java-programming-guide.html), we also provide a Java API for Spark Streaming which allows all its features to be accessible from a Java program. This is defined in the [spark.streaming.api.java](api/streaming/index.html#spark.streaming.api.java.package) package and includes [JavaStreamingContext](api/streaming/index.html#spark.streaming.api.java.JavaStreamingContext) and [JavaDStream](api/streaming/index.html#spark.streaming.api.java.JavaDStream) classes that provide the same methods as their Scala counterparts, but take Java functions (that is, Function and Function2) and return Java data and collection types. Some of the key points to note are:
+
+1. Functions for transformations must be implemented as subclasses of [Function](api/core/index.html#spark.api.java.function.Function) and [Function2](api/core/index.html#spark.api.java.function.Function2)
+1. Unlike the Scala API, the Java API handles DStreams for key-value pairs using a separate [JavaPairDStream](api/streaming/index.html#spark.streaming.api.java.JavaPairDStream) class (similar to [JavaRDD and JavaPairRDD](java-programming-guide.html#rdd-classes)). DStream functions like `map` and `filter` are implemented separately by JavaDStream and JavaPairDStream to return DStreams of the appropriate types.
+
+Spark's [Java Programming Guide](java-programming-guide.html) gives more ideas about using the Java API. To extend the ideas presented for the RDDs to DStreams, we present parts of the Java version of the same NetworkWordCount example presented above. The full source code is given at `<spark repo>/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java`.
+
+The streaming context and the socket stream from the input source are created using a `JavaStreamingContext`, which has the same parameters and provides the same input streams as its Scala counterpart.
+
+{% highlight java %}
+JavaStreamingContext ssc = new JavaStreamingContext(mesosUrl, "NetworkWordCount", Seconds(1));
+JavaDStream<String> lines = ssc.socketTextStream(ip, port);
+{% endhighlight %}
+
+
+Then the `lines` are split into words by using the `flatMap` function and [FlatMapFunction](api/core/index.html#spark.api.java.function.FlatMapFunction).
+
+{% highlight java %}
+JavaDStream<String> words = lines.flatMap(
+ new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(x.split(" "));
+ }
+ });
+{% endhighlight %}
+
+The `words` DStream is then mapped to a [JavaPairDStream](api/streaming/index.html#spark.streaming.api.java.JavaPairDStream) of `(word, 1)` pairs using `map` and [PairFunction](api/core/index.html#spark.api.java.function.PairFunction). This is then reduced using `reduceByKey` and [Function2](api/core/index.html#spark.api.java.function.Function2).
+
+{% highlight java %}
+JavaPairDStream<String, Integer> wordCounts = words.map(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) throws Exception {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ });
+{% endhighlight %}
+
+
+
# Where to Go from Here
-* Documentation - [Scala and Java](api/streaming/index.html)
+* Documentation - [Scala](api/streaming/index.html#spark.streaming.package) and [Java](api/streaming/index.html#spark.streaming.api.java.package)
* More examples - [Scala](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples) and [Java](https://github.com/mesos/spark/tree/master/examples/src/main/java/spark/streaming/examples)
diff --git a/examples/pom.xml b/examples/pom.xml
index f6125444e2..7d975875fa 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_2.9.2</artifactId>
- <version>0.1.9</version>
+ <version>0.1.8</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
diff --git a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
index 07342beb02..0e9eadd01b 100644
--- a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
@@ -23,7 +23,7 @@ import spark.streaming.api.java.JavaStreamingContext;
*/
public class JavaNetworkWordCount {
public static void main(String[] args) {
- if (args.length < 2) {
+ if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1");
System.exit(1);
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
index 346151c147..76293fbb96 100644
--- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -131,8 +131,7 @@ object ActorWordCount {
val Seq(master, host, port) = args.toSeq
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "ActorWordCount",
- Seconds(10))
+ val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2))
/*
* Following is the use of actorStream to plug in custom actor as receiver
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 7ff70ae2e5..5ac6d19b34 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -16,7 +16,7 @@ import spark.streaming.StreamingContext._
*/
object NetworkWordCount {
def main(args: Array[String]) {
- if (args.length < 2) {
+ if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
index 2eec777c54..66e709b7a3 100644
--- a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
@@ -37,7 +37,7 @@ object RawNetworkGrep {
RawTextHelper.warmUp(ssc.sc)
val rawStreams = (1 to numStreams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
+ ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
val union = ssc.union(rawStreams)
union.filter(_.contains("the")).count().foreach(r =>
println("Grep count: " + r.collect().mkString))
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 5e7c3b5e3a..22bdc93602 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -155,7 +155,7 @@ object SparkBuild extends Build {
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
- libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.9")
+ libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.8")
)
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
diff --git a/run b/run
index 2d8a737e01..6b2d84d48d 100755
--- a/run
+++ b/run
@@ -25,6 +25,26 @@ if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker"
SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default
fi
+
+# Add java opts for master, worker, executor. The opts maybe null
+case "$1" in
+ 'spark.deploy.master.Master')
+ SPARK_JAVA_OPTS+=" $SPARK_MASTER_OPTS"
+ ;;
+ 'spark.deploy.worker.Worker')
+ SPARK_JAVA_OPTS+=" $SPARK_WORKER_OPTS"
+ ;;
+ 'spark.executor.StandaloneExecutorBackend')
+ SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS"
+ ;;
+ 'spark.executor.MesosExecutorBackend')
+ SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS"
+ ;;
+ 'spark.repl.Main')
+ SPARK_JAVA_OPTS+=" $SPARK_REPL_OPTS"
+ ;;
+esac
+
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
if [ `command -v scala` ]; then
RUNNER="scala"
diff --git a/streaming/pom.xml b/streaming/pom.xml
index d78c39da0d..92b17fc3af 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -20,6 +20,17 @@
<id>lib</id>
<url>file://${project.basedir}/lib</url>
</repository>
+ <repository>
+ <id>akka-repo</id>
+ <name>Akka Repository</name>
+ <url>http://repo.akka.io/releases</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
</repositories>
<dependencies>
@@ -53,11 +64,10 @@
<version>3.0.3</version>
</dependency>
<dependency>
- <groupId>org.twitter4j</groupId>
- <artifactId>twitter4j-core</artifactId>
- <version>3.0.3</version>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-zeromq</artifactId>
+ <version>2.0.3</version>
</dependency>
-
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 64972fd5cd..b159d26c02 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -141,7 +141,7 @@ class NetworkInputTracker(
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
- //ssc.sparkContext.makeRDD(1 to 100, 100).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+ ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
// Distribute the receivers and start them
ssc.sparkContext.runJob(tempRDD, startReceiver)
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index d0430b3f3e..25c67b279b 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -170,7 +170,8 @@ class StreamingContext private (
* should be same.
*/
def actorStream[T: ClassManifest](
- props: Props, name: String,
+ props: Props,
+ name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
@@ -179,19 +180,20 @@ class StreamingContext private (
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher
- * @param zeroMQ topic to subscribe to
+ * @param subscribe topic to subscribe to
* @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame is a
* sequence of bytes; this converter (typically a deserializer) turns the received
* sequence of byte sequences into objects, where each outer sequence is a frame
* and each inner sequence is its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
- def zeroMQStream[T: ClassManifest](publisherUrl:String,
+ def zeroMQStream[T: ClassManifest](
+ publisherUrl:String,
subscribe: Subscribe,
bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
-
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
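
A hedged usage sketch of the reformatted zeroMQStream API above, assuming an existing StreamingContext named ssc, a ZeroMQ publisher at a made-up endpoint, and frames that carry UTF-8 text:

    import akka.zeromq.Subscribe

    // Hypothetical converter: treat each frame's payload as one UTF-8 string.
    def bytesToStrings(frames: Seq[Seq[Byte]]): Iterator[String] =
      frames.map(frame => new String(frame.toArray, "UTF-8")).iterator

    // Endpoint and topic are placeholders for illustration.
    val lines = ssc.zeroMQStream[String](
      "tcp://localhost:5553",
      Subscribe("events"),
      bytesToStrings _)
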
@@ -283,7 +285,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects in the received blocks
*/
- def rawNetworkStream[T: ClassManifest](
+ def rawSocketStream[T: ClassManifest](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
@@ -352,7 +354,7 @@ class StreamingContext private (
def twitterStream(
username: String,
password: String,
- filters: Seq[String],
+ filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = {
val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
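
With filters now defaulting to Nil, the unfiltered form can be called without an explicit filter list. A brief sketch with placeholder credentials and made-up keywords:

    // No filters: the full sample stream (placeholder credentials).
    val allTweets = ssc.twitterStream("someUser", "somePassword")

    // Explicit filters still work as before.
    val filteredTweets = ssc.twitterStream("someUser", "somePassword", Seq("spark", "streaming"))
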
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 51efe6cae8..4d93f0a5f7 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -17,9 +17,7 @@ import spark.RDD
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
* `window`. In addition, [[spark.streaming.api.java.JavaPairDStream]] contains operations available
- * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
- * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through
- * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`.
*
* Internally, a DStream is characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index d2a0ba725f..f3b40b5b88 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -1,16 +1,26 @@
package spark.streaming.api.java
-import scala.collection.JavaConversions._
-import java.lang.{Long => JLong, Integer => JInt}
-
import spark.streaming._
-import dstream._
+import receivers.{ActorReceiver, ReceiverSupervisorStrategy}
+import spark.streaming.dstream._
import spark.storage.StorageLevel
+
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import spark.api.java.{JavaSparkContext, JavaRDD}
+
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+
+import twitter4j.Status
+
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
+
+import scala.collection.JavaConversions._
+
+import java.lang.{Long => JLong, Integer => JInt}
import java.io.InputStream
import java.util.{Map => JMap}
-import spark.api.java.{JavaSparkContext, JavaRDD}
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -128,7 +138,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
- def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
+ def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
: JavaDStream[String] = {
ssc.socketTextStream(hostname, port, storageLevel)
}
@@ -186,13 +196,13 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects in the received blocks
*/
- def rawNetworkStream[T](
+ def rawSocketStream[T](
hostname: String,
port: Int,
storageLevel: StorageLevel): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel))
+ JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
}
/**
@@ -204,10 +214,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param port Port to connect to for receiving data
* @tparam T Type of the objects in the received blocks
*/
- def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = {
+ def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port))
+ JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
}
/**
@@ -246,12 +256,179 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/
- def flumeStream(hostname: String, port: Int):
- JavaDStream[SparkFlumeEvent] = {
+ def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port)
}
/**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param username Twitter username
+ * @param password Twitter password
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ username: String,
+ password: String,
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(username, password, filters, storageLevel)
+ }
+
+ /**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param username Twitter username
+ * @param password Twitter password
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(
+ username: String,
+ password: String,
+ filters: Array[String]
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(username, password, filters)
+ }
+
+ /**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param username Twitter username
+ * @param password Twitter password
+ */
+ def twitterStream(
+ username: String,
+ password: String
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(username, password)
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @note An important point to note:
+ * Since the actor may exist outside the Spark framework, it is the user's responsibility
+ * to ensure type safety, i.e. the parametrized type of the data received and of the
+ * actorStream should be the same.
+ */
+ def actorStream[T](
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @note An important point to note:
+ * Since the actor may exist outside the Spark framework, it is the user's responsibility
+ * to ensure type safety, i.e. the parametrized type of the data received and of the
+ * actorStream should be the same.
+ */
+ def actorStream[T](
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.actorStream[T](props, name, storageLevel)
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ *
+ * @note An important point to note:
+ * Since the actor may exist outside the Spark framework, it is the user's responsibility
+ * to ensure type safety, i.e. the parametrized type of the data received and of the
+ * actorStream should be the same.
+ */
+ def actorStream[T](
+ props: Props,
+ name: String
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.actorStream[T](props, name)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame is a
+ * sequence of bytes; this converter (typically a deserializer) turns the received
+ * sequence of byte sequences into objects, where each outer sequence is a frame
+ * and each inner sequence is its payload.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def zeroMQStream[T](
+ publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame is a
+ * sequence of bytes; this converter (typically a deserializer) turns the received
+ * sequence of byte sequences into objects, where each outer sequence is a frame
+ * and each inner sequence is its payload.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def zeroMQStream[T](
+ publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame is a
+ * sequence of bytes; this converter (typically a deserializer) turns the received
+ * sequence of byte sequences into objects, where each outer sequence is a frame
+ * and each inner sequence is its payload.
+ */
+ def zeroMQStream[T](
+ publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+ }
+
+ /**
* Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
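
The Java-facing zeroMQStream overloads above adapt a JFunction over Array[Array[Byte]] into the Seq[Seq[Byte]] => Iterator[T] converter expected by the underlying Scala API. A minimal sketch of that adaptation in isolation; the converter instance here is a stand-in, not code from the patch:

    import scala.collection.JavaConversions._
    import spark.api.java.function.{Function => JFunction}

    // Stand-in Java-style converter: every frame becomes one UTF-8 string.
    val javaConverter = new JFunction[Array[Array[Byte]], java.lang.Iterable[String]] {
      def call(frames: Array[Array[Byte]]): java.lang.Iterable[String] =
        asJavaIterable(frames.map(bytes => new String(bytes, "UTF-8")).toIterable)
    }

    // The same adaptation the wrapper performs before delegating to ssc.zeroMQStream.
    def fn(frames: Seq[Seq[Byte]]): Iterator[String] =
      javaConverter.apply(frames.map(_.toArray).toArray).toIterator
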
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
index a4db44a608..3c5d43a609 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -29,7 +29,7 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex
false // Time not valid
} else {
// Time is valid, but warn if it is earlier than lastValidTime
- if (lastValidTime == null || lastValidTime <= time) {
+ if (lastValidTime != null && time < lastValidTime) {
logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime)
}
lastValidTime = time
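
With the corrected predicate above, the warning fires only when a batch time arrives that is earlier than the last valid time, instead of on the normal forward-moving case. An illustrative sketch of the intended behaviour, using plain Longs in place of spark.streaming.Time:

    // Warn iff a previous time exists and the new time moves backwards.
    def shouldWarn(lastValidTime: Option[Long], time: Long): Boolean =
      lastValidTime.exists(last => time < last)

    assert(!shouldWarn(None, 1000L))         // first batch: nothing to compare against
    assert(!shouldWarn(Some(1000L), 2000L))  // time moves forward: no warning
    assert(shouldWarn(Some(2000L), 1000L))   // time moves backwards: warn
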
diff --git a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
index 024bf3bea4..6b310bc0b6 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
@@ -7,6 +7,7 @@ import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
import spark.streaming.{Time, StreamingContext}
+private[streaming]
class QueueInputDStream[T: ClassManifest](
@transient ssc: StreamingContext,
val queue: Queue[RDD[T]],
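
Since QueueInputDStream is now private[streaming], user code is expected to build queue-backed streams through the StreamingContext factory instead. A hedged sketch, assuming the existing queueStream API and an already-created StreamingContext named ssc:

    import scala.collection.mutable.Queue
    import spark.RDD

    // Queue that RDDs can be pushed into, e.g. for tests or replay.
    val rddQueue = new Queue[RDD[Int]]()
    val queuedStream = ssc.queueStream(rddQueue)

    // Push a batch of data; by default it is consumed one RDD per batch interval.
    rddQueue += ssc.sparkContext.makeRDD(1 to 100, 4)
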
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 4530af5f6a..3bed500f73 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -24,10 +24,16 @@ import spark.streaming.api.java.JavaStreamingContext;
import spark.streaming.JavaTestUtils;
import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
+import spark.streaming.InputStreamsSuite;
import java.io.*;
import java.util.*;
+import akka.actor.Props;
+import akka.zeromq.Subscribe;
+
+
+
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
@@ -1205,12 +1211,12 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testNetworkTextStream() {
+ public void testSocketTextStream() {
JavaDStream test = ssc.socketTextStream("localhost", 12345);
}
@Test
- public void testNetworkString() {
+ public void testSocketString() {
class Converter extends Function<InputStream, Iterable<String>> {
public Iterable<String> call(InputStream in) {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
@@ -1239,13 +1245,13 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testRawNetworkStream() {
- JavaDStream test = ssc.rawNetworkStream("localhost", 12345);
+ public void testRawSocketStream() {
+ JavaDStream test = ssc.rawSocketStream("localhost", 12345);
}
@Test
public void testFlumeStream() {
- JavaDStream test = ssc.flumeStream("localhost", 12345);
+ JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
}
@Test
@@ -1253,4 +1259,25 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, String> foo =
ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
}
+
+ @Test
+ public void testTwitterStream() {
+ String[] filters = new String[] { "good", "bad", "ugly" };
+ JavaDStream test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY());
+ }
+
+ @Test
+ public void testActorStream() {
+ JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
+ }
+
+ @Test
+ public void testZeroMQStream() {
+ JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
+ @Override
+ public Iterable<String> call(byte[][] b) throws Exception {
+ return null;
+ }
+ });
+ }
}
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index c9f941c5b8..1024d3ac97 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -42,7 +42,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
- test("network input stream") {
+ test("socket input stream") {
// Start the server
val testServer = new TestServer(testPort)
testServer.start()