author     haitao.yao <yao.erix@gmail.com>  2013-02-16 10:11:28 +0800
committer  haitao.yao <yao.erix@gmail.com>  2013-02-16 10:11:28 +0800
commit     a9cfac347a367ba252e2061f5d9910355e2fe0c3
tree       b4abb2c66b67f1a117aabdb850285587c8d4348e
parent     f609182e5bfc73110181f8c432cea460a74e61d6
parent     3bcc6e5c0395b7478bc19572cbef3958f13daf6e
Merge branch 'mesos'
-rw-r--r--  bagel/src/main/scala/spark/bagel/Bagel.scala | 22
-rw-r--r--  bagel/src/test/scala/bagel/BagelSuite.scala | 35
-rw-r--r--  core/src/main/scala/spark/RDD.scala | 23
-rw-r--r--  core/src/main/scala/spark/api/java/JavaDoubleRDD.scala | 5
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala | 7
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDD.scala | 5
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala | 12
-rw-r--r--  core/src/main/scala/spark/deploy/client/Client.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala | 31
-rw-r--r--  core/src/main/scala/spark/deploy/master/WorkerInfo.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala | 16
-rw-r--r--  core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala | 25
-rw-r--r--  core/src/main/scala/spark/network/ConnectionManager.scala | 35
-rw-r--r--  core/src/main/scala/spark/rdd/BlockRDD.scala | 14
-rw-r--r--  core/src/main/scala/spark/rdd/CartesianRDD.scala | 3
-rw-r--r--  core/src/main/scala/spark/rdd/CheckpointRDD.scala | 4
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 41
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala | 9
-rw-r--r--  core/src/main/scala/spark/rdd/FilteredRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/FlatMappedRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/GlommedRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala | 9
-rw-r--r--  core/src/main/scala/spark/rdd/MapPartitionsRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/MappedRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 8
-rw-r--r--  core/src/main/scala/spark/rdd/PartitionPruningRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/PipedRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/SampledRDD.scala | 12
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala | 4
-rw-r--r--  core/src/main/scala/spark/rdd/UnionRDD.scala | 4
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedRDD.scala | 4
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala | 4
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala | 3
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 40
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala | 6
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala | 2
-rw-r--r--  core/src/main/twirl/spark/deploy/master/index.scala.html | 6
-rw-r--r--  core/src/main/twirl/spark/deploy/worker/index.scala.html | 6
-rw-r--r--  core/src/test/scala/spark/CheckpointSuite.scala | 4
-rw-r--r--  core/src/test/scala/spark/DriverSuite.scala | 3
-rw-r--r--  core/src/test/scala/spark/JavaAPISuite.java | 24
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala | 8
-rw-r--r--  docs/configuration.md | 10
-rw-r--r--  docs/contributing-to-spark.md | 2
-rw-r--r--  docs/scala-programming-guide.md | 2
-rw-r--r--  docs/spark-standalone.md | 8
-rw-r--r--  docs/tuning.md | 2
-rw-r--r--  examples/src/main/scala/spark/examples/LogQuery.scala | 66
-rwxr-xr-x  pyspark | 7
-rw-r--r--  python/pyspark/rdd.py | 4
-rwxr-xr-x  run | 12
-rw-r--r--  run2.cmd | 23
-rw-r--r--  sbt/sbt.cmd | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala | 12
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 34
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 8
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java | 186
60 files changed, 629 insertions(+), 207 deletions(-)
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala
index 996ca2a877..fa0ba4a573 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -6,8 +6,8 @@ import spark.SparkContext._
import scala.collection.mutable.ArrayBuffer
object Bagel extends Logging {
- def run[K : Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
- C : Manifest, A : Manifest](
+ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
+ C: Manifest, A: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
@@ -50,8 +50,7 @@ object Bagel extends Logging {
verts
}
- def run[K : Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
- C : Manifest](
+ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
@@ -66,8 +65,7 @@ object Bagel extends Logging {
addAggregatorArg[K, V, M, C](compute))
}
- def run[K : Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
- C : Manifest](
+ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
@@ -82,7 +80,7 @@ object Bagel extends Logging {
addAggregatorArg[K, V, M, C](compute))
}
- def run[K : Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
+ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
@@ -100,7 +98,7 @@ object Bagel extends Logging {
* Aggregates the given vertices using the given aggregator, if it
* is specified.
*/
- private def agg[K, V <: Vertex, A : Manifest](
+ private def agg[K, V <: Vertex, A: Manifest](
verts: RDD[(K, V)],
aggregator: Option[Aggregator[V, A]]
): Option[A] = aggregator match {
@@ -116,7 +114,7 @@ object Bagel extends Logging {
* function. Returns the processed RDD, the number of messages
* created, and the number of active vertices.
*/
- private def comp[K : Manifest, V <: Vertex, M <: Message[K], C](
+ private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
grouped: RDD[(K, (Seq[C], Seq[V]))],
compute: (V, Option[C]) => (V, Array[M])
@@ -149,9 +147,7 @@ object Bagel extends Logging {
* Converts a compute function that doesn't take an aggregator to
* one that does, so it can be passed to Bagel.run.
*/
- private def addAggregatorArg[
- K : Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C
- ](
+ private def addAggregatorArg[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C](
compute: (V, Option[C], Int) => (V, Array[M])
): (V, Option[C], Option[Nothing], Int) => (V, Array[M]) = {
(vert: V, msgs: Option[C], aggregated: Option[Nothing], superstep: Int) =>
@@ -170,7 +166,7 @@ trait Aggregator[V, A] {
def mergeAggregators(a: A, b: A): A
}
-class DefaultCombiner[M : Manifest] extends Combiner[M, Array[M]] with Serializable {
+class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
def createCombiner(msg: M): Array[M] =
Array(msg)
def mergeMsg(combiner: Array[M], msg: M): Array[M] =
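
The Bagel.scala changes above are formatting-only: they normalize Scala context-bound syntax from `K : Manifest` to `K: Manifest` and collapse multi-line type-parameter lists. As a reminder, a context bound is sugar for an extra implicit parameter list; a minimal sketch (method names hypothetical):

    // `K: Manifest` desugars to an implicit Manifest[K] parameter:
    def describe[K: Manifest](k: K): String = manifest[K].toString
    // roughly equivalent to:
    def describeDesugared[K](k: K)(implicit m: Manifest[K]): String = m.toString
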
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 3c2f9c4616..47829a431e 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -1,10 +1,8 @@
package spark.bagel
import org.scalatest.{FunSuite, Assertions, BeforeAndAfter}
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
import scala.collection.mutable.ArrayBuffer
@@ -13,7 +11,7 @@ import spark._
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable
-class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
+class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
var sc: SparkContext = _
@@ -25,7 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
}
-
+
test("halting by voting") {
sc = new SparkContext("local", "test")
val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0))))
@@ -36,8 +34,9 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
(self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
(new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
}
- for ((id, vert) <- result.collect)
+ for ((id, vert) <- result.collect) {
assert(vert.age === numSupersteps)
+ }
}
test("halting by message silence") {
@@ -57,7 +56,27 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
}
(new TestVertex(self.active, self.age + 1), msgsOut)
}
- for ((id, vert) <- result.collect)
+ for ((id, vert) <- result.collect) {
assert(vert.age === numSupersteps)
+ }
+ }
+
+ test("large number of iterations") {
+ // This tests whether jobs with a large number of iterations finish in a reasonable time,
+ // because non-memoized recursion in RDD or DAGScheduler used to cause them to hang
+ failAfter(10 seconds) {
+ sc = new SparkContext("local", "test")
+ val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
+ val msgs = sc.parallelize(Array[(String, TestMessage)]())
+ val numSupersteps = 50
+ val result =
+ Bagel.run(sc, verts, msgs, sc.defaultParallelism) {
+ (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
+ (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
+ }
+ for ((id, vert) <- result.collect) {
+ assert(vert.age === numSupersteps)
+ }
+ }
}
}
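
The new regression test leans on ScalaTest's Timeouts trait: `failAfter` aborts the enclosed block and fails the test if it exceeds the given span, which is how a hang in non-memoized RDD recursion would surface. A minimal sketch of the pattern:

    import org.scalatest.FunSuite
    import org.scalatest.concurrent.Timeouts
    import org.scalatest.time.SpanSugar._

    class TimeoutExample extends FunSuite with Timeouts {
      test("finishes in time") {
        failAfter(10 seconds) {
          // test body; aborted with a failure if it runs past 10 seconds
        }
      }
    }
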
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 9d6ea782bd..f6e927a989 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -20,6 +20,7 @@ import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
import spark.partial.PartialResult
+import spark.rdd.CoalescedRDD
import spark.rdd.CartesianRDD
import spark.rdd.FilteredRDD
import spark.rdd.FlatMappedRDD
@@ -232,6 +233,11 @@ abstract class RDD[T: ClassManifest](
def distinct(): RDD[T] = distinct(splits.size)
/**
+ * Return a new RDD that is reduced into `numSplits` partitions.
+ */
+ def coalesce(numSplits: Int): RDD[T] = new CoalescedRDD(this, numSplits)
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
@@ -378,7 +384,7 @@ abstract class RDD[T: ClassManifest](
}
/**
- * Reduces the elements of this RDD using the specified associative binary operator.
+ * Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
@@ -630,16 +636,22 @@ abstract class RDD[T: ClassManifest](
/** The [[spark.SparkContext]] that this RDD was created on. */
def context = sc
+ // Avoid handling doCheckpoint multiple times to prevent excessive recursion
+ private var doCheckpointCalled = false
+
/**
* Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
* after a job using this RDD has completed (therefore the RDD has been materialized and
* potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
*/
private[spark] def doCheckpoint() {
- if (checkpointData.isDefined) {
- checkpointData.get.doCheckpoint()
- } else {
- dependencies.foreach(_.rdd.doCheckpoint())
+ if (!doCheckpointCalled) {
+ doCheckpointCalled = true
+ if (checkpointData.isDefined) {
+ checkpointData.get.doCheckpoint()
+ } else {
+ dependencies.foreach(_.rdd.doCheckpoint())
+ }
}
}
@@ -649,7 +661,6 @@ abstract class RDD[T: ClassManifest](
*/
private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
clearDependencies()
- dependencies_ = null
splits_ = null
deps = null // Forget the constructor argument for dependencies too
}
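
Two changes stand out in RDD.scala: a new public `coalesce(numSplits)` that shrinks an RDD to fewer partitions via CoalescedRDD without a shuffle, and a `doCheckpointCalled` flag that memoizes `doCheckpoint()` so DAGs reaching the same RDD along many paths walk it only once (the hang the new BagelSuite test guards against). A minimal usage sketch of `coalesce`, assuming a SparkContext `sc`:

    val fine = sc.parallelize(1 to 10000, 100) // 100 partitions
    val coarse = fine.coalesce(10)             // same data in 10 partitions, no shuffle
    assert(coarse.splits.size == 10)
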
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index 843e1bd18b..2810631b41 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -53,6 +53,11 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
fromRDD(srdd.filter(x => f(x).booleanValue()))
/**
+ * Return a new RDD that is reduced into `numSplits` partitions.
+ */
+ def coalesce(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numSplits))
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD =
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 8ce32e0e2f..55dc755358 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -63,6 +63,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
/**
+ * Return a new RDD that is reduced into `numSplits` partitions.
+ */
+ def coalesce(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numSplits))
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaPairRDD[K, V] =
@@ -447,7 +452,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
*/
def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
- sortByKey(comp, true)
+ sortByKey(comp, ascending)
}
/**
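
Besides the `coalesce` wrapper, this file fixes a genuine bug: the single-argument `sortByKey(ascending)` overload built a natural-order comparator and then called `sortByKey(comp, true)`, hardcoding ascending order and ignoring the caller's flag. A descending sort through the Java API now matches the Scala behavior, sketched here (assuming `import spark.SparkContext._`):

    val pairs = sc.parallelize(Seq("b" -> 2, "a" -> 1, "c" -> 3))
    pairs.sortByKey(false).collect() // (c,3), (b,2), (a,1): descending, as requested
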
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index ac31350ec3..23e7ae2726 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -39,6 +39,11 @@ JavaRDDLike[T, JavaRDD[T]] {
wrapRDD(rdd.filter((x => f(x).booleanValue())))
/**
+ * Return a new RDD that is reduced into `numSplits` partitions.
+ */
+ def coalesce(numSplits: Int): JavaRDD[T] = rdd.coalesce(numSplits)
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 60025b459c..d34d56d169 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -201,7 +201,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
}
/**
- * Reduces the elements of this RDD using the specified associative binary operator.
+ * Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 35f40c6e91..1d88d4bc84 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -30,6 +30,8 @@ case class ExecutorStateChanged(
exitStatus: Option[Int])
extends DeployMessage
+private[spark] case class Heartbeat(workerId: String) extends DeployMessage
+
// Master to Worker
private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
@@ -45,7 +47,6 @@ private[spark] case class LaunchExecutor(
sparkHome: String)
extends DeployMessage
-
// Client to Master
private[spark] case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
@@ -76,8 +77,11 @@ private[spark] case object RequestMasterState
// Master to MasterWebUI
private[spark]
-case class MasterState(uri: String, workers: Array[WorkerInfo], activeJobs: Array[JobInfo],
- completedJobs: Array[JobInfo])
+case class MasterState(host: String, port: Int, workers: Array[WorkerInfo],
+ activeJobs: Array[JobInfo], completedJobs: Array[JobInfo]) {
+
+ def uri = "spark://" + host + ":" + port
+}
// WorkerWebUI to Worker
private[spark] case object RequestWorkerState
@@ -85,6 +89,6 @@ private[spark] case object RequestWorkerState
// Worker to WorkerWebUI
private[spark]
-case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
+case class WorkerState(host: String, port: Int, workerId: String, executors: List[ExecutorRunner],
finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index a63eee1233..e01181d1b2 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -107,7 +107,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
- val timeout = 1.seconds
+ val timeout = 5.seconds
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 92e7914b1b..a5de23261c 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -3,6 +3,7 @@ package spark.deploy.master
import akka.actor._
import akka.actor.Terminated
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.util.duration._
import java.text.SimpleDateFormat
import java.util.Date
@@ -16,6 +17,7 @@ import spark.util.AkkaUtils
private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
+ val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
var nextJobNumber = 0
val workers = new HashSet[WorkerInfo]
@@ -46,6 +48,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
startWebUi()
+ context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
}
def startWebUi() {
@@ -111,6 +114,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
}
+ case Heartbeat(workerId) => {
+ idToWorker.get(workerId) match {
+ case Some(workerInfo) =>
+ workerInfo.lastHeartbeat = System.currentTimeMillis()
+ case None =>
+ logWarning("Got heartbeat from unregistered worker " + workerId)
+ }
+ }
+
case Terminated(actor) => {
// The disconnected actor could've been either a worker or a job; remove whichever of
// those we have an entry for in the corresponding actor hashmap
@@ -131,7 +143,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
case RequestMasterState => {
- sender ! MasterState(ip + ":" + port, workers.toArray, jobs.toArray, completedJobs.toArray)
+ sender ! MasterState(ip, port, workers.toArray, jobs.toArray, completedJobs.toArray)
}
}
@@ -219,8 +231,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) {
- exec.job.driver ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
- exec.job.executors -= exec.id
+ logInfo("Telling job of lost executor: " + exec.id)
+ exec.job.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None)
+ exec.job.removeExecutor(exec)
}
}
@@ -259,6 +272,18 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
nextJobNumber += 1
jobId
}
+
+ /** Check for, and remove, any timed-out workers */
+ def timeOutDeadWorkers() {
+ // Copy the workers into an array so we don't modify the hashset while iterating through it
+ val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
+ val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray
+ for (worker <- toRemove) {
+      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
+        worker.id, WORKER_TIMEOUT / 1000))
+ removeWorker(worker)
+ }
+ }
}
private[spark] object Master {
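
The Master half of the new liveness protocol: record `lastHeartbeat` per worker, check every `spark.worker.timeout` interval (60 seconds by default), and reap workers whose last heartbeat is older than the timeout. Condensed from the diff:

    // Schematic sketch of the reaper; names follow the diff above.
    val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
    // Copy to an array first so removal doesn't mutate the set mid-iteration.
    for (worker <- workers.filter(_.lastHeartbeat < expirationTime).toArray) {
      removeWorker(worker)
    }
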
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 5a7f5fef8a..2e467007a0 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -18,6 +18,8 @@ private[spark] class WorkerInfo(
var coresUsed = 0
var memoryUsed = 0
+ var lastHeartbeat = System.currentTimeMillis()
+
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 2219dd6262..924935a5fd 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -1,20 +1,18 @@
package spark.deploy.worker
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import akka.actor.{ActorRef, Props, Actor, ActorSystem}
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.util.duration._
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import spark.deploy._
-import akka.remote.RemoteClientLifeCycleEvent
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.text.SimpleDateFormat
import java.util.Date
-import akka.remote.RemoteClientShutdown
-import akka.remote.RemoteClientDisconnected
import spark.deploy.RegisterWorker
import spark.deploy.LaunchExecutor
import spark.deploy.RegisterWorkerFailed
import spark.deploy.master.Master
-import akka.actor.Terminated
import java.io.File
private[spark] class Worker(
@@ -29,6 +27,9 @@ private[spark] class Worker(
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
+ // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
+ val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+
var master: ActorRef = null
var masterWebUiUrl : String = ""
val workerId = generateWorkerId()
@@ -100,6 +101,9 @@ private[spark] class Worker(
case RegisteredWorker(url) =>
masterWebUiUrl = url
logInfo("Successfully registered with master")
+ context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
+ master ! Heartbeat(workerId)
+ }
case RegisterWorkerFailed(message) =>
logError("Worker registration failed: " + message)
@@ -143,7 +147,7 @@ private[spark] class Worker(
masterDisconnected()
case RequestWorkerState => {
- sender ! WorkerState(ip + ":" + port, workerId, executors.values.toList,
+ sender ! WorkerState(ip, port, workerId, executors.values.toList,
finishedExecutors.values.toList, masterUrl, cores, memory,
coresUsed, memoryUsed, masterWebUiUrl)
}
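
The Worker half: heartbeats start only once registration succeeds, and fire four times per timeout window so a single dropped or delayed message cannot get a healthy worker reaped. With the 60-second default:

    val timeoutMs   = System.getProperty("spark.worker.timeout", "60").toLong * 1000
    val heartbeatMs = timeoutMs / 4 // 15,000 ms: four heartbeats per timeout window
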
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index e45288ff53..224c126fdd 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -4,16 +4,15 @@ import java.nio.ByteBuffer
import spark.Logging
import spark.TaskState.TaskState
import spark.util.AkkaUtils
-import akka.actor.{ActorRef, Actor, Props}
+import akka.actor.{ActorRef, Actor, Props, Terminated}
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
-import akka.remote.RemoteClientLifeCycleEvent
import spark.scheduler.cluster._
import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask
import spark.scheduler.cluster.RegisterExecutorFailed
import spark.scheduler.cluster.RegisterExecutor
-
private[spark] class StandaloneExecutorBackend(
executor: Executor,
driverUrl: String,
@@ -27,17 +26,11 @@ private[spark] class StandaloneExecutorBackend(
var driver: ActorRef = null
override def preStart() {
- try {
- logInfo("Connecting to driver: " + driverUrl)
- driver = context.actorFor(driverUrl)
- driver ! RegisterExecutor(executorId, hostname, cores)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
- } catch {
- case e: Exception =>
- logError("Failed to connect to driver", e)
- System.exit(1)
- }
+ logInfo("Connecting to driver: " + driverUrl)
+ driver = context.actorFor(driverUrl)
+ driver ! RegisterExecutor(executorId, hostname, cores)
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(driver) // Doesn't work with remote actors, but useful for testing
}
override def receive = {
@@ -52,6 +45,10 @@ private[spark] class StandaloneExecutorBackend(
case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+
+ case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ logError("Driver terminated or disconnected! Shutting down.")
+ System.exit(1)
}
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index c7f226044d..b6ec664d7e 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -66,31 +66,28 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " with id = " + id)
- val thisInstance = this
val selectorThread = new Thread("connection-manager-thread") {
- override def run() {
- thisInstance.run()
- }
+ override def run() = ConnectionManager.this.run()
}
selectorThread.setDaemon(true)
selectorThread.start()
- def run() {
+ private def run() {
try {
while(!selectorThread.isInterrupted) {
- for( (connectionManagerId, sendingConnection) <- connectionRequests) {
+ for ((connectionManagerId, sendingConnection) <- connectionRequests) {
sendingConnection.connect()
addConnection(sendingConnection)
connectionRequests -= connectionManagerId
}
sendMessageRequests.synchronized {
- while(!sendMessageRequests.isEmpty) {
+ while (!sendMessageRequests.isEmpty) {
val (message, connection) = sendMessageRequests.dequeue
connection.send(message)
}
}
- while(!keyInterestChangeRequests.isEmpty) {
+ while (!keyInterestChangeRequests.isEmpty) {
val (key, ops) = keyInterestChangeRequests.dequeue
val connection = connectionsByKey(key)
val lastOps = key.interestOps()
@@ -126,14 +123,11 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
if (key.isValid) {
if (key.isAcceptable) {
acceptConnection(key)
- } else
- if (key.isConnectable) {
+ } else if (key.isConnectable) {
connectionsByKey(key).asInstanceOf[SendingConnection].finishConnect()
- } else
- if (key.isReadable) {
+ } else if (key.isReadable) {
connectionsByKey(key).read()
- } else
- if (key.isWritable) {
+ } else if (key.isWritable) {
connectionsByKey(key).write()
}
}
@@ -144,7 +138,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
}
}
- def acceptConnection(key: SelectionKey) {
+ private def acceptConnection(key: SelectionKey) {
val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
val newChannel = serverChannel.accept()
val newConnection = new ReceivingConnection(newChannel, selector)
@@ -154,7 +148,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
logInfo("Accepted connection from [" + newConnection.remoteAddress.getAddress + "]")
}
- def addConnection(connection: Connection) {
+ private def addConnection(connection: Connection) {
connectionsByKey += ((connection.key, connection))
if (connection.isInstanceOf[SendingConnection]) {
val sendingConnection = connection.asInstanceOf[SendingConnection]
@@ -165,7 +159,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
connection.onClose(removeConnection)
}
- def removeConnection(connection: Connection) {
+ private def removeConnection(connection: Connection) {
connectionsByKey -= connection.key
if (connection.isInstanceOf[SendingConnection]) {
val sendingConnection = connection.asInstanceOf[SendingConnection]
@@ -222,16 +216,16 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
}
}
- def handleConnectionError(connection: Connection, e: Exception) {
+ private def handleConnectionError(connection: Connection, e: Exception) {
logInfo("Handling connection error on connection to " + connection.remoteConnectionManagerId)
removeConnection(connection)
}
- def changeConnectionKeyInterest(connection: Connection, ops: Int) {
+ private def changeConnectionKeyInterest(connection: Connection, ops: Int) {
keyInterestChangeRequests += ((connection.key, ops))
}
- def receiveMessage(connection: Connection, message: Message) {
+ private def receiveMessage(connection: Connection, message: Message) {
val connectionManagerId = ConnectionManagerId.fromSocketAddress(message.senderAddress)
logDebug("Received [" + message + "] from [" + connectionManagerId + "]")
val runnable = new Runnable() {
@@ -351,7 +345,6 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private[spark] object ConnectionManager {
def main(args: Array[String]) {
-
val manager = new ConnectionManager(9999)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
println("Received [" + msg + "] from [" + id + "]")
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index 2c022f88e0..17989c5ce5 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -11,10 +11,6 @@ private[spark]
class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
extends RDD[T](sc, Nil) {
- @transient var splits_ : Array[Split] = (0 until blockIds.size).map(i => {
- new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
- }).toArray
-
@transient lazy val locations_ = {
val blockManager = SparkEnv.get.blockManager
/*val locations = blockIds.map(id => blockManager.getLocations(id))*/
@@ -22,7 +18,10 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
HashMap(blockIds.zip(locations):_*)
}
- override def getSplits = splits_
+ override def getSplits: Array[Split] = (0 until blockIds.size).map(i => {
+ new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
+ }).toArray
+
override def compute(split: Split, context: TaskContext): Iterator[T] = {
val blockManager = SparkEnv.get.blockManager
@@ -34,11 +33,8 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
}
}
- override def getPreferredLocations(split: Split) =
+ override def getPreferredLocations(split: Split): Seq[String] =
locations_(split.asInstanceOf[BlockRDDSplit].blockId)
- override def clearDependencies() {
- splits_ = null
- }
}
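
BlockRDD is the first of several RDDs in this commit (CheckpointRDD, CoGroupedRDD, HadoopRDD, NewHadoopRDD, SampledRDD, and ShuffledRDD below follow suit) converted from caching splits in a `@transient var` to computing them in a `getSplits` override. With no cached field there is nothing to null out, so the per-class `clearDependencies()` overrides shrink or disappear. Schematically, with `computeSplits()` as a hypothetical stand-in:

    // Before: a cached field that clearDependencies() had to null out.
    //   @transient var splits_ : Array[Split] = computeSplits()
    //   override def getSplits = splits_
    // After: compute on demand; the base class handles checkpoint cleanup.
    //   override def getSplits: Array[Split] = computeSplits()
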
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 0f9ca06531..41cbbd0093 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -45,7 +45,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
array
}
- override def getPreferredLocations(split: Split) = {
+ override def getPreferredLocations(split: Split): Seq[String] = {
val currSplit = split.asInstanceOf[CartesianSplit]
rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
}
@@ -66,6 +66,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
)
override def clearDependencies() {
+ super.clearDependencies()
rdd1 = null
rdd2 = null
}
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 96b593ba7c..3558d4673f 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -20,7 +20,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
- @transient val splits_ : Array[Split] = {
+ override def getSplits: Array[Split] = {
val dirContents = fs.listStatus(new Path(checkpointPath))
val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
val numSplits = splitFiles.size
@@ -34,8 +34,6 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
checkpointData = Some(new RDDCheckpointData[T](this))
checkpointData.get.cpFile = Some(checkpointPath)
- override def getSplits = splits_
-
override def getPreferredLocations(split: Split): Seq[String] = {
val status = fs.getFileStatus(new Path(checkpointPath))
val locations = fs.getFileBlockLocations(status, 0, status.getLen)
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 8fafd27bb6..0a1e2cbee0 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -43,47 +43,44 @@ private[spark] class CoGroupAggregator
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging {
- val aggr = new CoGroupAggregator
+ private val aggr = new CoGroupAggregator
- @transient var deps_ = {
- val deps = new ArrayBuffer[Dependency[_]]
- for ((rdd, index) <- rdds.zipWithIndex) {
+ override def getDependencies: Seq[Dependency[_]] = {
+ rdds.map { rdd =>
if (rdd.partitioner == Some(part)) {
logInfo("Adding one-to-one dependency with " + rdd)
- deps += new OneToOneDependency(rdd)
+ new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
- deps += new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
+ new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
}
}
- deps.toList
}
- override def getDependencies = deps_
-
- @transient var splits_ : Array[Split] = {
+ override def getSplits: Array[Split] = {
val array = new Array[Split](part.numPartitions)
for (i <- 0 until array.size) {
- array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
+ // Each CoGroupSplit will have a dependency per contributing RDD
+ array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (rdd, j) =>
+ // Assume each RDD contributed a single dependency, and get it
dependencies(j) match {
case s: ShuffleDependency[_, _] =>
- new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
+ new ShuffleCoGroupSplitDep(s.shuffleId)
case _ =>
- new NarrowCoGroupSplitDep(r, i, r.splits(i)): CoGroupSplitDep
+ new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i))
}
}.toList)
}
array
}
- override def getSplits = splits_
-
override val partitioner = Some(part)
override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
val split = s.asInstanceOf[CoGroupSplit]
val numRdds = split.deps.size
+ // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
val seq = map.get(k)
@@ -96,7 +93,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
}
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
- case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => {
+ case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
for ((k, v) <- rdd.iterator(itsSplit, context)) {
getSeq(k.asInstanceOf[K])(depNum) += v
@@ -104,21 +101,17 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
- def mergePair(pair: (K, Seq[Any])) {
- val mySeq = getSeq(pair._1)
- for (v <- pair._2)
- mySeq(depNum) += v
- }
val fetcher = SparkEnv.get.shuffleFetcher
- fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair)
+ for ((k, vs) <- fetcher.fetch[K, Seq[Any]](shuffleId, split.index)) {
+ getSeq(k)(depNum) ++= vs
+ }
}
}
JavaConversions.mapAsScalaMap(map).iterator
}
override def clearDependencies() {
- deps_ = null
- splits_ = null
+ super.clearDependencies()
rdds = null
}
}
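
The rewritten `compute` preserves the result shape the new inline comment describes: for each key, one buffer of values per input RDD, in input order. A small illustration of that shape (assuming `import spark.SparkContext._`; output order is not guaranteed):

    val a = sc.parallelize(Seq(1 -> "x", 2 -> "y"))
    val b = sc.parallelize(Seq(1 -> "z"))
    a.cogroup(b).collect()
    // roughly: Array((1, (Seq(x), Seq(z))), (2, (Seq(y), Seq())))
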
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 4c57434b65..fcd26da43a 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -50,14 +50,15 @@ class CoalescedRDD[T: ClassManifest](
}
}
- override def getDependencies: Seq[Dependency[_]] = List(
- new NarrowDependency(prev) {
+ override def getDependencies: Seq[Dependency[_]] = {
+ Seq(new NarrowDependency(prev) {
def getParents(id: Int): Seq[Int] =
splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices
- }
- )
+ })
+ }
override def clearDependencies() {
+ super.clearDependencies()
prev = null
}
}
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
index 6dbe235bd9..93e398ea2b 100644
--- a/core/src/main/scala/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -7,7 +7,7 @@ private[spark] class FilteredRDD[T: ClassManifest](
f: T => Boolean)
extends RDD[T](prev) {
- override def getSplits = firstParent[T].splits
+ override def getSplits: Array[Split] = firstParent[T].splits
override val partitioner = prev.partitioner // Since filter cannot change a partition's keys
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
index 1b604c66e2..8c2a610593 100644
--- a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
@@ -9,7 +9,7 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
f: T => TraversableOnce[U])
extends RDD[U](prev) {
- override def getSplits = firstParent[T].splits
+ override def getSplits: Array[Split] = firstParent[T].splits
override def compute(split: Split, context: TaskContext) =
firstParent[T].iterator(split, context).flatMap(f)
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
index 051bffed19..70b9b4e34e 100644
--- a/core/src/main/scala/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala
@@ -5,7 +5,7 @@ import spark.{RDD, Split, TaskContext}
private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
extends RDD[Array[T]](prev) {
- override def getSplits = firstParent[T].splits
+ override def getSplits: Array[Split] = firstParent[T].splits
override def compute(split: Split, context: TaskContext) =
Array(firstParent[T].iterator(split, context).toArray).iterator
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index f547f53812..854993737b 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -45,10 +45,9 @@ class HadoopRDD[K, V](
extends RDD[(K, V)](sc, Nil) {
// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
- val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
- @transient
- val splits_ : Array[Split] = {
+ override def getSplits: Array[Split] = {
val inputFormat = createInputFormat(conf)
val inputSplits = inputFormat.getSplits(conf, minSplits)
val array = new Array[Split](inputSplits.size)
@@ -63,8 +62,6 @@ class HadoopRDD[K, V](
.asInstanceOf[InputFormat[K, V]]
}
- override def getSplits = splits_
-
override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopSplit]
var reader: RecordReader[K, V] = null
@@ -109,7 +106,7 @@ class HadoopRDD[K, V](
}
}
- override def getPreferredLocations(split: Split) = {
+ override def getPreferredLocations(split: Split): Seq[String] = {
// TODO: Filtering out "localhost" in case of file:// URLs
val hadoopSplit = split.asInstanceOf[HadoopSplit]
hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
index 073f7d7d2a..7b0b4525c7 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -13,7 +13,7 @@ class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
override val partitioner =
if (preservesPartitioning) firstParent[T].partitioner else None
- override def getSplits = firstParent[T].splits
+ override def getSplits: Array[Split] = firstParent[T].splits
override def compute(split: Split, context: TaskContext) =
f(firstParent[T].iterator(split, context))
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
index 2ddc3d01b6..c6dc1080a9 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -15,7 +15,7 @@ class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
preservesPartitioning: Boolean
) extends RDD[U](prev) {
- override def getSplits = firstParent[T].splits
+ override def getSplits: Array[Split] = firstParent[T].splits
override val partitioner = if (preservesPartitioning) prev.partitioner else None
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
index 5466c9c657..6074f411e3 100644
--- a/core/src/main/scala/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -6,7 +6,7 @@ private[spark]
class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
extends RDD[U](prev) {
- override def getSplits = firstParent[T].splits
+ override def getSplits: Array[Split] = firstParent[T].splits
override def compute(split: Split, context: TaskContext) =
firstParent[T].iterator(split, context).map(f)
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index c3b155fcbd..345ae79d74 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -29,7 +29,7 @@ class NewHadoopRDD[K, V](
with HadoopMapReduceUtil {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
// private val serializableConf = new SerializableWritable(conf)
private val jobtrackerId: String = {
@@ -39,7 +39,7 @@ class NewHadoopRDD[K, V](
@transient private val jobId = new JobID(jobtrackerId, id)
- @transient private val splits_ : Array[Split] = {
+ override def getSplits: Array[Split] = {
val inputFormat = inputFormatClass.newInstance
val jobContext = newJobContext(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -50,8 +50,6 @@ class NewHadoopRDD[K, V](
result
}
- override def getSplits = splits_
-
override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopSplit]
val conf = confBroadcast.value.value
@@ -85,7 +83,7 @@ class NewHadoopRDD[K, V](
}
}
- override def getPreferredLocations(split: Split) = {
+ override def getPreferredLocations(split: Split): Seq[String] = {
val theSplit = split.asInstanceOf[NewHadoopSplit]
theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
}
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index a50ce75171..d1553181c1 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -37,6 +37,6 @@ class PartitionPruningRDD[T: ClassManifest](
override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(
split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context)
- override protected def getSplits =
+ override protected def getSplits: Array[Split] =
getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
}
diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index 6631f83510..56032a8659 100644
--- a/core/src/main/scala/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -27,7 +27,7 @@ class PipedRDD[T: ClassManifest](
// using a standard StringTokenizer (i.e. by spaces)
def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command))
- override def getSplits = firstParent[T].splits
+ override def getSplits: Array[Split] = firstParent[T].splits
override def compute(split: Split, context: TaskContext): Iterator[String] = {
val pb = new ProcessBuilder(command)
diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala
index e24ad23b21..f2a144e2e0 100644
--- a/core/src/main/scala/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/spark/rdd/SampledRDD.scala
@@ -19,17 +19,15 @@ class SampledRDD[T: ClassManifest](
seed: Int)
extends RDD[T](prev) {
- @transient var splits_ : Array[Split] = {
+ override def getSplits: Array[Split] = {
val rg = new Random(seed)
firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt))
}
- override def getSplits = splits_
-
- override def getPreferredLocations(split: Split) =
+ override def getPreferredLocations(split: Split): Seq[String] =
firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
- override def compute(splitIn: Split, context: TaskContext) = {
+ override def compute(splitIn: Split, context: TaskContext): Iterator[T] = {
val split = splitIn.asInstanceOf[SampledRDDSplit]
if (withReplacement) {
// For large datasets, the expected number of occurrences of each element in a sample with
@@ -48,8 +46,4 @@ class SampledRDD[T: ClassManifest](
firstParent[T].iterator(split.prev, context).filter(x => (rand.nextDouble <= frac))
}
}
-
- override def clearDependencies() {
- splits_ = null
- }
}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index d396478673..bf69b5150b 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -22,7 +22,9 @@ class ShuffledRDD[K, V](
override val partitioner = Some(part)
- override def getSplits = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
+ override def getSplits: Array[Split] = {
+ Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
+ }
override def compute(split: Split, context: TaskContext): Iterator[(K, V)] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index 26a2d511f2..ebc0068228 100644
--- a/core/src/main/scala/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -53,8 +53,4 @@ class UnionRDD[T: ClassManifest](
override def getPreferredLocations(s: Split): Seq[String] =
s.asInstanceOf[UnionSplit[T]].preferredLocations()
-
- override def clearDependencies() {
- rdds = null
- }
}
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index e5df6d8c72..1ce70268bb 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -29,8 +29,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
sc: SparkContext,
var rdd1: RDD[T],
var rdd2: RDD[U])
- extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2)))
- with Serializable {
+ extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) {
override def getSplits: Array[Split] = {
if (rdd1.splits.size != rdd2.splits.size) {
@@ -54,6 +53,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
}
override def clearDependencies() {
+ super.clearDependencies()
rdd1 = null
rdd2 = null
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
index bba7de6a65..8bf838209f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
@@ -12,10 +12,10 @@ class ExecutorLossReason(val message: String) {
private[spark]
case class ExecutorExited(val exitCode: Int)
- extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+ extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
}
private[spark]
case class SlaveLost(_message: String = "Slave lost")
- extends ExecutorLossReason(_message) {
+ extends ExecutorLossReason(_message) {
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 59ff8bcb90..e77355c6cd 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -67,6 +67,6 @@ private[spark] class SparkDeploySchedulerBackend(
case None => SlaveLost(message)
}
logInfo("Executor %s removed: %s".format(executorId, message))
- scheduler.executorLost(executorId, reason)
+ removeExecutor(executorId, reason.toString)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index da7dcf4b6b..d766067824 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -37,3 +37,6 @@ object StatusUpdate {
// Internal messages in driver
private[spark] case object ReviveOffers extends StandaloneClusterMessage
private[spark] case object StopDriver extends StandaloneClusterMessage
+
+private[spark] case class RemoveExecutor(executorId: String, reason: String)
+ extends StandaloneClusterMessage
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 082022be1c..d606432572 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -68,6 +68,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
sender ! true
context.stop(self)
+ case RemoveExecutor(executorId, reason) =>
+ removeExecutor(executorId, reason)
+ sender ! true
+
case Terminated(actor) =>
actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
@@ -100,16 +104,18 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Remove a disconnected slave from the cluster
def removeExecutor(executorId: String, reason: String) {
- logInfo("Slave " + executorId + " disconnected, so removing it")
- val numCores = freeCores(executorId)
- actorToExecutorId -= executorActor(executorId)
- addressToExecutorId -= executorAddress(executorId)
- executorActor -= executorId
- executorHost -= executorId
- freeCores -= executorId
- executorHost -= executorId
- totalCoreCount.addAndGet(-numCores)
- scheduler.executorLost(executorId, SlaveLost(reason))
+ if (executorActor.contains(executorId)) {
+ logInfo("Executor " + executorId + " disconnected, so removing it")
+ val numCores = freeCores(executorId)
+ actorToExecutorId -= executorActor(executorId)
+ addressToExecutorId -= executorAddress(executorId)
+ executorActor -= executorId
+ executorHost -= executorId
+ freeCores -= executorId
+ executorHost -= executorId
+ totalCoreCount.addAndGet(-numCores)
+ scheduler.executorLost(executorId, SlaveLost(reason))
+ }
}
}
@@ -139,7 +145,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
} catch {
case e: Exception =>
- throw new SparkException("Error stopping standalone scheduler's master actor", e)
+ throw new SparkException("Error stopping standalone scheduler's driver actor", e)
}
}
@@ -148,6 +154,18 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
+
+ // Called by subclasses when notified of a lost worker
+ def removeExecutor(executorId: String, reason: String) {
+ try {
+ val timeout = 5.seconds
+ val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
+ Await.result(future, timeout)
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Error notifying standalone scheduler's driver actor", e)
+ }
+ }
}
private[spark] object StandaloneSchedulerBackend {
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index b481ec0a72..7caf06e917 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -239,7 +239,11 @@ private[spark] class CoarseMesosSchedulerBackend(
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
logInfo("Mesos slave lost: " + slaveId.getValue)
synchronized {
- slaveIdsWithExecutors -= slaveId.getValue
+ if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
+ // Note that the slave ID corresponds to the executor ID on that slave
+ slaveIdsWithExecutors -= slaveId.getValue
+ removeExecutor(slaveId.getValue, "Mesos slave lost")
+ }
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 9893e9625d..2e7db60841 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -585,7 +585,7 @@ class BlockManager(
resultsGotten += 1
val result = results.take()
bytesInFlight -= result.size
- if (!fetchRequests.isEmpty &&
+ while (!fetchRequests.isEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
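
The small change from `if` to `while` matters for throughput: after each completed fetch, the loop now issues every queued request that still fits under `maxBytesInFlight`, rather than at most one, keeping the fetch pipeline full. The resulting loop, restated:

    while (!fetchRequests.isEmpty &&
        (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
      sendRequest(fetchRequests.dequeue())
    }
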
diff --git a/core/src/main/twirl/spark/deploy/master/index.scala.html b/core/src/main/twirl/spark/deploy/master/index.scala.html
index 285645c389..cb1651c7e1 100644
--- a/core/src/main/twirl/spark/deploy/master/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/index.scala.html
@@ -2,13 +2,13 @@
@import spark.deploy.master._
@import spark.Utils
-@spark.common.html.layout(title = "Spark Master on " + state.uri) {
-
+@spark.common.html.layout(title = "Spark Master on " + state.host) {
+
<!-- Cluster Details -->
<div class="row">
<div class="span12">
<ul class="unstyled">
- <li><strong>URL:</strong> spark://@(state.uri)</li>
+ <li><strong>URL:</strong> @(state.uri)</li>
<li><strong>Workers:</strong> @state.workers.size </li>
<li><strong>Cores:</strong> @{state.workers.map(_.cores).sum} Total,
@{state.workers.map(_.coresUsed).sum} Used</li>
diff --git a/core/src/main/twirl/spark/deploy/worker/index.scala.html b/core/src/main/twirl/spark/deploy/worker/index.scala.html
index 1d703dae58..c39f769a73 100644
--- a/core/src/main/twirl/spark/deploy/worker/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/index.scala.html
@@ -1,8 +1,8 @@
@(worker: spark.deploy.WorkerState)
@import spark.Utils
-@spark.common.html.layout(title = "Spark Worker on " + worker.uri) {
-
+@spark.common.html.layout(title = "Spark Worker on " + worker.host) {
+
<!-- Worker Details -->
<div class="row">
<div class="span12">
@@ -10,12 +10,12 @@
<li><strong>ID:</strong> @worker.workerId</li>
<li><strong>
Master URL:</strong> @worker.masterUrl
- (WebUI at <a href="@worker.masterWebUiUrl">@worker.masterWebUiUrl</a>)
</li>
<li><strong>Cores:</strong> @worker.cores (@worker.coresUsed Used)</li>
<li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(worker.memory)}
(@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
</ul>
+ <p><a href="@worker.masterWebUiUrl">Back to Master</a></p>
</div>
</div>
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 0b74607fb8..0d08fd2396 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -114,12 +114,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
test("CoalescedRDD") {
- testCheckpointing(new CoalescedRDD(_, 2))
+ testCheckpointing(_.coalesce(2))
// Test whether a CoalescedRDD shrinks in serialized size after the parent RDD is checkpointed.
// The current implementation of CoalescedRDDSplit holds a transient reference to the parent RDD,
// so only the RDD itself will shrink in serialized size, not the splits.
- testParentCheckpointing(new CoalescedRDD(_, 2), true, false)
+ testParentCheckpointing(_.coalesce(2), true, false)
// Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after
// the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits.
diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/spark/DriverSuite.scala
index 342610e1dd..5e84b3a66a 100644
--- a/core/src/test/scala/spark/DriverSuite.scala
+++ b/core/src/test/scala/spark/DriverSuite.scala
@@ -9,10 +9,11 @@ import org.scalatest.time.SpanSugar._
class DriverSuite extends FunSuite with Timeouts {
test("driver should exit after finishing") {
+ assert(System.getenv("SPARK_HOME") != null)
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
- failAfter(10 seconds) {
+ failAfter(30 seconds) {
Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master),
new File(System.getenv("SPARK_HOME")))
}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 934e4c2f67..9ffe7c5f99 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -696,4 +696,28 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
}
+
+ @Test
+ public void mapOnPairRDD() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i, i % 2);
+ }
+ });
+ JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
+ new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
+ return new Tuple2<Integer, Integer>(in._2(), in._1());
+ }
+ });
+ Assert.assertEquals(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(0, 2),
+ new Tuple2<Integer, Integer>(1, 3),
+ new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+
+ }
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index fe7deb10d6..ffa866de75 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -122,7 +122,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
- val coalesced1 = new CoalescedRDD(data, 2)
+ val coalesced1 = data.coalesce(2)
assert(coalesced1.collect().toList === (1 to 10).toList)
assert(coalesced1.glom().collect().map(_.toList).toList ===
List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
@@ -133,19 +133,19 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList ===
List(5, 6, 7, 8, 9))
- val coalesced2 = new CoalescedRDD(data, 3)
+ val coalesced2 = data.coalesce(3)
assert(coalesced2.collect().toList === (1 to 10).toList)
assert(coalesced2.glom().collect().map(_.toList).toList ===
List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
- val coalesced3 = new CoalescedRDD(data, 10)
+ val coalesced3 = data.coalesce(10)
assert(coalesced3.collect().toList === (1 to 10).toList)
assert(coalesced3.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.
- val coalesced4 = new CoalescedRDD(data, 20)
+ val coalesced4 = data.coalesce(20)
assert(coalesced4.collect().toList === (1 to 10).toList)
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
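
Both test suites now go through the public RDD.coalesce method rather than constructing CoalescedRDD directly. A minimal usage sketch, assuming a live SparkContext named sc:

    val data = sc.parallelize(1 to 10, 10)
    data.coalesce(2).glom().collect().map(_.toList) // List(1..5), List(6..10)
    // Asking for more partitions than the parent has keeps the parent's count:
    data.coalesce(20).glom().collect().length       // 10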
diff --git a/docs/configuration.md b/docs/configuration.md
index a7054b4321..f1ca77aa78 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -198,6 +198,14 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.worker.timeout</td>
+ <td>60</td>
+ <td>
+ Number of seconds after which the standalone deploy master considers a worker lost if it
+ receives no heartbeats.
+ </td>
+</tr>
+<tr>
<td>spark.akka.frameSize</td>
<td>10</td>
<td>
@@ -218,7 +226,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.akka.timeout</td>
<td>20</td>
<td>
- Communication timeout between Spark nodes.
+ Communication timeout between Spark nodes, in seconds.
</td>
</tr>
<tr>
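
In this release these settings are plain Java system properties. Application-level ones can be set before the SparkContext is created; deploy-level ones such as spark.worker.timeout are read by the master daemon itself, so they are normally passed via SPARK_DAEMON_JAVA_OPTS (added by the run-script change later in this patch). An application-side sketch (values are illustrative):

    // Properties must be set before the context starts to take effect.
    System.setProperty("spark.akka.timeout", "30")   // node communication timeout, seconds
    System.setProperty("spark.akka.frameSize", "10") // message size limit, MB
    val sc = new SparkContext("local", "ConfigExample")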
diff --git a/docs/contributing-to-spark.md b/docs/contributing-to-spark.md
index c6e01c62d8..14d0dc856b 100644
--- a/docs/contributing-to-spark.md
+++ b/docs/contributing-to-spark.md
@@ -15,7 +15,7 @@ The Spark team welcomes contributions in the form of GitHub pull requests. Here
But first, make sure that you have [configured a spark-env.sh](configuration.html) with at least
`SCALA_HOME`, as some of the tests try to spawn subprocesses using this.
- Add new unit tests for your code. We use [ScalaTest](http://www.scalatest.org/) for testing. Just add a new Suite in `core/src/test`, or methods to an existing Suite.
-- If you'd like to report a bug but don't have time to fix it, you can still post it to our [issues page](https://github.com/mesos/spark/issues), or email the [mailing list](http://www.spark-project.org/mailing-lists.html).
+- If you'd like to report a bug but don't have time to fix it, you can still post it to our [issue tracker](https://spark-project.atlassian.net), or email the [mailing list](http://www.spark-project.org/mailing-lists.html).
# Licensing of Contributions
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 301b330a79..b98718a553 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -203,7 +203,7 @@ A complete list of transformations is available in the [RDD API doc](api/core/in
<tr><th>Action</th><th>Meaning</th></tr>
<tr>
<td> <b>reduce</b>(<i>func</i>) </td>
- <td> Aggregate the elements of the dataset using a function <i>func</i> (which takes two arguments and returns one). The function should be associative so that it can be computed correctly in parallel. </td>
+ <td> Aggregate the elements of the dataset using a function <i>func</i> (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. </td>
</tr>
<tr>
<td> <b>collect</b>() </td>
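
The stricter wording matters because partial results from different partitions may be combined in any order, not just any grouping. A quick illustration, assuming a live SparkContext sc:

    sc.parallelize(1 to 4, 2).reduce(_ + _) // always 10: + is commutative and associative
    sc.parallelize(1 to 4, 2).reduce(_ - _) // partition- and order-dependent; avoid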
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index bf296221b8..3986c0c79d 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -115,6 +115,14 @@ You can optionally configure the cluster further by setting environment variable
<td><code>SPARK_WORKER_WEBUI_PORT</code></td>
<td>Port for the worker web UI (default: 8081)</td>
</tr>
+ <tr>
+ <td><code>SPARK_DAEMON_MEMORY</code></td>
+ <td>Memory to allocate to the Spark master and worker daemons themselves (default: 512m)</td>
+ </tr>
+ <tr>
+ <td><code>SPARK_DAEMON_JAVA_OPTS</code></td>
+ <td>JVM options for the Spark master and worker daemons themselves (default: none)</td>
+ </tr>
</table>
diff --git a/docs/tuning.md b/docs/tuning.md
index 9aaa53cd65..738c530458 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -233,7 +233,7 @@ number of cores in your clusters.
## Broadcasting Large Variables
-Using the [broadcast functionality](scala-programming-guide#broadcast-variables)
+Using the [broadcast functionality](scala-programming-guide.html#broadcast-variables)
available in `SparkContext` can greatly reduce the size of each serialized task, and the cost
of launching a job over a cluster. If your tasks use any large object from the driver program
inside of them (e.g. a static lookup table), consider turning it into a broadcast variable.
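
A minimal sketch of the pattern described above, assuming a live SparkContext sc and treating the small map as a stand-in for a large lookup table:

    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2)) // shipped to each node once
    val data = sc.parallelize(Seq("a", "b", "a"))
    data.map(k => lookup.value.getOrElse(k, 0)).collect() // Array(1, 2, 1)
    // Without broadcast, the map would be re-serialized into every task closure.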
diff --git a/examples/src/main/scala/spark/examples/LogQuery.scala b/examples/src/main/scala/spark/examples/LogQuery.scala
new file mode 100644
index 0000000000..5330b8da94
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LogQuery.scala
@@ -0,0 +1,66 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+/**
+ * Executes a roll-up-style query against Apache logs.
+ */
+object LogQuery {
+ val exampleApacheLogs = List(
+ """10.10.10.10 - "FRED" [18/Jan/2013:17:56:07 +1100] "GET http://images.com/2013/Generic.jpg
+ | HTTP/1.1" 304 315 "http://referall.com/" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
+ | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
+ | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
+ | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 ""
+ | 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.replace("\n", ""),
+ """10.10.10.10 - "FRED" [18/Jan/2013:18:02:37 +1100] "GET http://images.com/2013/Generic.jpg
+ | HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
+ | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
+ | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
+ | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 ""
+ | 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.replace("\n", "")
+ )
+
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: LogQuery <master> [logFile]")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "Log Query")
+
+ val dataSet =
+ if (args.length == 2) sc.textFile(args(1))
+ else sc.parallelize(exampleApacheLogs)
+
+ val apacheLogRegex =
+ """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
+
+ /** Tracks the total query count and aggregate number of bytes for a particular group. */
+ class Stats(val count: Int, val numBytes: Int) extends Serializable {
+ def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
+ override def toString = "bytes=%s\tn=%s".format(numBytes, count)
+ }
+
+ def extractKey(line: String): (String, String, String) = {
+ apacheLogRegex.findFirstIn(line) match {
+ case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
+ if (user != "\"-\"") (ip, user, query)
+ else (null, null, null)
+ case _ => (null, null, null)
+ }
+ }
+
+ def extractStats(line: String): Stats = {
+ apacheLogRegex.findFirstIn(line) match {
+ case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
+ new Stats(1, bytes.toInt)
+ case _ => new Stats(1, 0)
+ }
+ }
+
+ dataSet.map(line => (extractKey(line), extractStats(line)))
+ .reduceByKey((a, b) => a.merge(b))
+ .collect().foreach {
+ case (key, stats) => println("%s\t%s".format(key, stats)) }
+ }
+}
diff --git a/pyspark b/pyspark
index ab7f4f50c0..d662e90287 100755
--- a/pyspark
+++ b/pyspark
@@ -36,4 +36,9 @@ if [[ "$SPARK_LAUNCH_WITH_SCALA" != "0" ]] ; then
export SPARK_LAUNCH_WITH_SCALA=1
fi
-exec "$PYSPARK_PYTHON" "$@"
+if [[ "$IPYTHON" = "1" ]] ; then
+ export PYSPARK_PYTHON="ipython"
+ exec "$PYSPARK_PYTHON" -i -c "%run $PYTHONSTARTUP"
+else
+ exec "$PYSPARK_PYTHON" "$@"
+fi
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4cda6cf661..6b6ab6abd9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -274,8 +274,8 @@ class RDD(object):
def reduce(self, f):
"""
- Reduces the elements of this RDD using the specified associative binary
- operator.
+ Reduces the elements of this RDD using the specified commutative and
+ associative binary operator.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
diff --git a/run b/run
index a094629449..82b1da005a 100755
--- a/run
+++ b/run
@@ -13,6 +13,18 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
fi
+if [ -z "$1" ]; then
+ echo "Usage: run <spark-class> [<args>]" >&2
+ exit 1
+fi
+
+# If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable
+# values for that; a daemon doesn't need much memory
+if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
+ SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
+ SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default
+fi
+
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
if [ `command -v scala` ]; then
RUNNER="scala"
diff --git a/run2.cmd b/run2.cmd
index 67f1e465e4..c913a5195e 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -11,9 +11,22 @@ set SPARK_HOME=%FWDIR%
rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+rem Test that an argument was given
+if not "x%1"=="x" goto arg_given
+ echo Usage: run ^<spark-class^> [^<args^>]
+ goto exit
+:arg_given
+
+set RUNNING_DAEMON=0
+if "%1"=="spark.deploy.master.Master" set RUNNING_DAEMON=1
+if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1
+if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m
+if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY%
+if "%RUNNING_DAEMON%"=="1" set SPARK_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS%
+
rem Check that SCALA_HOME has been specified
if not "x%SCALA_HOME%"=="x" goto scala_exists
- echo "SCALA_HOME is not set"
+ echo SCALA_HOME is not set
goto exit
:scala_exists
@@ -40,10 +53,10 @@ rem Build up classpath
set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources
set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes
-for /R "%FWDIR%\lib_managed\jars" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
-for /R "%FWDIR%\lib_managed\bundles" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
-for /R "%REPL_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
-for /R "%PYSPARK_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\jars\*
+set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\bundles\*
+set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\*
+set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
rem Figure out whether to run our class with java or with the scala launcher.
diff --git a/sbt/sbt.cmd b/sbt/sbt.cmd
index 6b289ab447..ce3ae70174 100644
--- a/sbt/sbt.cmd
+++ b/sbt/sbt.cmd
@@ -2,4 +2,4 @@
set EXTRA_ARGS=
if not "%MESOS_HOME%x"=="x" set EXTRA_ARGS=-Djava.library.path=%MESOS_HOME%\lib\java
set SPARK_HOME=%~dp0..
-java -Xmx1200M -XX:MaxPermSize=200m %EXTRA_ARGS% -jar %SPARK_HOME%\sbt\sbt-launch-*.jar "%*"
+java -Xmx1200M -XX:MaxPermSize=200m %EXTRA_ARGS% -jar %SPARK_HOME%\sbt\sbt-launch-0.11.3-2.jar "%*"
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index b93cb7865a..ec546c8190 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -59,8 +59,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -78,10 +78,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = {
+ def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -100,8 +100,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
- def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V])
- : JavaPairDStream[K, V] = {
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
+ : JavaPairDStream[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
}
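
The K/V-to-K2/V2 renames look cosmetic but aid readability: JavaPairDStream already binds K and V at the class level, so reusing those names for a method's output types made the signatures needlessly confusing. An illustrative (non-Spark) sketch of the convention:

    // Fresh names for output types keep "pair (K, V) in, pair (K2, V2) out" explicit.
    trait PairLike[K, V] {
      def get: (K, V)
      def mapPair[K2, V2](f: ((K, V)) => (K2, V2)): (K2, V2) = f(get)
    }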
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ef10c091ca..eb2495e3ac 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -8,11 +8,11 @@ import scala.collection.JavaConversions._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import spark.Partitioner
+import spark.{RDD, Partitioner}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
-import spark.api.java.JavaPairRDD
+import spark.api.java.{JavaRDD, JavaPairRDD}
import spark.storage.StorageLevel
import com.google.common.base.Optional
@@ -81,6 +81,36 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
dstream.union(that.dstream)
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * to each RDD of this DStream.
+ */
+ def transform[K2, V2](transformFunc: JFunction[JavaPairRDD[K, V], JavaPairRDD[K2, V2]]):
+ JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform(in: RDD[(K, V)]): RDD[(K2, V2)] =
+ transformFunc.call(new JavaPairRDD[K, V](in)).rdd
+ dstream.transform(scalaTransform(_))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * to each RDD of this DStream, together with its batch time.
+ */
+ def transform[K2, V2](transformFunc: JFunction2[JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]):
+ JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform(in: RDD[(K, V)], time: Time): RDD[(K2, V2)] =
+ transformFunc.call(new JavaPairRDD[K, V](in), time).rdd
+ dstream.transform(scalaTransform(_, _))
+ }
+
// =======================================================================
// Methods only for PairDStream's
// =======================================================================
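
The new overloads expose DStream.transform (an arbitrary per-batch RDD-to-RDD function) to Java pair streams. A sketch of the Scala-side call they delegate to, assuming a pair stream pairs: DStream[(String, Int)] exists and the pair-RDD implicits from spark.SparkContext._ are imported:

    val sorted  = pairs.transform(rdd => rdd.sortByKey())                   // per-batch sort
    val swapped = pairs.transform(rdd => rdd.map { case (k, v) => (v, k) }) // types may change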
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index f82e6a37cc..e7f446a49b 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -34,6 +34,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(master, frameworkName, batchDuration))
/**
+ * Creates a StreamingContext.
+ * @param sparkContext The underlying JavaSparkContext to use
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(sparkContext: JavaSparkContext, batchDuration: Duration) =
+ this(new StreamingContext(sparkContext.sc, batchDuration))
+
+ /**
* Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
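
A minimal sketch of the new constructor, reusing an already-created Java Spark context for streaming (master URL and names are illustrative):

    import spark.api.java.JavaSparkContext
    import spark.streaming.Duration
    import spark.streaming.api.java.JavaStreamingContext

    val sc = new JavaSparkContext("local", "ReuseContextExample")
    val ssc = new JavaStreamingContext(sc, new Duration(1000)) // 1-second batches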
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 79d6093429..7b385f609d 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -11,6 +11,7 @@ import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
+import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.*;
@@ -507,6 +508,141 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("new york", 1)));
@Test
+ public void testPairMap() { // Maps pair -> pair of different type
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.map(
+ new PairFunction<Tuple2<String, Integer>, Integer, String>() {
+ @Override
+ public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
+ return in.swap();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMapPartitions() { // Maps pair -> pair of different type
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions(
+ new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
+ LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ while (in.hasNext()) {
+ Tuple2<String, Integer> next = in.next();
+ out.add(next.swap());
+ }
+ return out;
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMap2() { // Maps pair -> single
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1, 3, 4, 1),
+ Arrays.asList(5, 5, 3, 1));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Integer> values = pairStream.map(
+ new Function<Tuple2<String, Integer>, Integer>() {
+ @Override
+ public Integer call(Tuple2<String, Integer> in) throws Exception {
+ return in._2();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(values);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
+ List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("hi", 1),
+ new Tuple2<String, Integer>("ho", 2)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("hi", 1),
+ new Tuple2<String, Integer>("ho", 2)));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "h"),
+ new Tuple2<Integer, String>(1, "i"),
+ new Tuple2<Integer, String>(2, "h"),
+ new Tuple2<Integer, String>(2, "o")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "h"),
+ new Tuple2<Integer, String>(1, "i"),
+ new Tuple2<Integer, String>(2, "h"),
+ new Tuple2<Integer, String>(2, "o")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
+ new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
+ List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ for (Character s : in._1().toCharArray()) {
+ out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+ }
+ return out;
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testPairGroupByKey() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -570,7 +706,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
new Function<Integer, Integer>() {
- @Override
+ @Override
public Integer call(Integer i) throws Exception {
return i;
}
@@ -668,7 +804,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
- new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){
+ new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
int out = 0;
@@ -680,7 +816,7 @@ public class JavaAPISuite implements Serializable {
}
return Optional.of(out);
}
- });
+ });
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -738,6 +874,50 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testPairTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(2, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(1, 5)));
+
+ List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
+ new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
+ @Override
+ public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+ return in.sortByKey();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(sorted);
+ List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;