-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/Client.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 27
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala | 27
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 90
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/JavaAPISuite.java | 33
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 73
-rw-r--r--  docs/scala-programming-guide.md | 2
-rw-r--r--  project/SparkBuild.scala | 7
-rw-r--r--  python/pyspark/context.py | 3
-rw-r--r--  python/pyspark/rdd.py | 10
-rwxr-xr-x  run-example | 10
-rwxr-xr-x  sbt/sbt | 21
-rwxr-xr-x  spark-class | 10
-rwxr-xr-x  spark-shell | 19
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStream.scala | 11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 1
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala | 88
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 26
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala | 1
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala | 55
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/Job.scala) | 24
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala) | 52
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 108
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala | 68
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala) | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala | 75
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala | 81
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 12
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 26
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala | 13
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 12
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 71
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 32
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala | 14
63 files changed, 1048 insertions(+), 336 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 10fae5af9f..ccffcc356c 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -29,8 +29,7 @@ import akka.pattern.ask
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}
-
+import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
@@ -53,7 +52,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
private[spark] class MapOutputTracker extends Logging {
- private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ private val timeout = AkkaUtils.askTimeout
// Set to the MapOutputTrackerActor living on the driver
var trackerActor: Either[ActorRef, ActorSelection] = _
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index c47657f512..037cd1c774 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -125,6 +125,8 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))
+
+ override def toString = rdd.toString
}
object JavaRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9e912d3adb..f344804b4c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -245,6 +245,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
+ * Return an array that contains all of the elements in a specific partition of this RDD.
+ */
+ def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
+ // This is useful for implementing `take` from other language frontends
+ // like Python where the data is serialized.
+ import scala.collection.JavaConversions._
+ val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
+ res.map(x => new java.util.ArrayList(x.toSeq)).toArray
+ }
+
+ /**
* Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
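The new collectPartitions is the building block that lets other frontends implement take without pulling the whole RDD; a rough Scala-side sketch of its use (not part of the patch, assuming an active SparkContext sc and JavaRDD.fromRDD in scope):

    // Sketch only: exercising the new collectPartitions helper from Scala.
    // Partition contents assume the usual split of 1..7 over 3 slices: (1,2), (3,4), (5,6,7).
    import org.apache.spark.api.java.JavaRDD

    val javaRdd = JavaRDD.fromRDD(sc.parallelize(1 to 7, 3))
    val parts = javaRdd.collectPartitions(Array(0, 2))   // Array[java.util.List[Int]]
    parts.foreach(println)                               // prints [1, 2] then [5, 6, 7]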
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index a659cc06c2..ca42c76928 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -235,10 +235,6 @@ private[spark] object PythonRDD {
file.close()
}
- def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
- implicit val cm : ClassTag[T] = rdd.elementClassTag
- rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
- }
}
private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 4d95efa73a..953755e40d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -23,14 +23,14 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import akka.actor._
-import akka.pattern.AskTimeoutException
import akka.pattern.ask
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
+import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.AkkaUtils
/**
@@ -178,7 +178,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
- val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c627dd3806..eebd0794b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -18,19 +18,16 @@
package org.apache.spark.deploy.master
import java.text.SimpleDateFormat
-import java.util.concurrent.TimeUnit
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
-import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
import akka.pattern.ask
-import akka.remote._
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -38,7 +35,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@@ -64,8 +61,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
- var firstApp: Option[ApplicationInfo] = None
-
Utils.checkHost(host, "Expected hostname")
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
@@ -444,14 +439,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
idToApp(app.id) = app
actorToApp(app.driver) = app
addressToApp(appAddress) = app
- if (firstApp == None) {
- firstApp = Some(app)
- }
- // TODO: What is firstApp?? Can we remove it?
- val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
- if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) {
- logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
- }
waitingApps += app
}
@@ -537,12 +524,10 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
- val timeoutDuration: FiniteDuration = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
- implicit val timeout = Timeout(timeoutDuration)
- val respFuture = actor ? RequestWebUIPort // ask pattern
- val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
+ val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
+ val timeout = AkkaUtils.askTimeout
+ val respFuture = actor.ask(RequestWebUIPort)(timeout)
+ val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 81e15c534f..6cc7fd2ff4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -18,12 +18,12 @@
package org.apache.spark.deploy.master
import scala.collection.JavaConversions._
-import scala.concurrent.ops._
-import org.apache.spark.Logging
import org.apache.zookeeper._
-import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.apache.zookeeper.data.Stat
+
+import org.apache.spark.Logging
/**
* Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
@@ -33,7 +33,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
* informed via zkDown().
*
* Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
- * times or a semantic exception is thrown (e.g.., "node already exists").
+ * times or a semantic exception is thrown (e.g., "node already exists").
*/
private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
@@ -103,6 +103,7 @@ private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) ext
connectToZooKeeper()
case KeeperState.Disconnected =>
logWarning("ZooKeeper disconnected, will retry...")
+ case s => // Do nothing
}
}
}
@@ -179,7 +180,7 @@ private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) ext
} catch {
case e: KeeperException.NoNodeException => throw e
case e: KeeperException.NodeExistsException => throw e
- case e if n > 0 =>
+ case e: Exception if n > 0 =>
logError("ZooKeeper exception, " + n + " more retries...", e)
Thread.sleep(RETRY_WAIT_MILLIS)
retry(fn, n-1)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 7809013e83..7d535b08de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -21,8 +21,8 @@ import akka.actor.ActorRef
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.EventType
-import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.Logging
+import org.apache.spark.deploy.master.MasterMessages._
private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
@@ -105,7 +105,7 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, mas
// We found a different master file pointing to this process.
// This can happen in the following two cases:
// (1) The master process was restarted on the same node.
- // (2) The ZK server died between creating the node and returning the name of the node.
+ // (2) The ZK server died between creating the file and returning the name of the file.
// For this case, we will end up creating a second file, and MUST explicitly delete the
// first one, since our ZK session is still open.
// Note that this deletion will cause a NodeDeleted event to be fired so we check again for
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 3b983c19eb..dbb0cb90f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -17,32 +17,28 @@
package org.apache.spark.deploy.master.ui
+import scala.concurrent.Await
import scala.xml.Node
import akka.pattern.ask
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import javax.servlet.http.HttpServletRequest
-
import net.liftweb.json.JsonAST.JValue
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
- implicit val timeout = parent.timeout
+ val timeout = parent.timeout
/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
@@ -53,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 65e7a14e7a..4ef762892c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -17,37 +17,33 @@
package org.apache.spark.deploy.master.ui
-import javax.servlet.http.HttpServletRequest
-
+import scala.concurrent.Await
import scala.xml.Node
-import scala.concurrent.Await
import akka.pattern.ask
-import scala.concurrent.duration._
-
+import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
-import org.apache.spark.deploy.DeployWebUI
+import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
- implicit val timeout = parent.timeout
+ val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
}
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
val workers = state.workers.sortBy(_.id)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index a211ce2b42..9ab594b682 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,25 +17,21 @@
package org.apache.spark.deploy.master.ui
-import scala.concurrent.duration._
-
import javax.servlet.http.HttpServletRequest
-
import org.eclipse.jetty.server.{Handler, Server}
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.JettyUtils
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 1a768d501f..0d59048313 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -42,13 +42,13 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
- val workerState = Await.result(stateFuture, 30 seconds)
+ val workerState = Await.result(stateFuture, timeout)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
- val workerState = Await.result(stateFuture, 30 seconds)
+ val workerState = Await.result(stateFuture, timeout)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
val runningExecutorTable =
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 6c18a3c245..40d6bdb3fd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -19,17 +19,14 @@ package org.apache.spark.deploy.worker.ui
import java.io.File
-import scala.concurrent.duration._
-
-import akka.util.Timeout
import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
-import org.eclipse.jetty.server.{Handler, Server}
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone worker.
@@ -37,8 +34,7 @@ import org.eclipse.jetty.server.{Handler, Server}
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
- implicit val timeout = Timeout(
- Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
+ val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0b0a60ee60..0f19d7a96b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -222,18 +222,22 @@ private[spark] class Executor(
return
}
+ val resultSer = SparkEnv.get.serializer.newInstance()
+ val beforeSerialization = System.currentTimeMillis()
+ val valueBytes = resultSer.serialize(value)
+ val afterSerialization = System.currentTimeMillis()
+
for (m <- task.metrics) {
m.hostname = Utils.localHostName()
m.executorDeserializeTime = (taskStart - startTime).toInt
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
+ m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
}
- // TODO I'd also like to track the time it takes to serialize the task results, but that is
- // huge headache, b/c we need to serialize the task metrics first. If TaskMetrics had a
- // custom serialized format, we could just change the relevants bytes in the byte buffer
+
val accumUpdates = Accumulators.values
- val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null))
+ val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
val serializedResult = {
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index c0ce46e379..bb1471d9ee 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -44,6 +44,11 @@ class TaskMetrics extends Serializable {
var jvmGCTime: Long = _
/**
+ * Amount of time spent serializing the task result
+ */
+ var resultSerializationTime: Long = _
+
+ /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3841b5616d..ee63b3c4a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -63,7 +63,7 @@ trait SparkListener {
* Called when a task begins remotely fetching its result (will not be called for tasks that do
* not need to fetch the result remotely).
*/
- def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
+ def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
/**
* Called when a task ends
@@ -131,8 +131,8 @@ object StatsReportListener extends Logging {
def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
val stats = d.statCounter
- logInfo(heading + stats)
val quantiles = d.getQuantiles(probabilities).map{formatNumber}
+ logInfo(heading + stats)
logInfo(percentilesHeader)
logInfo("\t" + quantiles.mkString("\t"))
}
@@ -173,8 +173,6 @@ object StatsReportListener extends Logging {
showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
}
-
-
val seconds = 1000L
val minutes = seconds * 60
val hours = minutes * 60
@@ -198,7 +196,6 @@ object StatsReportListener extends Logging {
}
-
case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index d5824e7954..85687ea330 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -91,4 +91,3 @@ private[spark] class SparkListenerBus() extends Logging {
return true
}
}
-
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 7e468d0d67..e80cc6b0f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -35,18 +35,15 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
/** A TaskResult that contains the task's return value and accumulator updates. */
private[spark]
-class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
extends TaskResult[T] with Externalizable {
- def this() = this(null.asInstanceOf[T], null, null)
+ def this() = this(null.asInstanceOf[ByteBuffer], null, null)
override def writeExternal(out: ObjectOutput) {
- val objectSer = SparkEnv.get.serializer.newInstance()
- val bb = objectSer.serialize(value)
-
- out.writeInt(bb.remaining())
- Utils.writeByteBuffer(bb, out)
+ out.writeInt(valueBytes.remaining);
+ Utils.writeByteBuffer(valueBytes, out)
out.writeInt(accumUpdates.size)
for ((key, value) <- accumUpdates) {
@@ -58,12 +55,10 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
override def readExternal(in: ObjectInput) {
- val objectSer = SparkEnv.get.serializer.newInstance()
-
val blen = in.readInt()
val byteVal = new Array[Byte](blen)
in.readFully(byteVal)
- value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+ valueBytes = ByteBuffer.wrap(byteVal)
val numUpdates = in.readInt
if (numUpdates == 0) {
@@ -76,4 +71,9 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
}
metrics = in.readObject().asInstanceOf[TaskMetrics]
}
+
+ def value(): T = {
+ val resultSer = SparkEnv.get.serializer.newInstance()
+ return resultSer.deserialize(valueBytes)
+ }
}
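With this change a task's return value travels as pre-serialized bytes and is only turned back into an object when value() is called on the driver. A minimal sketch of the new round trip, mirroring the updated ClusterTaskSetManagerSuite below (DirectTaskResult is private[spark], so this lives inside the spark package, and value() needs a live SparkEnv):

    // Sketch: DirectTaskResult now carries serialized bytes and deserializes lazily.
    import scala.collection.mutable
    import org.apache.spark.SparkEnv
    import org.apache.spark.executor.TaskMetrics

    val ser = SparkEnv.get.serializer.newInstance()
    val result = new DirectTaskResult[Int](ser.serialize(42), mutable.Map.empty, new TaskMetrics)
    val restored: Int = result.value()   // deserialization happens here, not at construction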
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f5e8766f6d..7e22c843bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,10 +27,10 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.{Logging, SparkException, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -47,6 +47,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
+ private val timeout = AkkaUtils.askTimeout
+
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
@@ -172,10 +174,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
- private val timeout = {
- Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
- }
-
def stopExecutors() {
try {
if (driverActor != null) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e05b842476..e1d68ef592 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -18,7 +18,6 @@
package org.apache.spark.storage
import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
@@ -26,15 +25,17 @@ import akka.pattern.ask
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.AkkaUtils
-private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
+private[spark]
+class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
- val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
/** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 154a3980e9..21022e1cfb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -21,17 +21,15 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.collection.JavaConversions._
+import scala.concurrent.Future
+import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask
-import scala.concurrent.duration._
-import scala.concurrent.Future
-
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.Utils
-
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -50,8 +48,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
- val akkaTimeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ private val akkaTimeout = AkkaUtils.askTimeout
initLogging()
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index e596690bc3..a31a7e1d58 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -56,7 +56,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
- "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
+ "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
+ "Shuffle Write")
def execRow(kv: Seq[String]) = {
<tr>
@@ -73,6 +74,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(7)}</td>
<td>{kv(8)}</td>
<td>{kv(9)}</td>
+ <td>{Utils.msDurationToString(kv(10).toLong)}</td>
+ <td>{Utils.bytesToString(kv(11).toLong)}</td>
+ <td>{Utils.bytesToString(kv(12).toLong)}</td>
</tr>
}
@@ -111,6 +115,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
+ val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+ val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
+ val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
Seq(
execId,
@@ -122,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
activeTasks.toString,
failedTasks.toString,
completedTasks.toString,
- totalTasks.toString
+ totalTasks.toString,
+ totalDuration.toString,
+ totalShuffleRead.toString,
+ totalShuffleWrite.toString
)
}
@@ -130,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
+ val executorToDuration = HashMap[String, Long]()
+ val executorToShuffleRead = HashMap[String, Long]()
+ val executorToShuffleWrite = HashMap[String, Long]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
@@ -140,6 +153,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
+ executorToDuration.put(eid, newDuration)
+
activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
@@ -150,6 +166,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
+
+ // update shuffle read/write
+ if (null != taskEnd.taskMetrics) {
+ taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead =>
+ executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) +
+ shuffleRead.remoteBytesRead))
+
+ taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite =>
+ executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) +
+ shuffleWrite.shuffleBytesWritten))
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
new file mode 100644
index 0000000000..3c53e88380
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+/** class for reporting aggregated metrics for each executors in stageUI */
+private[spark] class ExecutorSummary {
+ var taskTime : Long = 0
+ var failedTasks : Int = 0
+ var succeededTasks : Int = 0
+ var shuffleRead : Long = 0
+ var shuffleWrite : Long = 0
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
new file mode 100644
index 0000000000..0dd876480a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import scala.xml.Node
+
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.util.Utils
+import scala.collection.mutable
+
+/** Page showing executor summary */
+private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {
+
+ val listener = parent.listener
+ val dateFmt = parent.dateFmt
+ val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
+
+ def toNodeSeq(): Seq[Node] = {
+ listener.synchronized {
+ executorTable()
+ }
+ }
+
+ /** Special table which merges two header cells. */
+ private def executorTable[T](): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <th>Executor ID</th>
+ <th>Address</th>
+ <th>Task Time</th>
+ <th>Total Tasks</th>
+ <th>Failed Tasks</th>
+ <th>Succeeded Tasks</th>
+ <th>Shuffle Read</th>
+ <th>Shuffle Write</th>
+ </thead>
+ <tbody>
+ {createExecutorTable()}
+ </tbody>
+ </table>
+ }
+
+ private def createExecutorTable() : Seq[Node] = {
+ // make a executor-id -> address map
+ val executorIdToAddress = mutable.HashMap[String, String]()
+ val storageStatusList = parent.sc.getExecutorStorageStatus
+ for (statusId <- 0 until storageStatusList.size) {
+ val blockManagerId = parent.sc.getExecutorStorageStatus(statusId).blockManagerId
+ val address = blockManagerId.hostPort
+ val executorId = blockManagerId.executorId
+ executorIdToAddress.put(executorId, address)
+ }
+
+ val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
+ executorIdToSummary match {
+ case Some(x) => {
+ x.toSeq.sortBy(_._1).map{
+ case (k,v) => {
+ <tr>
+ <td>{k}</td>
+ <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
+ <td>{parent.formatDuration(v.taskTime)}</td>
+ <td>{v.failedTasks + v.succeededTasks}</td>
+ <td>{v.failedTasks}</td>
+ <td>{v.succeededTasks}</td>
+ <td>{Utils.bytesToString(v.shuffleRead)}</td>
+ <td>{Utils.bytesToString(v.shuffleWrite)}</td>
+ </tr>
+ }
+ }
+ }
+ case _ => { Seq[Node]() }
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 6b854740d6..07a42f0503 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -57,6 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTasksFailed = HashMap[Int, Int]()
val stageIdToTaskInfos =
HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+ val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -124,8 +125,41 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val sid = taskEnd.task.stageId
+
+ // create executor summary map if necessary
+ val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
+ op = new HashMap[String, ExecutorSummary]())
+ executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId,
+ op = new ExecutorSummary())
+
+ val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId)
+ executorSummary match {
+ case Some(y) => {
+ // first update failed-task, succeed-task
+ taskEnd.reason match {
+ case Success =>
+ y.succeededTasks += 1
+ case _ =>
+ y.failedTasks += 1
+ }
+
+ // update duration
+ y.taskTime += taskEnd.taskInfo.duration
+
+ taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead =>
+ y.shuffleRead += shuffleRead.remoteBytesRead
+ }
+
+ taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+ y.shuffleWrite += shuffleWrite.shuffleBytesWritten
+ }
+ }
+ case _ => {}
+ }
+
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
+
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
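The per-stage, per-executor aggregates accumulated above are what the new ExecutorTable renders; they can be read back roughly like this (a sketch, with listener being a JobProgressListener and stageId a stage that has finished tasks):

    // Sketch: inspecting the new stageIdToExecutorSummaries map after tasks complete.
    listener.stageIdToExecutorSummaries.get(stageId).foreach { summaries =>
      summaries.foreach { case (execId, s) =>
        println(execId + ": " + s.succeededTasks + " succeeded, " + s.failedTasks +
          " failed, " + s.taskTime + " ms, " + s.shuffleRead + " shuffle bytes read")
      }
    }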
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 69f9446bab..8dcfeacb60 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
<div>
<ul class="unstyled">
<li>
- <strong>Total duration across all tasks: </strong>
+ <strong>Total task time across all tasks: </strong>
{parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
@@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val taskHeaders: Seq[String] =
Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
- Seq("Duration", "GC Time") ++
+ Seq("Duration", "GC Time", "Result Ser Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
Seq("Errors")
@@ -101,6 +101,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
None
}
else {
+ val serializationTimes = validTasks.map{case (info, metrics, exception) =>
+ metrics.get.resultSerializationTime.toDouble}
+ val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
+ ms => parent.formatDuration(ms.toLong))
+
val serviceTimes = validTasks.map{case (info, metrics, exception) =>
metrics.get.executorRunTime.toDouble}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
@@ -149,6 +154,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
val listings: Seq[Seq[String]] = Seq(
+ serializationQuantiles,
serviceQuantiles,
gettingResultQuantiles,
schedulerDelayQuantiles,
@@ -160,11 +166,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
-
+ val executorTable = new ExecutorTable(parent, stageId)
val content =
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
+ <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
@@ -183,6 +190,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
+ val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L)
val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead}
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
@@ -210,6 +218,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) parent.formatDuration(gcTime) else ""}
</td>
+ <td sorttable_customkey={serializationTime.toString}>
+ {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+ </td>
{if (shuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
{shuffleReadReadable}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9ad6de3c6d..463d85dfd5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
- <th>Duration</th>
+ <th>Task Time</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index a5446b3fc3..39f422dd6b 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -28,9 +28,6 @@ import org.apache.spark.ui.JettyUtils._
/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
val indexPage = new IndexPage(this)
val rddPage = new RDDPage(this)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 74133cef6c..1c8b51b8bc 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util
+import scala.concurrent.duration.{Duration, FiniteDuration}
+
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
@@ -84,4 +86,8 @@ private[spark] object AkkaUtils {
(actorSystem, boundPort)
}
+ /** Returns the default Spark timeout to use for Akka ask operations. */
+ def askTimeout: FiniteDuration = {
+ Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds")
+ }
}
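askTimeout replaces the hand-rolled Duration.create(System.getProperty("spark.akka.askTimeout", ...)) copies removed throughout this patch, and the default moves from 10 to 30 seconds. The call-site pattern used above (e.g. in ApplicationPage and IndexPage) is roughly:

    // Sketch of the ask/Await pattern the patch standardizes on; `masterActor` stands for
    // any ActorRef, and RequestMasterState/MasterStateResponse are the messages used above.
    import scala.concurrent.Await
    import akka.pattern.ask
    import org.apache.spark.util.AkkaUtils

    val timeout = AkkaUtils.askTimeout
    val stateFuture = (masterActor ? RequestMasterState)(timeout)
    val state = Await.result(stateFuture, timeout).asInstanceOf[MasterStateResponse]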
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 4234f6eac7..79913dc718 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -897,4 +897,37 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
}
+
+ @Test
+ public void collectPartitions() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i, i % 2);
+ }
+ });
+
+ List[] parts = rdd1.collectPartitions(new int[] {0});
+ Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+ parts = rdd1.collectPartitions(new int[] {1, 2});
+ Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+ Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(2, 0)),
+ rdd2.collectPartitions(new int[] {0})[0]);
+
+ parts = rdd2.collectPartitions(new int[] {1, 2});
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+ new Tuple2<Integer, Integer>(4, 0)),
+ parts[0]);
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+ new Tuple2<Integer, Integer>(6, 0),
+ new Tuple2<Integer, Integer>(7, 1)),
+ parts[1]);
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 29c4cc5d9c..bb28a31a99 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -313,6 +313,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
}
def createTaskResult(id: Int): DirectTaskResult[Int] = {
- new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
+ val valueSer = SparkEnv.get.serializer.newInstance()
+ new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
new file mode 100644
index 0000000000..67a57a0e7f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.scalatest.FunSuite
+import org.apache.spark.scheduler._
+import org.apache.spark.{LocalSparkContext, SparkContext, Success}
+import org.apache.spark.scheduler.SparkListenerTaskStart
+import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
+
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
+ test("test executor id to summary") {
+ val sc = new SparkContext("local", "test")
+ val listener = new JobProgressListener(sc)
+ val taskMetrics = new TaskMetrics()
+ val shuffleReadMetrics = new ShuffleReadMetrics()
+
+ // nothing in it
+ assert(listener.stageIdToExecutorSummaries.size == 0)
+
+ // finish this task, should get updated shuffleRead
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
+ .shuffleRead == 1000)
+
+ // finish a task with unknown executor-id, nothing should happen
+ taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.size == 1)
+
+ // finish this task, should get updated duration
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
+ .shuffleRead == 2000)
+
+ // finish this task, should get updated duration
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
+ .shuffleRead == 1000)
+ }
+}
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 94e8563a8b..56d2a3a4a0 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -363,7 +363,7 @@ res2: Int = 10
# Where to Go from Here
-You can see some [example Spark programs](http://www.spark-project.org/examples.html) on the Spark website.
+You can see some [example Spark programs](http://spark.incubator.apache.org/examples.html) on the Spark website.
In addition, Spark includes several samples in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them by passing the class name to the `run-example` script included in Spark; for example:
./run-example org.apache.spark.examples.SparkPi
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 29f4a4b9ff..7bcbd90bd3 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -114,6 +114,9 @@ object SparkBuild extends Build {
fork := true,
javaOptions += "-Xmx3g",
+ // Show full stack trace and duration in test cases.
+ testOptions in Test += Tests.Argument("-oDF"),
+
// Only allow one test at a time, even across projects, since they run in the same JVM
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
@@ -177,6 +180,8 @@ object SparkBuild extends Build {
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.0.CR1",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
+ /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
+ "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.9" % "test",
@@ -257,7 +262,7 @@ object SparkBuild extends Build {
libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v )
)
-
+
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
libraryDependencies ++= Seq(
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index cbd41e58c4..0604f6836c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -43,7 +43,6 @@ class SparkContext(object):
_gateway = None
_jvm = None
_writeToFile = None
- _takePartition = None
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
@@ -134,8 +133,6 @@ class SparkContext(object):
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = \
SparkContext._jvm.PythonRDD.writeToFile
- SparkContext._takePartition = \
- SparkContext._jvm.PythonRDD.takePartition
if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 61720dcf1a..f87923e6fa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -54,6 +54,9 @@ class RDD(object):
self.ctx = ctx
self._jrdd_deserializer = jrdd_deserializer
+ def __repr__(self):
+ return self._jrdd.toString()
+
@property
def context(self):
"""
@@ -576,8 +579,13 @@ class RDD(object):
# Take only up to num elements from each partition we try
mapped = self.mapPartitions(takeUpToNum)
items = []
+ # TODO(shivaram): Similar to the scala implementation, update the take
+ # method to scan multiple splits based on an estimate of how many elements
+ # we have per-split.
for partition in range(mapped._jrdd.splits().size()):
- iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
+ partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
+ partitionsToTake[0] = partition
+ iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
items.extend(mapped._collect_iterator_through_file(iterator))
if len(items) >= num:
break
diff --git a/run-example b/run-example
index feade6589a..a78192d31d 100755
--- a/run-example
+++ b/run-example
@@ -17,6 +17,11 @@
# limitations under the License.
#
+cygwin=false
+case "`uname`" in
+ CYGWIN*) cygwin=true;;
+esac
+
SCALA_VERSION=2.10
# Figure out where the Scala framework is installed
@@ -59,6 +64,11 @@ fi
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"
+if $cygwin; then
+ CLASSPATH=`cygpath -wp $CLASSPATH`
+ export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
+fi
+
# Find java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
diff --git a/sbt/sbt b/sbt/sbt
index c31a0280ff..5942280585 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -17,12 +17,27 @@
# limitations under the License.
#
-EXTRA_ARGS=""
+cygwin=false
+case "`uname`" in
+ CYGWIN*) cygwin=true;;
+esac
+
+EXTRA_ARGS="-Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m"
if [ "$MESOS_HOME" != "" ]; then
- EXTRA_ARGS="-Djava.library.path=$MESOS_HOME/lib/java"
+ EXTRA_ARGS="$EXTRA_ARGS -Djava.library.path=$MESOS_HOME/lib/java"
fi
export SPARK_HOME=$(cd "$(dirname $0)/.." 2>&1 >/dev/null ; pwd)
export SPARK_TESTING=1 # To put test classes on classpath
-java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m $EXTRA_ARGS $SBT_OPTS -jar "$SPARK_HOME"/sbt/sbt-launch-*.jar "$@"
+SBT_JAR="$SPARK_HOME"/sbt/sbt-launch-*.jar
+if $cygwin; then
+ SBT_JAR=`cygpath -w $SBT_JAR`
+ export SPARK_HOME=`cygpath -w $SPARK_HOME`
+ EXTRA_ARGS="$EXTRA_ARGS -Djline.terminal=jline.UnixTerminal -Dsbt.cygwin=true"
+ stty -icanon min 1 -echo > /dev/null 2>&1
+ java $EXTRA_ARGS $SBT_OPTS -jar $SBT_JAR "$@"
+ stty icanon echo > /dev/null 2>&1
+else
+ java $EXTRA_ARGS $SBT_OPTS -jar $SBT_JAR "$@"
+fi \ No newline at end of file
diff --git a/spark-class b/spark-class
index ff51fbd557..802e4aa104 100755
--- a/spark-class
+++ b/spark-class
@@ -17,6 +17,11 @@
# limitations under the License.
#
+cygwin=false
+case "`uname`" in
+ CYGWIN*) cygwin=true;;
+esac
+
SCALA_VERSION=2.10
# Figure out where the Scala framework is installed
@@ -125,6 +130,11 @@ fi
# Compute classpath using external script
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
+
+if $cygwin; then
+ CLASSPATH=`cygpath -wp $CLASSPATH`
+ export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR`
+fi
export CLASSPATH
if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
diff --git a/spark-shell b/spark-shell
index 9608bd3f30..d20af0fb39 100755
--- a/spark-shell
+++ b/spark-shell
@@ -23,7 +23,11 @@
# if those two env vars are set in spark-env.sh but MASTER is not.
# Options:
# -c <cores> Set the number of cores for REPL to use
-#
+
+cygwin=false
+case "`uname`" in
+ CYGWIN*) cygwin=true;;
+esac
# Enter posix mode for bash
set -o posix
@@ -79,7 +83,18 @@ if [[ ! $? ]]; then
saved_stty=""
fi
-$FWDIR/spark-class $OPTIONS org.apache.spark.repl.Main "$@"
+if $cygwin; then
+ # Workaround for issue involving JLine and Cygwin
+ # (see http://sourceforge.net/p/jline/bugs/40/).
+ # If you're using the Mintty terminal emulator in Cygwin, you may need to set the
+ # "Backspace sends ^H" setting in "Keys" section of the Mintty options
+ # (see https://github.com/sbt/sbt/issues/562).
+ stty -icanon min 1 -echo > /dev/null 2>&1
+ $FWDIR/spark-class -Djline.terminal=unix $OPTIONS org.apache.spark.repl.Main "$@"
+ stty icanon echo > /dev/null 2>&1
+else
+ $FWDIR/spark-class $OPTIONS org.apache.spark.repl.Main "$@"
+fi
# record the exit status lest it be overwritten:
# then reenable echo and propagate the code.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9271914eb5..7b343d2376 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -40,7 +40,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
- val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
+ val pendingTimes = ssc.scheduler.getPendingTimes()
val delaySeconds = MetadataCleaner.getDelaySeconds
def validate() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 329d2b5835..a78d3965ee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -17,24 +17,19 @@
package org.apache.spark.streaming
-import org.apache.spark.streaming.dstream._
import StreamingContext._
-import org.apache.spark.util.MetadataCleaner
-
-//import Time._
-
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.MetadataCleaner
-import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.conf.Configuration
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index b9a58fded6..daed7ff7c3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -21,6 +21,7 @@ import dstream.InputDStream
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import collection.mutable.ArrayBuffer
import org.apache.spark.Logging
+import org.apache.spark.streaming.scheduler.Job
final private[streaming] class DStreamGraph extends Serializable with Logging {
initLogging()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala
deleted file mode 100644
index 5233129506..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import org.apache.spark.Logging
-import org.apache.spark.SparkEnv
-import java.util.concurrent.Executors
-import collection.mutable.HashMap
-import collection.mutable.ArrayBuffer
-
-
-private[streaming]
-class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
-
- class JobHandler(ssc: StreamingContext, job: Job) extends Runnable {
- def run() {
- SparkEnv.set(ssc.env)
- try {
- val timeTaken = job.run()
- logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format(
- (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0))
- } catch {
- case e: Exception =>
- logError("Running " + job + " failed", e)
- }
- clearJob(job)
- }
- }
-
- initLogging()
-
- val jobExecutor = Executors.newFixedThreadPool(numThreads)
- val jobs = new HashMap[Time, ArrayBuffer[Job]]
-
- def runJob(job: Job) {
- jobs.synchronized {
- jobs.getOrElseUpdate(job.time, new ArrayBuffer[Job]) += job
- }
- jobExecutor.execute(new JobHandler(ssc, job))
- logInfo("Added " + job + " to queue")
- }
-
- def stop() {
- jobExecutor.shutdown()
- }
-
- private def clearJob(job: Job) {
- var timeCleared = false
- val time = job.time
- jobs.synchronized {
- val jobsOfTime = jobs.get(time)
- if (jobsOfTime.isDefined) {
- jobsOfTime.get -= job
- if (jobsOfTime.get.isEmpty) {
- jobs -= time
- timeCleared = true
- }
- } else {
- throw new Exception("Job finished for time " + job.time +
- " but time does not exist in jobs")
- }
- }
- if (timeCleared) {
- ssc.scheduler.clearOldMetadata(time)
- }
- }
-
- def getPendingTimes(): Array[Time] = {
- jobs.synchronized {
- jobs.keySet.toArray
- }
- }
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index d2c4fdee65..41da028a3c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -47,9 +47,9 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import twitter4j.Status
import twitter4j.auth.Authorization
+import org.apache.spark.streaming.scheduler._
import akka.util.ByteString
-
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
* information (such as, cluster URL and job name) to internally create a SparkContext, it provides
@@ -148,9 +148,10 @@ class StreamingContext private (
}
}
- protected[streaming] var checkpointDuration: Duration = if (isCheckpointPresent) cp_.checkpointDuration else null
- protected[streaming] var receiverJobThread: Thread = null
- protected[streaming] var scheduler: Scheduler = null
+ protected[streaming] val checkpointDuration: Duration = {
+ if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
+ }
+ protected[streaming] val scheduler = new JobScheduler(this)
/**
* Return the associated Spark context
@@ -512,6 +513,13 @@ class StreamingContext private (
graph.addOutputStream(outputStream)
}
+ /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+ * receiving system events related to streaming.
+ */
+ def addStreamingListener(streamingListener: StreamingListener) {
+ scheduler.listenerBus.addListener(streamingListener)
+ }
+
protected def validate() {
assert(graph != null, "Graph is null")
graph.validate()
@@ -527,27 +535,22 @@ class StreamingContext private (
* Start the execution of the streams.
*/
def start() {
- if (checkpointDir != null && checkpointDuration == null && graph != null) {
- checkpointDuration = graph.batchDuration
- }
-
validate()
+ // Get the network input streams
val networkInputStreams = graph.getInputStreams().filter(s => s match {
case n: NetworkInputDStream[_] => true
case _ => false
}).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray
+ // Start the network input tracker (must start before receivers)
if (networkInputStreams.length > 0) {
- // Start the network input tracker (must start before receivers)
networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
networkInputTracker.start()
}
-
Thread.sleep(1000)
// Start the scheduler
- scheduler = new Scheduler(this)
scheduler.start()
}
@@ -558,7 +561,6 @@ class StreamingContext private (
try {
if (scheduler != null) scheduler.stop()
if (networkInputTracker != null) networkInputTracker.stop()
- if (receiverJobThread != null) receiverJobThread.interrupt()
sc.stop()
logInfo("StreamingContext stopped successfully")
} catch {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 80dcf87491..78d318cf27 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.StreamingListener
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -687,6 +688,13 @@ class JavaStreamingContext(val ssc: StreamingContext) {
ssc.remember(duration)
}
+ /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+ * receiving system events related to streaming.
+ */
+ def addStreamingListener(streamingListener: StreamingListener) {
+ ssc.addStreamingListener(streamingListener)
+ }
+
/**
* Starts the execution of the streams.
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 98b14cb224..364abcde68 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -18,7 +18,8 @@
package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Duration, DStream, Job, Time}
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.scheduler.Job
import scala.reflect.ClassTag
private[streaming]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index d5ae8aef92..5add20871e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.streaming._
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.rdd.{RDD, BlockRDD}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
/**
* Abstract class for defining any InputDStream that has to start a receiver on worker
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
new file mode 100644
index 0000000000..4e8d07fe92
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.streaming.Time
+
+/**
+ * Class containing information about a completed batch.
+ * @param batchTime Time of the batch
+ * @param submissionTime Clock time of when jobs of this batch were submitted to
+ * the streaming scheduler queue
+ * @param processingStartTime Clock time of when the first job of this batch started processing
+ * @param processingEndTime Clock time of when the last job of this batch finished processing
+ */
+case class BatchInfo(
+ batchTime: Time,
+ submissionTime: Long,
+ processingStartTime: Option[Long],
+ processingEndTime: Option[Long]
+ ) {
+
+ /**
+ * Time taken for the first job of this batch to start processing from the time this batch
+ * was submitted to the streaming scheduler. Essentially, it is
+ * `processingStartTime` - `submissionTime`.
+ */
+ def schedulingDelay = processingStartTime.map(_ - submissionTime)
+
+ /**
+ * Time taken for all the jobs of this batch to finish processing from the time they started
+ * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+ */
+ def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
+
+ /**
+ * Time taken for all the jobs of this batch to finish processing from the time they
+ * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
+ */
+ def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
+}
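
The three accessors above compose as totalDelay = schedulingDelay + processingDelay. A minimal sketch of how they behave, assuming only the BatchInfo case class added in this file; the timestamps are illustrative:

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.BatchInfo

object BatchInfoSketch {
  def main(args: Array[String]) {
    // Illustrative timings: batch time 1000 ms, submitted at 1000 ms,
    // first job started at 1200 ms, last job finished at 1700 ms.
    val info = BatchInfo(
      batchTime = new Time(1000L),
      submissionTime = 1000L,
      processingStartTime = Some(1200L),
      processingEndTime = Some(1700L)
    )
    println(info.schedulingDelay)  // Some(200): processingStartTime - submissionTime
    println(info.processingDelay)  // Some(500): processingEndTime - processingStartTime
    println(info.totalDelay)       // Some(700): schedulingDelay + processingDelay
  }
}
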
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 2128b7c7a6..7341bfbc99 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -15,13 +15,17 @@
* limitations under the License.
*/
-package org.apache.spark.streaming
+package org.apache.spark.streaming.scheduler
-import java.util.concurrent.atomic.AtomicLong
+import org.apache.spark.streaming.Time
+/**
+ * Class representing a Spark computation. It may contain multiple Spark jobs.
+ */
private[streaming]
class Job(val time: Time, func: () => _) {
- val id = Job.getNewId()
+ var id: String = _
+
def run(): Long = {
val startTime = System.currentTimeMillis
func()
@@ -29,13 +33,9 @@ class Job(val time: Time, func: () => _) {
(stopTime - startTime)
}
- override def toString = "streaming job " + id + " @ " + time
-}
-
-private[streaming]
-object Job {
- val id = new AtomicLong(0)
-
- def getNewId() = id.getAndIncrement()
-}
+ def setId(number: Int) {
+ id = "streaming job " + time + "." + number
+ }
+ override def toString = id
+} \ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index ed892e33e6..1cd0b9b0a4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -15,31 +15,34 @@
* limitations under the License.
*/
-package org.apache.spark.streaming
+package org.apache.spark.streaming.scheduler
-import util.{ManualClock, RecurringTimer, Clock}
import org.apache.spark.SparkEnv
import org.apache.spark.Logging
+import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
+import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
+/**
+ * This class generates jobs from DStreams, and also drives the checkpointing and cleanup
+ * of DStream metadata.
+ */
private[streaming]
-class Scheduler(ssc: StreamingContext) extends Logging {
+class JobGenerator(jobScheduler: JobScheduler) extends Logging {
initLogging()
-
- val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
- val jobManager = new JobManager(ssc, concurrentJobs)
- val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
- new CheckpointWriter(ssc.checkpointDir)
- } else {
- null
- }
-
+ val ssc = jobScheduler.ssc
val clockClass = System.getProperty(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => generateJobs(new Time(longTime)))
val graph = ssc.graph
+ lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
+ new CheckpointWriter(ssc.checkpointDir)
+ } else {
+ null
+ }
+
var latestTime: Time = null
def start() = synchronized {
@@ -48,26 +51,24 @@ class Scheduler(ssc: StreamingContext) extends Logging {
} else {
startFirstTime()
}
- logInfo("Scheduler started")
+ logInfo("JobGenerator started")
}
def stop() = synchronized {
timer.stop()
- jobManager.stop()
if (checkpointWriter != null) checkpointWriter.stop()
ssc.graph.stop()
- logInfo("Scheduler stopped")
+ logInfo("JobGenerator stopped")
}
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
- logInfo("Scheduler's timer started at " + startTime)
+ logInfo("JobGenerator's timer started at " + startTime)
}
private def restart() {
-
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
// or if the property is defined set it to that time
@@ -93,35 +94,34 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
- graph.generateJobs(time).foreach(jobManager.runJob)
+ jobScheduler.runJobs(time, graph.generateJobs(time))
)
// Restart the timer
timer.start(restartTime.milliseconds)
- logInfo("Scheduler's timer restarted at " + restartTime)
+ logInfo("JobGenerator's timer restarted at " + restartTime)
}
/** Generate jobs and perform checkpoint for the given `time`. */
- def generateJobs(time: Time) {
+ private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
- graph.generateJobs(time).foreach(jobManager.runJob)
+ jobScheduler.runJobs(time, graph.generateJobs(time))
latestTime = time
doCheckpoint(time)
}
/**
- * Clear old metadata assuming jobs of `time` have finished processing.
- * And also perform checkpoint.
+ * On batch completion, clear old metadata and checkpoint computation.
*/
- def clearOldMetadata(time: Time) {
+ private[streaming] def onBatchCompletion(time: Time) {
ssc.graph.clearOldMetadata(time)
doCheckpoint(time)
}
  /** Perform checkpoint for the given `time`. */
- def doCheckpoint(time: Time) = synchronized {
- if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
+ private def doCheckpoint(time: Time) = synchronized {
+ if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
new file mode 100644
index 0000000000..9511ccfbed
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkEnv
+import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
+import scala.collection.mutable.HashSet
+import org.apache.spark.streaming._
+
+/**
+ * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
+ * the jobs and runs them using a thread pool. The number of threads is controlled by the
+ * spark.streaming.concurrentJobs system property (default: 1).
+ */
+private[streaming]
+class JobScheduler(val ssc: StreamingContext) extends Logging {
+
+ initLogging()
+
+ val jobSets = new ConcurrentHashMap[Time, JobSet]
+ val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+ val executor = Executors.newFixedThreadPool(numConcurrentJobs)
+ val generator = new JobGenerator(this)
+ val listenerBus = new StreamingListenerBus()
+
+ def clock = generator.clock
+
+ def start() {
+ generator.start()
+ }
+
+ def stop() {
+ generator.stop()
+ executor.shutdown()
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ executor.shutdownNow()
+ }
+ }
+
+ def runJobs(time: Time, jobs: Seq[Job]) {
+ if (jobs.isEmpty) {
+ logInfo("No jobs added for time " + time)
+ } else {
+ val jobSet = new JobSet(time, jobs)
+ jobSets.put(time, jobSet)
+ jobSet.jobs.foreach(job => executor.execute(new JobHandler(job)))
+ logInfo("Added jobs for time " + time)
+ }
+ }
+
+ def getPendingTimes(): Array[Time] = {
+ jobSets.keySet.toArray(new Array[Time](0))
+ }
+
+ private def beforeJobStart(job: Job) {
+ val jobSet = jobSets.get(job.time)
+ if (!jobSet.hasStarted) {
+ listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo()))
+ }
+ jobSet.beforeJobStart(job)
+ logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
+ SparkEnv.set(generator.ssc.env)
+ }
+
+ private def afterJobEnd(job: Job) {
+ val jobSet = jobSets.get(job.time)
+ jobSet.afterJobStop(job)
+ logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
+ if (jobSet.hasCompleted) {
+ jobSets.remove(jobSet.time)
+ generator.onBatchCompletion(jobSet.time)
+ logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+ jobSet.totalDelay / 1000.0, jobSet.time.toString,
+ jobSet.processingDelay / 1000.0
+ ))
+ listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
+ }
+ }
+
+ private[streaming]
+ class JobHandler(job: Job) extends Runnable {
+ def run() {
+ beforeJobStart(job)
+ try {
+ job.run()
+ } catch {
+ case e: Exception =>
+ logError("Running " + job + " failed", e)
+ }
+ afterJobEnd(job)
+ }
+ }
+}
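
As the constructor above shows, the size of the job thread pool comes from the spark.streaming.concurrentJobs system property. A hedged sketch of raising it, assuming the property must be in place before the StreamingContext (and hence its JobScheduler) is constructed; the master URL and application name are placeholders:

import org.apache.spark.streaming.{Seconds, StreamingContext}

object ConcurrentJobsSketch {
  def main(args: Array[String]) {
    // Read once in the JobScheduler constructor, so set it before the context exists.
    System.setProperty("spark.streaming.concurrentJobs", "2")
    val ssc = new StreamingContext("local[4]", "ConcurrentJobsSketch", Seconds(1))
    // ... register input and output DStreams here, then call ssc.start() ...
  }
}
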
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
new file mode 100644
index 0000000000..57268674ea
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import scala.collection.mutable.HashSet
+import org.apache.spark.streaming.Time
+
+/** Class representing a set of Jobs
+ * that belong to the same batch.
+ */
+private[streaming]
+case class JobSet(time: Time, jobs: Seq[Job]) {
+
+ private val incompleteJobs = new HashSet[Job]()
+ var submissionTime = System.currentTimeMillis() // when this jobset was submitted
+ var processingStartTime = -1L // when the first job of this jobset started processing
+ var processingEndTime = -1L // when the last job of this jobset finished processing
+
+ jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
+ incompleteJobs ++= jobs
+
+ def beforeJobStart(job: Job) {
+ if (processingStartTime < 0) processingStartTime = System.currentTimeMillis()
+ }
+
+ def afterJobStop(job: Job) {
+ incompleteJobs -= job
+ if (hasCompleted) processingEndTime = System.currentTimeMillis()
+ }
+
+ def hasStarted() = (processingStartTime > 0)
+
+ def hasCompleted() = incompleteJobs.isEmpty
+
+ // Time taken to process all the jobs from the time they started processing
+ // (i.e. not including the time they wait in the streaming scheduler queue)
+ def processingDelay = processingEndTime - processingStartTime
+
+ // Time taken to process all the jobs from the time they were submitted
+ // (i.e. including the time they wait in the streaming scheduler queue)
+ def totalDelay = {
+ processingEndTime - time.milliseconds
+ }
+
+ def toBatchInfo(): BatchInfo = {
+ new BatchInfo(
+ time,
+ submissionTime,
+ if (processingStartTime >= 0 ) Some(processingStartTime) else None,
+ if (processingEndTime >= 0 ) Some(processingEndTime) else None
+ )
+ }
+}
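
A short sketch of the JobSet life cycle that JobScheduler drives: beforeJobStart records the start of processing on the first job, afterJobStop removes the job and records the end once the set empties, and toBatchInfo snapshots the timings. Because Job and JobSet are private[streaming], the sketch assumes it is compiled inside the org.apache.spark.streaming.scheduler package; the no-op jobs are illustrative:

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.Time

object JobSetSketch {
  def main(args: Array[String]) {
    val time = new Time(0L)
    // Two no-op jobs standing in for the jobs generated for a single batch.
    val jobs = Seq(new Job(time, () => ()), new Job(time, () => ()))
    val jobSet = JobSet(time, jobs)   // assigns job ids and records submissionTime
    jobSet.jobs.foreach { job =>
      jobSet.beforeJobStart(job)      // first call sets processingStartTime
      job.run()
      jobSet.afterJobStop(job)        // last call sets processingEndTime
    }
    assert(jobSet.hasStarted() && jobSet.hasCompleted())
    println(jobSet.toBatchInfo())
  }
}
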
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 6e9a781978..abff55d77c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming
+package org.apache.spark.streaming.scheduler
import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
@@ -31,6 +31,7 @@ import akka.actor._
import akka.pattern.ask
import akka.dispatch._
import org.apache.spark.storage.BlockId
+import org.apache.spark.streaming.{Time, StreamingContext}
private[streaming] sealed trait NetworkInputTrackerMessage
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
new file mode 100644
index 0000000000..36225e190c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import scala.collection.mutable.Queue
+import org.apache.spark.util.Distribution
+
+/** Base trait for events related to StreamingListener */
+sealed trait StreamingListenerEvent
+
+case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
+case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
+
+/**
+ * A listener interface for receiving information about an ongoing streaming
+ * computation.
+ */
+trait StreamingListener {
+ /**
+ * Called when processing of a batch has completed
+ */
+ def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
+
+ /**
+ * Called when processing of a batch has started
+ */
+ def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
+}
+
+
+/**
+ * A simple StreamingListener that logs summary statistics across Spark Streaming batches
+ * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
+ */
+class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
+ // Queue containing latest completed batches
+ val batchInfos = new Queue[BatchInfo]()
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+ batchInfos.enqueue(batchCompleted.batchInfo)
+ if (batchInfos.size > numBatchInfos) batchInfos.dequeue()
+ printStats()
+ }
+
+ def printStats() {
+ showMillisDistribution("Total delay: ", _.totalDelay)
+ showMillisDistribution("Processing time: ", _.processingDelay)
+ }
+
+ def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) {
+ org.apache.spark.scheduler.StatsReportListener.showMillisDistribution(
+ heading, extractDistribution(getMetric))
+ }
+
+ def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
+ Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble))
+ }
+}
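
A hedged sketch of hooking a listener into a context through the addStreamingListener method introduced by this patch; the DelayLogger class, the master URL, and the socket host/port are illustrative, while StatsReportListener is the class defined above:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StatsReportListener, StreamingListener, StreamingListenerBatchCompleted}

// Illustrative listener that prints the total delay of every completed batch.
class DelayLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    println("Batch " + info.batchTime + " finished, total delay: " + info.totalDelay)
  }
}

object ListenerSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "ListenerSketch", Seconds(1))
    ssc.addStreamingListener(new DelayLogger)
    ssc.addStreamingListener(new StatsReportListener(numBatchInfos = 5))
    ssc.socketTextStream("localhost", 9999).count().print()  // placeholder streams
    ssc.start()
  }
}
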
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
new file mode 100644
index 0000000000..110a20f282
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.Logging
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import java.util.concurrent.LinkedBlockingQueue
+
+/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
+private[spark] class StreamingListenerBus() extends Logging {
+ private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener]
+
+ /* Cap the capacity of the StreamingListenerEvent queue so we get an explicit error (rather than
+ * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+ private val EVENT_QUEUE_CAPACITY = 10000
+ private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
+ private var queueFullErrorMessageLogged = false
+
+ new Thread("StreamingListenerBus") {
+ setDaemon(true)
+ override def run() {
+ while (true) {
+ val event = eventQueue.take
+ event match {
+ case batchStarted: StreamingListenerBatchStarted =>
+ listeners.foreach(_.onBatchStarted(batchStarted))
+ case batchCompleted: StreamingListenerBatchCompleted =>
+ listeners.foreach(_.onBatchCompleted(batchCompleted))
+ case _ =>
+ }
+ }
+ }
+ }.start()
+
+ def addListener(listener: StreamingListener) {
+ listeners += listener
+ }
+
+ def post(event: StreamingListenerEvent) {
+ val eventAdded = eventQueue.offer(event)
+ if (!eventAdded && !queueFullErrorMessageLogged) {
+ logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
+ "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
+ "rate at which events are being posted by the streaming scheduler.")
+ queueFullErrorMessageLogged = true
+ }
+ }
+
+ /**
+ * Waits until there are no more events in the queue, or until the specified time has elapsed.
+ * Used for testing only. Returns true if the queue has emptied and false if the specified time
+ * elapsed before the queue emptied.
+ */
+ def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+ val finishTime = System.currentTimeMillis + timeoutMillis
+ while (!eventQueue.isEmpty()) {
+ if (System.currentTimeMillis > finishTime) {
+ return false
+ }
+ /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+ * add overhead in the general case. */
+ Thread.sleep(10)
+ }
+ return true
+ }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 259ef1608c..b35ca00b53 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -26,18 +26,6 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
- override def framework() = "BasicOperationsSuite"
-
- before {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
- }
-
- after {
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- System.clearProperty("spark.hostPort")
- }
-
test("map") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
testOperation(
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index e81287b44e..67a0841535 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -40,31 +40,25 @@ import org.apache.spark.streaming.util.ManualClock
* the checkpointing of a DStream's RDDs as well as the checkpointing of
* the whole DStream graph.
*/
-class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
+class CheckpointSuite extends TestSuiteBase {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ var ssc: StreamingContext = null
+
+ override def batchDuration = Milliseconds(500)
+
+ override def actuallyWait = true // to allow checkpoints to be written
- before {
+ override def beforeFunction() {
+ super.beforeFunction()
FileUtils.deleteDirectory(new File(checkpointDir))
}
- after {
+ override def afterFunction() {
+ super.afterFunction()
if (ssc != null) ssc.stop()
FileUtils.deleteDirectory(new File(checkpointDir))
-
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- System.clearProperty("spark.hostPort")
}
- var ssc: StreamingContext = null
-
- override def framework = "CheckpointSuite"
-
- override def batchDuration = Milliseconds(500)
-
- override def actuallyWait = true
-
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 6337c5359c..da9b04de1a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -32,17 +32,22 @@ import collection.mutable.ArrayBuffer
* This testsuite tests master failures at random times while the stream is running using
* the real clock.
*/
-class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
+class FailureSuite extends TestSuiteBase with Logging {
var directory = "FailureSuite"
val numBatches = 30
- val batchDuration = Milliseconds(1000)
- before {
+ override def batchDuration = Milliseconds(1000)
+
+ override def useManualClock = false
+
+ override def beforeFunction() {
+ super.beforeFunction()
FileUtils.deleteDirectory(new File(directory))
}
- after {
+ override def afterFunction() {
+ super.afterFunction()
FileUtils.deleteDirectory(new File(directory))
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 7dc82decef..62a9f120b4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -50,18 +50,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val testPort = 9999
- override def checkpointDir = "checkpoint"
-
- before {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
- }
-
- after {
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- System.clearProperty("spark.hostPort")
- }
-
test("socket input stream") {
// Start the server
val testServer = new TestServer()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
new file mode 100644
index 0000000000..fa64142096
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.spark.streaming.scheduler._
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.matchers.ShouldMatchers
+
+class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
+
+ val input = (1 to 4).map(Seq(_)).toSeq
+ val operation = (d: DStream[Int]) => d.map(x => x)
+
+ // To make sure that the processing start and end times in collected
+ // information are different for successive batches
+ override def batchDuration = Milliseconds(100)
+ override def actuallyWait = true
+
+ test("basic BatchInfo generation") {
+ val ssc = setupStreams(input, operation)
+ val collector = new BatchInfoCollector
+ ssc.addStreamingListener(collector)
+ runStreams(ssc, input.size, input.size)
+ val batchInfos = collector.batchInfos
+ batchInfos should have size 4
+
+ batchInfos.foreach(info => {
+ info.schedulingDelay should not be None
+ info.processingDelay should not be None
+ info.totalDelay should not be None
+ info.schedulingDelay.get should be >= 0L
+ info.processingDelay.get should be >= 0L
+ info.totalDelay.get should be >= 0L
+ })
+
+ isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true)
+ isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true)
+ isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
+ }
+
+ /** Check if a sequence of numbers is in increasing order */
+ def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
+ for(i <- 1 until seq.size) {
+ if (seq(i - 1) > seq(i)) return false
+ }
+ true
+ }
+
+ /** Listener that collects information on processed batches */
+ class BatchInfoCollector extends StreamingListener {
+ val batchInfos = new ArrayBuffer[BatchInfo]
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+ batchInfos += batchCompleted.batchInfo
+ }
+ }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 2f34e812a1..e969e91d13 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -110,7 +110,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Name of the framework for Spark context
- def framework = "TestSuiteBase"
+ def framework = this.getClass.getSimpleName
// Master for Spark context
def master = "local[2]"
@@ -127,9 +127,39 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Maximum time to wait before the test times out
def maxWaitTimeMillis = 10000
+ // Whether to use manual clock or not
+ def useManualClock = true
+
// Whether to actually wait in real time before changing manual clock
def actuallyWait = false
+ // Default before function for any streaming test suite. Override this
+ // if you want to add your own setup to "before" (i.e., don't call before { } yourself)
+ def beforeFunction() {
+ if (useManualClock) {
+ System.setProperty(
+ "spark.streaming.clock",
+ "org.apache.spark.streaming.util.ManualClock"
+ )
+ } else {
+ System.clearProperty("spark.streaming.clock")
+ }
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
+ }
+
+ // Default after function for any streaming test suite. Override this
+ // if you want to add your own teardown to "after" (i.e., don't call after { } yourself)
+ def afterFunction() {
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
+ }
+
+ before(beforeFunction)
+ after(afterFunction)
+
/**
* Set up required DStreams to test the DStream operation using the two sequences
* of input collections.
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index f50e05c0d8..6b4aaefcdf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -22,19 +22,9 @@ import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ override def maxWaitTimeMillis = 20000 // large window tests can sometimes take longer
- override def framework = "WindowOperationsSuite"
-
- override def maxWaitTimeMillis = 20000
-
- override def batchDuration = Seconds(1)
-
- after {
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- System.clearProperty("spark.hostPort")
- }
+ override def batchDuration = Seconds(1) // making sure it's visible in this class
val largerSlideInput = Seq(
Seq(("a", 1)),