-rwxr-xr-x  conf/streaming-env.sh.template | 2
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala | 12
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala | 27
-rw-r--r--  core/src/main/scala/spark/RDD.scala | 30
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala | 1
-rw-r--r--  core/src/main/scala/spark/Utils.scala | 8
-rw-r--r--  core/src/main/scala/spark/broadcast/HttpBroadcast.scala | 24
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala | 6
-rw-r--r--  core/src/main/scala/spark/deploy/WebUI.scala | 30
-rw-r--r--  core/src/main/scala/spark/deploy/client/Client.scala | 14
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobInfo.scala | 23
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala | 96
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterArguments.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterWebUI.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/master/WorkerInfo.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerArguments.scala | 4
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala | 15
-rw-r--r--  core/src/main/scala/spark/network/ConnectionManager.scala | 3
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala | 24
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 7
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala | 75
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerId.scala | 48
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala | 7
-rw-r--r--  core/src/main/scala/spark/storage/MemoryStore.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/StorageLevel.scala | 28
-rw-r--r--  core/src/main/scala/spark/storage/ThreadingTest.scala | 91
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala | 7
-rw-r--r--  core/src/main/scala/spark/util/MetadataCleaner.scala | 37
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashMap.scala | 90
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashSet.scala | 66
-rw-r--r--  core/src/main/scala/spark/util/Vector.scala | 3
-rw-r--r--  core/src/main/twirl/spark/deploy/master/index.scala.html | 15
-rw-r--r--  core/src/main/twirl/spark/deploy/master/job_row.scala.html | 18
-rw-r--r--  core/src/main/twirl/spark/deploy/master/job_table.scala.html | 9
-rw-r--r--  core/src/main/twirl/spark/deploy/master/worker_row.scala.html | 6
-rw-r--r--  core/src/main/twirl/spark/deploy/master/worker_table.scala.html | 4
-rw-r--r--  core/src/main/twirl/spark/deploy/worker/executor_row.scala.html | 8
-rw-r--r--  core/src/main/twirl/spark/deploy/worker/index.scala.html | 6
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala | 26
-rw-r--r--  docs/quick-start.md | 18
-rw-r--r--  docs/running-on-mesos.md | 2
-rw-r--r--  docs/scala-programming-guide.md | 2
-rwxr-xr-x  ec2/spark_ec2.py | 194
-rw-r--r--  examples/src/main/scala/spark/examples/SparkKMeans.scala | 27
-rw-r--r--  run2.cmd | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 45
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 6
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 70
-rw-r--r--  streaming/src/main/scala/spark/streaming/WindowedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/CountRaw.scala | 32
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/FileStream.scala | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/QueueStream.scala | 10
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala | 6
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 5
-rw-r--r--  streaming/src/test/resources/log4j.properties | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 12
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 6
69 files changed, 1024 insertions, 355 deletions
diff --git a/conf/streaming-env.sh.template b/conf/streaming-env.sh.template
index 6b4094c515..1ea9ba5541 100755
--- a/conf/streaming-env.sh.template
+++ b/conf/streaming-env.sh.template
@@ -11,7 +11,7 @@
SPARK_JAVA_OPTS+=" -XX:+UseConcMarkSweepGC"
-# Using of Kryo serialization can improve serialization performance
+# Using Kryo serialization can improve serialization performance
# and therefore the throughput of the Spark Streaming programs. However,
# using Kryo serialization with custom classes may required you to
# register the classes with Kryo. Refer to the Spark documentation
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index c5db6ce63a..cb54e12257 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -14,6 +14,7 @@ import scala.collection.mutable.HashSet
import spark.storage.BlockManager
import spark.storage.StorageLevel
+import util.{TimeStampedHashSet, MetadataCleaner, TimeStampedHashMap}
private[spark] sealed trait CacheTrackerMessage
@@ -30,7 +31,7 @@ private[spark] case object StopCacheTracker extends CacheTrackerMessage
private[spark] class CacheTrackerActor extends Actor with Logging {
// TODO: Should probably store (String, CacheType) tuples
- private val locs = new HashMap[Int, Array[List[String]]]
+ private val locs = new TimeStampedHashMap[Int, Array[List[String]]]
/**
* A map from the slave's host name to its cache size.
@@ -38,6 +39,8 @@ private[spark] class CacheTrackerActor extends Actor with Logging {
private val slaveCapacity = new HashMap[String, Long]
private val slaveUsage = new HashMap[String, Long]
+ private val metadataCleaner = new MetadataCleaner("CacheTrackerActor", locs.cleanup)
+
private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L)
private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L)
private def getCacheAvailable(host: String): Long = getCacheCapacity(host) - getCacheUsage(host)
@@ -86,6 +89,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging {
case StopCacheTracker =>
logInfo("Stopping CacheTrackerActor")
sender ! true
+ metadataCleaner.cancel()
context.stop(self)
}
}
@@ -109,11 +113,15 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
actorSystem.actorFor(url)
}
- val registeredRddIds = new HashSet[Int]
+ // TODO: Consider removing this HashSet completely as locs CacheTrackerActor already
+ // keeps track of registered RDDs
+ val registeredRddIds = new TimeStampedHashSet[Int]
// Remembers which splits are currently being loaded (on worker nodes)
val loading = new HashSet[String]
+ val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.cleanup)
+
// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
def askTracker(message: Any): Any = {
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 45441aa5e5..20ff5431af 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -17,6 +17,7 @@ import scala.collection.mutable.HashSet
import scheduler.MapStatus
import spark.storage.BlockManagerId
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+import util.{MetadataCleaner, TimeStampedHashMap}
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
@@ -43,7 +44,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
val timeout = 10.seconds
- var mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]
+ var mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
// Incremented every time a fetch fails so that client nodes know to clear
// their cache of map output locations if this happens.
@@ -52,7 +53,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
// Cache a serialized version of the output statuses for each shuffle to send them out faster
var cacheGeneration = generation
- val cachedSerializedStatuses = new HashMap[Int, Array[Byte]]
+ val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
var trackerActor: ActorRef = if (isMaster) {
val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName)
@@ -63,6 +64,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
actorSystem.actorFor(url)
}
+ val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
+
// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
def askTracker(message: Any): Any = {
@@ -83,14 +86,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
}
def registerShuffle(shuffleId: Int, numMaps: Int) {
- if (mapStatuses.get(shuffleId) != null) {
+ if (mapStatuses.get(shuffleId) != None) {
throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
}
mapStatuses.put(shuffleId, new Array[MapStatus](numMaps))
}
def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
- var array = mapStatuses.get(shuffleId)
+ var array = mapStatuses(shuffleId)
array.synchronized {
array(mapId) = status
}
@@ -107,7 +110,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
}
def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) {
- var array = mapStatuses.get(shuffleId)
+ var array = mapStatuses(shuffleId)
if (array != null) {
array.synchronized {
if (array(mapId).address == bmAddress) {
@@ -125,7 +128,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
// Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
- val statuses = mapStatuses.get(shuffleId)
+ val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
fetching.synchronized {
@@ -138,7 +141,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
case e: InterruptedException =>
}
}
- return mapStatuses.get(shuffleId).map(status =>
+ return mapStatuses(shuffleId).map(status =>
(status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId))))
} else {
fetching += shuffleId
@@ -164,9 +167,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
}
}
+ def cleanup(cleanupTime: Long) {
+ mapStatuses.cleanup(cleanupTime)
+ cachedSerializedStatuses.cleanup(cleanupTime)
+ }
+
def stop() {
communicate(StopMapOutputTracker)
mapStatuses.clear()
+ metadataCleaner.cancel()
trackerActor = null
}
@@ -192,7 +201,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
generationLock.synchronized {
if (newGen > generation) {
logInfo("Updating generation to " + newGen + " and clearing cache")
- mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]
+ mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
generation = newGen
}
}
@@ -210,7 +219,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
case Some(bytes) =>
return bytes
case None =>
- statuses = mapStatuses.get(shuffleId)
+ statuses = mapStatuses(shuffleId)
generationGotten = generation
}
}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6af8c377b5..fbfcfbd704 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -211,27 +211,17 @@ abstract class RDD[T: ClassManifest](
if (startCheckpoint) {
val rdd = this
- val env = SparkEnv.get
-
- // Spawn a new thread to do the checkpoint as it takes sometime to write the RDD to file
- val th = new Thread() {
- override def run() {
- // Save the RDD to a file, create a new HadoopRDD from it,
- // and change the dependencies from the original parents to the new RDD
- SparkEnv.set(env)
- rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString
- rdd.saveAsObjectFile(checkpointFile)
- rdd.synchronized {
- rdd.checkpointRDD = context.objectFile[T](checkpointFile)
- rdd.checkpointRDDSplits = rdd.checkpointRDD.splits
- rdd.changeDependencies(rdd.checkpointRDD)
- rdd.shouldCheckpoint = false
- rdd.isCheckpointInProgress = false
- rdd.isCheckpointed = true
- }
- }
+ rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString
+ rdd.saveAsObjectFile(checkpointFile)
+ rdd.synchronized {
+ rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size)
+ rdd.checkpointRDDSplits = rdd.checkpointRDD.splits
+ rdd.changeDependencies(rdd.checkpointRDD)
+ rdd.shouldCheckpoint = false
+ rdd.isCheckpointInProgress = false
+ rdd.isCheckpointed = true
+ println("Done checkpointing RDD " + rdd.id + ", " + rdd + ", created RDD " + rdd.checkpointRDD.id + ", " + rdd.checkpointRDD)
}
- th.start()
} else {
// Recursively call doCheckpoint() to perform checkpointing on parent RDD if they are marked
dependencies.foreach(_.rdd.doCheckpoint())
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 4c6ec6cc6e..9f2b0c42c7 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -68,7 +68,6 @@ object SparkEnv extends Logging {
isMaster: Boolean,
isLocal: Boolean
) : SparkEnv = {
-
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
// Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 1bdde25896..06fa559fb6 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -199,7 +199,13 @@ private object Utils extends Logging {
/**
* Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
*/
- def localIpAddress(): String = InetAddress.getLocalHost.getHostAddress
+ def localIpAddress(): String = {
+ val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
+ if (defaultIpOverride != null)
+ defaultIpOverride
+ else
+ InetAddress.getLocalHost.getHostAddress
+ }
private var customHostname: Option[String] = None
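The change above lets a SPARK_LOCAL_IP environment variable override whatever address InetAddress would resolve. The same override-then-fallback pattern, written as a small standalone sketch (the object name is illustrative, not part of the patch):

import java.net.InetAddress

object LocalIpExample {
  // Same logic as the patched Utils.localIpAddress, with Option making the
  // null check on the environment variable explicit.
  def localIp(): String =
    Option(System.getenv("SPARK_LOCAL_IP")).getOrElse(InetAddress.getLocalHost.getHostAddress)

  def main(args: Array[String]) {
    println("Local IP: " + localIp())
  }
}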
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 7eb4ddb74f..fef264aab1 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -11,6 +11,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import spark._
import spark.storage.StorageLevel
+import util.{MetadataCleaner, TimeStampedHashSet}
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
@@ -64,6 +65,10 @@ private object HttpBroadcast extends Logging {
private var serverUri: String = null
private var server: HttpServer = null
+ private val files = new TimeStampedHashSet[String]
+ private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
+
+
def initialize(isMaster: Boolean) {
synchronized {
if (!initialized) {
@@ -85,6 +90,7 @@ private object HttpBroadcast extends Logging {
server = null
}
initialized = false
+ cleaner.cancel()
}
}
@@ -108,6 +114,7 @@ private object HttpBroadcast extends Logging {
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
+ files += file.getAbsolutePath
}
def read[T](id: Long): T = {
@@ -123,4 +130,21 @@ private object HttpBroadcast extends Logging {
serIn.close()
obj
}
+
+ def cleanup(cleanupTime: Long) {
+ val iterator = files.internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ val (file, time) = (entry.getKey, entry.getValue)
+ if (time < cleanupTime) {
+ try {
+ iterator.remove()
+ new File(file.toString).delete()
+ logInfo("Deleted broadcast file '" + file + "'")
+ } catch {
+ case e: Exception => logWarning("Could not delete broadcast file '" + file + "'", e)
+ }
+ }
+ }
+ }
}
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index d2b63d6e0d..7a1089c816 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -67,8 +67,8 @@ private[spark] case object RequestMasterState
// Master to MasterWebUI
private[spark]
-case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
- completedJobs: List[JobInfo])
+case class MasterState(uri: String, workers: Array[WorkerInfo], activeJobs: Array[JobInfo],
+ completedJobs: Array[JobInfo])
// WorkerWebUI to Worker
private[spark] case object RequestWorkerState
@@ -78,4 +78,4 @@ private[spark] case object RequestWorkerState
private[spark]
case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
-    coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
\ No newline at end of file
+ coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
diff --git a/core/src/main/scala/spark/deploy/WebUI.scala b/core/src/main/scala/spark/deploy/WebUI.scala
new file mode 100644
index 0000000000..ad1a1092b2
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/WebUI.scala
@@ -0,0 +1,30 @@
+package spark.deploy
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Utilities used throughout the web UI.
+ */
+private[spark] object WebUI {
+ val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
+ def formatDate(date: Date): String = DATE_FORMAT.format(date)
+
+ def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
+
+ def formatDuration(milliseconds: Long): String = {
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return "%.0f s".format(seconds)
+ }
+ val minutes = seconds / 60
+ if (minutes < 10) {
+ return "%.1f min".format(minutes)
+ } else if (minutes < 60) {
+ return "%.0f min".format(minutes)
+ }
+ val hours = minutes / 60
+ return "%.1f h".format(hours)
+ }
+}
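formatDuration above switches between seconds, fractional minutes, whole minutes and hours. A small, hypothetical driver (the object name is not part of the patch, and it is placed in the spark.deploy package because WebUI is private[spark]) prints one sample from each branch:

package spark.deploy

object WebUIFormatExample {
  def main(args: Array[String]) {
    // One sample per branch of WebUI.formatDuration.
    val samples = Seq(
      45 * 1000L,            // 45 seconds -> "45 s"
      5 * 60 * 1000L,        // 5 minutes  -> "5.0 min"
      45 * 60 * 1000L,       // 45 minutes -> "45 min"
      3 * 60 * 60 * 1000L)   // 3 hours    -> "3.0 h"
    samples.foreach(ms => println(ms + " ms -> " + WebUI.formatDuration(ms)))
  }
}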
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index e51b0c5c15..c57a1d33e9 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -35,6 +35,7 @@ private[spark] class Client(
class ClientActor extends Actor with Logging {
var master: ActorRef = null
+ var masterAddress: Address = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
override def preStart() {
@@ -43,6 +44,7 @@ private[spark] class Client(
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
+ masterAddress = master.path.address
master ! RegisterJob(jobDescription)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
@@ -72,7 +74,17 @@ private[spark] class Client(
listener.executorRemoved(fullId, message.getOrElse(""))
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case Terminated(actor_) if actor_ == master =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case RemoteClientDisconnected(transport, address) if address == masterAddress =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case RemoteClientShutdown(transport, address) if address == masterAddress =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 8795c09cc1..130b031a2a 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -5,11 +5,17 @@ import java.util.Date
import akka.actor.ActorRef
import scala.collection.mutable
-private[spark]
-class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
+private[spark] class JobInfo(
+ val startTime: Long,
+ val id: String,
+ val desc: JobDescription,
+ val submitDate: Date,
+ val actor: ActorRef)
+{
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
+ var endTime = -1L
private var nextExecutorId = 0
@@ -41,4 +47,17 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
_retryCount += 1
_retryCount
}
+
+ def markFinished(endState: JobState.Value) {
+ state = endState
+ endTime = System.currentTimeMillis()
+ }
+
+ def duration: Long = {
+ if (endTime != -1) {
+ endTime - startTime
+ } else {
+ System.currentTimeMillis() - startTime
+ }
+ }
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 6010f7cff2..7e5cd6b171 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val waitingJobs = new ArrayBuffer[JobInfo]
val completedJobs = new ArrayBuffer[JobInfo]
+ // As a temporary workaround before better ways of configuring memory, we allow users to set
+ // a flag that will perform round-robin scheduling across the nodes (spreading out each job
+ // among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
+ val spreadOutJobs = System.getProperty("spark.deploy.spreadOut", "false").toBoolean
+
override def preStart() {
logInfo("Starting Spark master at spark://" + ip + ":" + port)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -123,28 +128,62 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
case RequestMasterState => {
- sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
+ sender ! MasterState(ip + ":" + port, workers.toArray, jobs.toArray, completedJobs.toArray)
}
}
/**
+ * Can a job use the given worker? True if the worker has enough memory and we haven't already
+ * launched an executor for the job on it (right now the standalone backend doesn't like having
+ * two executors on the same worker).
+ */
+ def canUse(job: JobInfo, worker: WorkerInfo): Boolean = {
+ worker.memoryFree >= job.desc.memoryPerSlave && !worker.hasExecutor(job)
+ }
+
+ /**
* Schedule the currently available resources among waiting jobs. This method will be called
* every time a new job joins or resource availability changes.
*/
def schedule() {
- // Right now this is a very simple FIFO scheduler. We keep looking through the jobs
- // in order of submission time and launching the first one that fits on each node.
- for (worker <- workers if worker.coresFree > 0) {
- for (job <- waitingJobs.clone()) {
- val jobMemory = job.desc.memoryPerSlave
- if (worker.memoryFree >= jobMemory) {
- val coresToUse = math.min(worker.coresFree, job.coresLeft)
- val exec = job.addExecutor(worker, coresToUse)
- launchExecutor(worker, exec)
+ // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first job
+ // in the queue, then the second job, etc.
+ if (spreadOutJobs) {
+ // Try to spread out each job among all the nodes, until it has all its cores
+ for (job <- waitingJobs if job.coresLeft > 0) {
+ val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse
+ val numUsable = usableWorkers.length
+ val assigned = new Array[Int](numUsable) // Number of cores to give on each node
+ var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
+ var pos = 0
+ while (toAssign > 0) {
+ if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
+ toAssign -= 1
+ assigned(pos) += 1
+ }
+ pos = (pos + 1) % numUsable
}
- if (job.coresLeft == 0) {
- waitingJobs -= job
- job.state = JobState.RUNNING
+ // Now that we've decided how many cores to give on each node, let's actually give them
+ for (pos <- 0 until numUsable) {
+ if (assigned(pos) > 0) {
+ val exec = job.addExecutor(usableWorkers(pos), assigned(pos))
+ launchExecutor(usableWorkers(pos), exec)
+ job.state = JobState.RUNNING
+ }
+ }
+ }
+ } else {
+ // Pack each job into as few nodes as possible until we've assigned all its cores
+ for (worker <- workers if worker.coresFree > 0) {
+ for (job <- waitingJobs if job.coresLeft > 0) {
+ if (canUse(job, worker)) {
+ val coresToUse = math.min(worker.coresFree, job.coresLeft)
+ if (coresToUse > 0) {
+ val exec = job.addExecutor(worker, coresToUse)
+ launchExecutor(worker, exec)
+ job.state = JobState.RUNNING
+ }
+ }
}
}
}
@@ -179,8 +218,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
- val date = new Date
- val job = new JobInfo(newJobId(date), desc, date, actor)
+ val now = System.currentTimeMillis()
+ val date = new Date(now)
+ val job = new JobInfo(now, newJobId(date), desc, date, actor)
jobs += job
idToJob(job.id) = job
actorToJob(sender) = job
@@ -189,19 +229,21 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
def removeJob(job: JobInfo) {
- logInfo("Removing job " + job.id)
- jobs -= job
- idToJob -= job.id
- actorToJob -= job.actor
- addressToWorker -= job.actor.path.address
- completedJobs += job // Remember it in our history
- waitingJobs -= job
- for (exec <- job.executors.values) {
- exec.worker.removeExecutor(exec)
- exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+ if (jobs.contains(job)) {
+ logInfo("Removing job " + job.id)
+ jobs -= job
+ idToJob -= job.id
+ actorToJob -= job.actor
+ addressToWorker -= job.actor.path.address
+ completedJobs += job // Remember it in our history
+ waitingJobs -= job
+ for (exec <- job.executors.values) {
+ exec.worker.removeExecutor(exec)
+ exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+ }
+ job.markFinished(JobState.FINISHED) // TODO: Mark it as FAILED if it failed
+ schedule()
}
- job.state = JobState.FINISHED
- schedule()
}
/** Generate a new job ID given a job's submission date */
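The spread-out branch above hands a job its cores one at a time, cycling over the usable workers. That allocation loop can be read in isolation; the sketch below reimplements just the round-robin assignment over a list of free-core counts (a standalone illustration with made-up names, not Master code):

object SpreadOutExample {
  /** Round-robin `coresWanted` cores over workers with the given free cores. */
  def assign(coresWanted: Int, coresFree: Array[Int]): Array[Int] = {
    val assigned = new Array[Int](coresFree.length)
    var toAssign = math.min(coresWanted, coresFree.sum)
    var pos = 0
    while (toAssign > 0) {
      if (coresFree(pos) - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % coresFree.length
    }
    assigned
  }

  def main(args: Array[String]) {
    // Three workers with 4, 2 and 1 free cores; the job wants 5 cores.
    println(assign(5, Array(4, 2, 1)).mkString(", "))  // prints "2, 2, 1"
  }
}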
diff --git a/core/src/main/scala/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
index 1b1c3dd0ad..4ceab3fc03 100644
--- a/core/src/main/scala/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
@@ -7,7 +7,7 @@ import spark.Utils
* Command-line parser for the master.
*/
private[spark] class MasterArguments(args: Array[String]) {
- var ip = Utils.localIpAddress()
+ var ip = Utils.localHostName()
var port = 7077
var webUiPort = 8080
@@ -59,4 +59,4 @@ private[spark] class MasterArguments(args: Array[String]) {
" --webui-port PORT Port for web UI (default: 8080)")
System.exit(exitCode)
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 700a41c770..3cdd3721f5 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -36,7 +36,7 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
// A bit ugly an inefficient, but we won't have a number of jobs
// so large that it will make a significant difference.
- (masterState.activeJobs ::: masterState.completedJobs).find(_.id == jobId) match {
+ (masterState.activeJobs ++ masterState.completedJobs).find(_.id == jobId) match {
case Some(job) => spark.deploy.master.html.job_details.render(job)
case _ => null
}
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 16b3f9b653..706b1453aa 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -33,6 +33,10 @@ private[spark] class WorkerInfo(
memoryUsed -= exec.memory
}
}
+
+ def hasExecutor(job: JobInfo): Boolean = {
+ executors.values.exists(_.job == job)
+ }
def webUiAddress : String = {
"http://" + this.host + ":" + this.webUiPort
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 474c9364fd..67d41dda29 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -123,7 +123,7 @@ private[spark] class Worker(
manager.start()
coresUsed += cores_
memoryUsed += memory_
- master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)
+ master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None)
case ExecutorStateChanged(jobId, execId, state, message) =>
master ! ExecutorStateChanged(jobId, execId, state, message)
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
index 60dc107a4c..340920025b 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
@@ -9,7 +9,7 @@ import java.lang.management.ManagementFactory
* Command-line parser for the master.
*/
private[spark] class WorkerArguments(args: Array[String]) {
- var ip = Utils.localIpAddress()
+ var ip = Utils.localHostName()
var port = 0
var webUiPort = 8081
var cores = inferDefaultCores()
@@ -110,4 +110,4 @@ private[spark] class WorkerArguments(args: Array[String]) {
// Leave out 1 GB for the operating system, but don't return a negative memory size
math.max(totalMb - 1024, 512)
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index dfdb22024e..cb29a6b8b4 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -43,6 +43,21 @@ private[spark] class Executor extends Logging {
urlClassLoader = createClassLoader()
Thread.currentThread.setContextClassLoader(urlClassLoader)
+ // Make any thread terminations due to uncaught exceptions kill the entire
+ // executor process to avoid surprising stalls.
+ Thread.setDefaultUncaughtExceptionHandler(
+ new Thread.UncaughtExceptionHandler {
+ override def uncaughtException(thread: Thread, exception: Throwable) {
+ try {
+ logError("Uncaught exception in thread " + thread, exception)
+ System.exit(1)
+ } catch {
+ case t: Throwable => System.exit(2)
+ }
+ }
+ }
+ )
+
// Initialize Spark environment (using system properties read above)
env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
SparkEnv.set(env)
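The handler added above turns an uncaught exception on any executor thread into a clean process exit instead of a silent stall. The same mechanism can be seen in a tiny standalone program (illustrative names, not executor code):

object UncaughtExitExample {
  def main(args: Array[String]) {
    // Any thread dying with an uncaught exception takes the whole JVM down.
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(thread: Thread, exception: Throwable) {
        System.err.println("Uncaught exception in " + thread.getName + ": " + exception)
        System.exit(1)
      }
    })

    new Thread("worker") {
      override def run() { throw new RuntimeException("boom") }
    }.start()
    // The JVM exits with status 1 once the worker thread dies.
  }
}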
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index da39108164..642fa4b525 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -304,7 +304,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
connectionRequests += newConnection
newConnection
}
- val connection = connectionsById.getOrElse(connectionManagerId, startNewConnection())
+ val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)
+ val connection = connectionsById.getOrElse(lookupKey, startNewConnection())
message.senderAddress = id.toSocketAddress()
logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")
/*connection.send(message)*/
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index aaaed59c4a..4b2570fa2b 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -14,6 +14,7 @@ import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
import spark.storage.BlockManagerMaster
import spark.storage.BlockManagerId
+import util.{MetadataCleaner, TimeStampedHashMap}
/**
* A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
@@ -61,9 +62,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val nextStageId = new AtomicInteger(0)
- val idToStage = new HashMap[Int, Stage]
+ val idToStage = new TimeStampedHashMap[Int, Stage]
- val shuffleToMapStage = new HashMap[Int, Stage]
+ val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
var cacheLocs = new HashMap[Int, Array[List[String]]]
@@ -77,12 +78,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
val running = new HashSet[Stage] // Stages we are running right now
val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
- val pendingTasks = new HashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage
+ val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage
var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits
val activeJobs = new HashSet[ActiveJob]
val resultStageToJob = new HashMap[Stage, ActiveJob]
+ val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
+
// Start a thread to run the DAGScheduler event loop
new Thread("DAGScheduler") {
setDaemon(true)
@@ -591,8 +594,23 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
return Nil
}
+ def cleanup(cleanupTime: Long) {
+ var sizeBefore = idToStage.size
+ idToStage.cleanup(cleanupTime)
+ logInfo("idToStage " + sizeBefore + " --> " + idToStage.size)
+
+ sizeBefore = shuffleToMapStage.size
+ shuffleToMapStage.cleanup(cleanupTime)
+ logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size)
+
+ sizeBefore = pendingTasks.size
+ pendingTasks.cleanup(cleanupTime)
+ logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)
+ }
+
def stop() {
eventQueue.put(StopDAGScheduler)
+ metadataCleaner.cancel()
taskSched.stop()
}
}
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 60105c42b6..683f5ebec3 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -14,17 +14,20 @@ import com.ning.compress.lzf.LZFOutputStream
import spark._
import spark.storage._
+import util.{TimeStampedHashMap, MetadataCleaner}
private[spark] object ShuffleMapTask {
// A simple map between the stage id to the serialized byte array of a task.
// Served as a cache for task serialization because serialization can be
// expensive on the master node if it needs to launch thousands of tasks.
- val serializedInfoCache = new JHashMap[Int, Array[Byte]]
+ val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
+
+ val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.cleanup)
def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
synchronized {
- val old = serializedInfoCache.get(stageId)
+ val old = serializedInfoCache.get(stageId).orNull
if (old != null) {
return old
} else {
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index cdfe1f2563..814443fa52 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -272,7 +272,7 @@ private[spark] class MesosSchedulerBackend(
synchronized {
slaveIdsWithExecutors -= slaveId.getValue
}
- scheduler.slaveLost(slaveId.toString)
+ scheduler.slaveLost(slaveId.getValue)
}
override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 70d6d8369d..1e36578e1a 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -10,43 +10,17 @@ import java.nio.{MappedByteBuffer, ByteBuffer}
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
-import scala.collection.JavaConversions._
import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
-import spark.util.ByteBufferInputStream
+import spark.util.{MetadataCleaner, TimeStampedHashMap, ByteBufferInputStream}
+
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import sun.nio.ch.DirectBuffer
-private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
- def this() = this(null, 0) // For deserialization only
-
- def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
-
- override def writeExternal(out: ObjectOutput) {
- out.writeUTF(ip)
- out.writeInt(port)
- }
-
- override def readExternal(in: ObjectInput) {
- ip = in.readUTF()
- port = in.readInt()
- }
-
- override def toString = "BlockManagerId(" + ip + ", " + port + ")"
-
- override def hashCode = ip.hashCode * 41 + port
-
- override def equals(that: Any) = that match {
- case id: BlockManagerId => port == id.port && ip == id.ip
- case _ => false
- }
-}
-
-
-private[spark]
+private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
@@ -77,7 +51,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
+ private val blockInfo = new TimeStampedHashMap[String, BlockInfo]()
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
@@ -106,6 +80,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val host = System.getProperty("spark.hostname", Utils.localHostName())
+ val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()
/**
@@ -128,8 +103,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Get storage level of local block. If no info exists for the block, then returns null.
*/
def getLevel(blockId: String): StorageLevel = {
- val info = blockInfo.get(blockId)
- if (info != null) info.level else null
+ blockInfo.get(blockId).map(_.level).orNull
}
/**
@@ -139,9 +113,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def reportBlockStatus(blockId: String) {
val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
- case null =>
+ case None =>
(StorageLevel.NONE, 0L, 0L)
- case info =>
+ case Some(info) =>
info.synchronized {
info.level match {
case null =>
@@ -199,7 +173,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
@@ -284,7 +258,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
@@ -543,7 +517,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
- val oldBlock = blockInfo.get(blockId)
+ val oldBlock = blockInfo.get(blockId).orNull
if (oldBlock != null) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
oldBlock.waitForReady()
@@ -644,7 +618,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
- if (blockInfo.containsKey(blockId)) {
+ if (blockInfo.contains(blockId)) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
return
}
@@ -766,7 +740,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
val level = info.level
@@ -793,6 +767,29 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
+ def dropOldBlocks(cleanupTime: Long) {
+ logInfo("Dropping blocks older than " + cleanupTime)
+ val iterator = blockInfo.internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
+ if (time < cleanupTime) {
+ info.synchronized {
+ val level = info.level
+ if (level.useMemory) {
+ memoryStore.remove(id)
+ }
+ if (level.useDisk) {
+ diskStore.remove(id)
+ }
+ iterator.remove()
+ logInfo("Dropped block " + id)
+ }
+ reportBlockStatus(id)
+ }
+ }
+ }
+
def shouldCompress(blockId: String): Boolean = {
if (blockId.startsWith("shuffle_")) {
compressShuffle
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
new file mode 100644
index 0000000000..4933cc6606
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -0,0 +1,48 @@
+package spark.storage
+
+import java.io.{IOException, ObjectOutput, ObjectInput, Externalizable}
+import java.util.concurrent.ConcurrentHashMap
+
+private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
+ def this() = this(null, 0) // For deserialization only
+
+ def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
+
+ override def writeExternal(out: ObjectOutput) {
+ out.writeUTF(ip)
+ out.writeInt(port)
+ }
+
+ override def readExternal(in: ObjectInput) {
+ ip = in.readUTF()
+ port = in.readInt()
+ }
+
+ @throws(classOf[IOException])
+ private def readResolve(): Object = {
+ BlockManagerId.getCachedBlockManagerId(this)
+ }
+
+
+ override def toString = "BlockManagerId(" + ip + ", " + port + ")"
+
+ override def hashCode = ip.hashCode * 41 + port
+
+ override def equals(that: Any) = that match {
+ case id: BlockManagerId => port == id.port && ip == id.ip
+ case _ => false
+ }
+}
+
+object BlockManagerId {
+ val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+
+ def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
+ if (blockManagerIdCache.containsKey(id)) {
+ blockManagerIdCache.get(id)
+ } else {
+ blockManagerIdCache.put(id, id)
+ id
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 4d5ee8318c..af15663621 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -243,6 +243,12 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " " + blockId + " "
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ // Can happen if this is from a locally cached partition on the master
+ sender ! true
+ return
+ }
+
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
logDebug("Got in updateBlockInfo 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
@@ -335,6 +341,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
throw new Exception("Self index for " + blockManagerId + " not found")
}
+ // Note that this logic will select the same node multiple times if there aren't enough peers
var index = selfIndex
while (res.size < size) {
index += 1
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 09769d1f7d..a222b2f3df 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -40,7 +40,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
tryToPut(blockId, elements, sizeEstimate, true)
} else {
- val entry = new Entry(bytes, bytes.limit, false)
tryToPut(blockId, bytes, bytes.limit, false)
}
}
@@ -175,6 +174,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Otherwise, the freed space may fill up before the caller puts in their new value.
*/
private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
+
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index c497f03e0c..eb88eb2759 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -1,6 +1,9 @@
package spark.storage
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.io.{IOException, Externalizable, ObjectInput, ObjectOutput}
+import collection.mutable
+import util.Random
+import collection.mutable.ArrayBuffer
/**
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
@@ -17,7 +20,8 @@ class StorageLevel(
extends Externalizable {
// TODO: Also add fields for caching priority, dataset ID, and flushing.
-
+ assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
+
def this(flags: Int, replication: Int) {
this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
}
@@ -27,6 +31,10 @@ class StorageLevel(
override def clone(): StorageLevel = new StorageLevel(
this.useDisk, this.useMemory, this.deserialized, this.replication)
+ override def hashCode(): Int = {
+ toInt * 41 + replication
+ }
+
override def equals(other: Any): Boolean = other match {
case s: StorageLevel =>
s.useDisk == useDisk &&
@@ -66,6 +74,11 @@ class StorageLevel(
replication = in.readByte()
}
+ @throws(classOf[IOException])
+ private def readResolve(): Object = {
+ StorageLevel.getCachedStorageLevel(this)
+ }
+
override def toString: String =
"StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
}
@@ -82,4 +95,15 @@ object StorageLevel {
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
+
+ val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
+
+ def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
+ if (storageLevelCache.containsKey(level)) {
+ storageLevelCache.get(level)
+ } else {
+ storageLevelCache.put(level, level)
+ level
+ }
+ }
}
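With readResolve delegating to getCachedStorageLevel, equal StorageLevel values collapse onto one canonical instance however they are created or deserialized, which is what makes them cheap to use as hash keys (BlockManagerId above uses the same pattern). A quick check of that behaviour, assuming the (useDisk, useMemory, deserialized, replication) constructor shown in this file:

import spark.storage.StorageLevel

object StorageLevelCacheExample {
  def main(args: Array[String]) {
    val a = new StorageLevel(false, true, true, 1)   // equivalent to MEMORY_ONLY
    val b = new StorageLevel(false, true, true, 1)
    // Distinct objects, but equal and with the same hash code...
    println(a == b)                     // true
    println(a.hashCode == b.hashCode)   // true
    // ...and getCachedStorageLevel maps both onto one canonical instance.
    val ca = StorageLevel.getCachedStorageLevel(a)
    val cb = StorageLevel.getCachedStorageLevel(b)
    println(ca eq cb)                   // true
  }
}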
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
new file mode 100644
index 0000000000..e4a5b8ffdf
--- /dev/null
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -0,0 +1,91 @@
+package spark.storage
+
+import akka.actor._
+
+import spark.KryoSerializer
+import java.util.concurrent.ArrayBlockingQueue
+import util.Random
+
+/**
+ * This class tests the BlockManager and MemoryStore for thread safety and
+ * deadlocks. It spawns a number of producer and consumer threads. Producer
+ * threads continuously pushes blocks into the BlockManager and consumer
+ * threads continuously retrieves the blocks form the BlockManager and tests
+ * whether the block is correct or not.
+ */
+private[spark] object ThreadingTest {
+
+ val numProducers = 5
+ val numBlocksPerProducer = 20000
+
+ private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
+ val queue = new ArrayBlockingQueue[(String, Seq[Int])](100)
+
+ override def run() {
+ for (i <- 1 to numBlocksPerProducer) {
+ val blockId = "b-" + id + "-" + i
+ val blockSize = Random.nextInt(1000)
+ val block = (1 to blockSize).map(_ => Random.nextInt())
+ val level = randomLevel()
+ val startTime = System.currentTimeMillis()
+ manager.put(blockId, block.iterator, level, true)
+ println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+ queue.add((blockId, block))
+ }
+ println("Producer thread " + id + " terminated")
+ }
+
+ def randomLevel(): StorageLevel = {
+ math.abs(Random.nextInt()) % 4 match {
+ case 0 => StorageLevel.MEMORY_ONLY
+ case 1 => StorageLevel.MEMORY_ONLY_SER
+ case 2 => StorageLevel.MEMORY_AND_DISK
+ case 3 => StorageLevel.MEMORY_AND_DISK_SER
+ }
+ }
+ }
+
+ private[spark] class ConsumerThread(
+ manager: BlockManager,
+ queue: ArrayBlockingQueue[(String, Seq[Int])]
+ ) extends Thread {
+ var numBlockConsumed = 0
+
+ override def run() {
+ println("Consumer thread started")
+ while(numBlockConsumed < numBlocksPerProducer) {
+ val (blockId, block) = queue.take()
+ val startTime = System.currentTimeMillis()
+ manager.get(blockId) match {
+ case Some(retrievedBlock) =>
+ assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
+ println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+ case None =>
+ assert(false, "Block " + blockId + " could not be retrieved")
+ }
+ numBlockConsumed += 1
+ }
+ println("Consumer thread terminated")
+ }
+ }
+
+ def main(args: Array[String]) {
+ System.setProperty("spark.kryoserializer.buffer.mb", "1")
+ val actorSystem = ActorSystem("test")
+ val serializer = new KryoSerializer
+ val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
+ val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
+ val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
+ val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
+ producers.foreach(_.start)
+ consumers.foreach(_.start)
+ producers.foreach(_.join)
+ consumers.foreach(_.join)
+ blockManager.stop()
+ blockManagerMaster.stop()
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ println("Everything stopped.")
+ println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
+ }
+}
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index b466b5239c..e67cb0336d 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -25,6 +25,8 @@ private[spark] object AkkaUtils {
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
+ val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
+ val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
@@ -32,10 +34,11 @@ private[spark] object AkkaUtils {
akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d
- akka.remote.netty.connection-timeout = 1s
+ akka.remote.netty.connection-timeout = %ds
+ akka.remote.netty.message-frame-size = %d MiB
akka.remote.netty.execution-pool-size = %d
akka.actor.default-dispatcher.throughput = %d
- """.format(host, port, akkaThreads, akkaBatchSize))
+ """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
new file mode 100644
index 0000000000..2541b26255
--- /dev/null
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -0,0 +1,37 @@
+package spark.util
+
+import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
+import java.util.{TimerTask, Timer}
+import spark.Logging
+
+class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
+ val delaySeconds = MetadataCleaner.getDelaySeconds
+ val periodSeconds = math.max(10, delaySeconds / 10)
+ val timer = new Timer(name + " cleanup timer", true)
+ val task = new TimerTask {
+ def run() {
+ try {
+ if (delaySeconds > 0) {
+ cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
+ logInfo("Ran metadata cleaner for " + name)
+ }
+ } catch {
+ case e: Exception => logError("Error running cleanup task for " + name, e)
+ }
+ }
+ }
+ if (periodSeconds > 0) {
+ logInfo("Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
+ + "period of " + periodSeconds + " secs")
+ timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
+ }
+
+ def cancel() {
+ timer.cancel()
+ }
+}
+
+object MetadataCleaner {
+ def getDelaySeconds = (System.getProperty("spark.cleaner.delay", "-100").toDouble * 60).toInt
+ def setDelaySeconds(delay: Long) { System.setProperty("spark.cleaner.delay", delay.toString) }
+}
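MetadataCleaner takes a name and a cleanup callback and drives it from a daemon Timer; spark.cleaner.delay is read in minutes, with a negative value disabling cleanup. A minimal, hypothetical wiring against a TimeStampedHashMap, mirroring how the trackers above register their cleanup methods:

import spark.util.{MetadataCleaner, TimeStampedHashMap}

object MetadataCleanerExample {
  def main(args: Array[String]) {
    // spark.cleaner.delay is in minutes (negative means cleanup is disabled).
    System.setProperty("spark.cleaner.delay", "1")

    val cache = new TimeStampedHashMap[String, Int]()
    // The timer periodically calls cache.cleanup with a one-minute-old threshold.
    val cleaner = new MetadataCleaner("example", cache.cleanup)

    cache("recent") = 42
    println(cache.get("recent"))   // Some(42); dropped only once older than the delay

    cleaner.cancel()               // stop the timer, as the stop() methods above do
  }
}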
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
new file mode 100644
index 0000000000..52f03784db
--- /dev/null
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -0,0 +1,90 @@
+package spark.util
+
+import scala.collection.JavaConversions
+import scala.collection.mutable.Map
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
+ * time stamp along with each key-value pair. Key-value pairs that are older than a particular
+ * threshold time can them be removed using the cleanup method. This is intended to be a drop-in
+ * replacement of scala.collection.mutable.HashMap.
+ */
+class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
+ val internalMap = new ConcurrentHashMap[A, (B, Long)]()
+
+ def get(key: A): Option[B] = {
+ val value = internalMap.get(key)
+ if (value != null) Some(value._1) else None
+ }
+
+ def iterator: Iterator[(A, B)] = {
+ val jIterator = internalMap.entrySet().iterator()
+ JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue._1))
+ }
+
+ override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
+ val newMap = new TimeStampedHashMap[A, B1]
+ newMap.internalMap.putAll(this.internalMap)
+ newMap.internalMap.put(kv._1, (kv._2, currentTime))
+ newMap
+ }
+
+ override def - (key: A): Map[A, B] = {
+ val newMap = new TimeStampedHashMap[A, B]
+ newMap.internalMap.putAll(this.internalMap)
+ newMap.internalMap.remove(key)
+ newMap
+ }
+
+ override def += (kv: (A, B)): this.type = {
+ internalMap.put(kv._1, (kv._2, currentTime))
+ this
+ }
+
+ override def -= (key: A): this.type = {
+ internalMap.remove(key)
+ this
+ }
+
+ override def update(key: A, value: B) {
+ this += ((key, value))
+ }
+
+ override def apply(key: A): B = {
+ val value = internalMap.get(key)
+ if (value == null) throw new NoSuchElementException()
+ value._1
+ }
+
+ override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
+ JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
+ }
+
+ override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
+
+ override def size(): Int = internalMap.size()
+
+ override def foreach[U](f: ((A, B)) => U): Unit = {
+ val iterator = internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ val kv = (entry.getKey, entry.getValue._1)
+ f(kv)
+ }
+ }
+
+ def cleanup(threshTime: Long) {
+ val iterator = internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ if (entry.getValue._2 < threshTime) {
+ logDebug("Removing key " + entry.getKey)
+ iterator.remove()
+ }
+ }
+ }
+
+ private def currentTime: Long = System.currentTimeMillis()
+
+}
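
A hedged sketch of how this map is meant to be used (editorial, not part of the patch): it behaves like a `mutable.Map`, but records the insertion time of each pair so that `cleanup(threshold)` can drop stale entries.

```scala
// Sketch only: keys and sleeps are illustrative.
import spark.util.TimeStampedHashMap

object TimeStampedHashMapSketch {
  def main(args: Array[String]) {
    val map = new TimeStampedHashMap[String, Int]()
    map("stage-0") = 42                        // timestamped with the current time
    Thread.sleep(10)
    val cutoff = System.currentTimeMillis()    // anything inserted before this is "old"
    Thread.sleep(10)
    map("stage-1") = 43                        // timestamped after the cutoff

    map.cleanup(cutoff)                        // removes "stage-0", keeps "stage-1"
    println(map.get("stage-0"))                // None
    println(map.get("stage-1"))                // Some(43)
  }
}
```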
diff --git a/core/src/main/scala/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/spark/util/TimeStampedHashSet.scala
new file mode 100644
index 0000000000..539dd75844
--- /dev/null
+++ b/core/src/main/scala/spark/util/TimeStampedHashSet.scala
@@ -0,0 +1,66 @@
+package spark.util
+
+import scala.collection.mutable.Set
+import scala.collection.JavaConversions
+import java.util.concurrent.ConcurrentHashMap
+
+
+class TimeStampedHashSet[A] extends Set[A] {
+ val internalMap = new ConcurrentHashMap[A, Long]()
+
+ def contains(key: A): Boolean = {
+ internalMap.containsKey(key)
+ }
+
+ def iterator: Iterator[A] = {
+ val jIterator = internalMap.entrySet().iterator()
+ JavaConversions.asScalaIterator(jIterator).map(_.getKey)
+ }
+
+ override def + (elem: A): Set[A] = {
+ val newSet = new TimeStampedHashSet[A]
+ newSet ++= this
+ newSet += elem
+ newSet
+ }
+
+ override def - (elem: A): Set[A] = {
+ val newSet = new TimeStampedHashSet[A]
+ newSet ++= this
+ newSet -= elem
+ newSet
+ }
+
+ override def += (key: A): this.type = {
+ internalMap.put(key, currentTime)
+ this
+ }
+
+ override def -= (key: A): this.type = {
+ internalMap.remove(key)
+ this
+ }
+
+ override def empty: Set[A] = new TimeStampedHashSet[A]()
+
+ override def size(): Int = internalMap.size()
+
+ override def foreach[U](f: (A) => U): Unit = {
+ val iterator = internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ f(iterator.next.getKey)
+ }
+ }
+
+ def cleanup(threshTime: Long) {
+ val iterator = internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ if (entry.getValue < threshTime) {
+ iterator.remove()
+ }
+ }
+ }
+
+ private def currentTime: Long = System.currentTimeMillis()
+}
diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
index 4e95ac2ac6..03559751bc 100644
--- a/core/src/main/scala/spark/util/Vector.scala
+++ b/core/src/main/scala/spark/util/Vector.scala
@@ -49,7 +49,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
return ans
}
- def +=(other: Vector) {
+ def += (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
var ans = 0.0
@@ -58,6 +58,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
elements(i) += other(i)
i += 1
}
+ this
}
def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
diff --git a/core/src/main/twirl/spark/deploy/master/index.scala.html b/core/src/main/twirl/spark/deploy/master/index.scala.html
index 7562076b00..18c32e5a1f 100644
--- a/core/src/main/twirl/spark/deploy/master/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/index.scala.html
@@ -1,5 +1,6 @@
@(state: spark.deploy.MasterState)
@import spark.deploy.master._
+@import spark.Utils
@spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) {
@@ -8,9 +9,11 @@
<div class="span12">
<ul class="unstyled">
<li><strong>URL:</strong> spark://@(state.uri)</li>
- <li><strong>Number of Workers:</strong> @state.workers.size </li>
- <li><strong>Cores:</strong> @state.workers.map(_.cores).sum Total, @state.workers.map(_.coresUsed).sum Used</li>
- <li><strong>Memory:</strong> @state.workers.map(_.memory).sum Total, @state.workers.map(_.memoryUsed).sum Used</li>
+ <li><strong>Workers:</strong> @state.workers.size </li>
+ <li><strong>Cores:</strong> @{state.workers.map(_.cores).sum} Total,
+ @{state.workers.map(_.coresUsed).sum} Used</li>
+ <li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
+ @{Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
<li><strong>Jobs:</strong> @state.activeJobs.size Running, @state.completedJobs.size Completed </li>
</ul>
</div>
@@ -21,7 +24,7 @@
<div class="span12">
<h3> Cluster Summary </h3>
<br/>
- @worker_table(state.workers)
+ @worker_table(state.workers.sortBy(_.id))
</div>
</div>
@@ -32,7 +35,7 @@
<div class="span12">
<h3> Running Jobs </h3>
<br/>
- @job_table(state.activeJobs)
+ @job_table(state.activeJobs.sortBy(_.startTime).reverse)
</div>
</div>
@@ -43,7 +46,7 @@
<div class="span12">
<h3> Completed Jobs </h3>
<br/>
- @job_table(state.completedJobs)
+ @job_table(state.completedJobs.sortBy(_.endTime).reverse)
</div>
</div>
diff --git a/core/src/main/twirl/spark/deploy/master/job_row.scala.html b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
index 7c4865bb6e..7c466a6a2c 100644
--- a/core/src/main/twirl/spark/deploy/master/job_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
@@ -1,20 +1,20 @@
@(job: spark.deploy.master.JobInfo)
+@import spark.Utils
+@import spark.deploy.WebUI.formatDate
+@import spark.deploy.WebUI.formatDuration
+
<tr>
<td>
<a href="job?jobId=@(job.id)">@job.id</a>
</td>
<td>@job.desc.name</td>
<td>
- @job.coresGranted Granted
- @if(job.desc.cores == Integer.MAX_VALUE) {
-
- } else {
- , @job.coresLeft
- }
+ @job.coresGranted
</td>
- <td>@job.desc.memoryPerSlave</td>
- <td>@job.submitDate</td>
+ <td>@Utils.memoryMegabytesToString(job.desc.memoryPerSlave)</td>
+ <td>@formatDate(job.submitDate)</td>
<td>@job.desc.user</td>
<td>@job.state.toString()</td>
-</tr> \ No newline at end of file
+ <td>@formatDuration(job.duration)</td>
+</tr>
diff --git a/core/src/main/twirl/spark/deploy/master/job_table.scala.html b/core/src/main/twirl/spark/deploy/master/job_table.scala.html
index 52bad6c4b8..d267d6e85e 100644
--- a/core/src/main/twirl/spark/deploy/master/job_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_table.scala.html
@@ -1,4 +1,4 @@
-@(jobs: List[spark.deploy.master.JobInfo])
+@(jobs: Array[spark.deploy.master.JobInfo])
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
@@ -6,10 +6,11 @@
<th>JobID</th>
<th>Description</th>
<th>Cores</th>
- <th>Memory per Slave</th>
- <th>Submit Date</th>
+ <th>Memory per Node</th>
+ <th>Submit Time</th>
<th>User</th>
<th>State</th>
+ <th>Duration</th>
</tr>
</thead>
<tbody>
@@ -17,4 +18,4 @@
@job_row(j)
}
</tbody>
-</table> \ No newline at end of file
+</table>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
index 017cc4859e..3dcba3a545 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
@@ -1,11 +1,13 @@
@(worker: spark.deploy.master.WorkerInfo)
+@import spark.Utils
+
<tr>
<td>
<a href="http://@worker.host:@worker.webUiPort">@worker.id</href>
</td>
<td>@{worker.host}:@{worker.port}</td>
<td>@worker.cores (@worker.coresUsed Used)</td>
- <td>@{spark.Utils.memoryMegabytesToString(worker.memory)}
- (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
+ <td>@{Utils.memoryMegabytesToString(worker.memory)}
+ (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
</tr>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
index 2028842297..fad1af41dc 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
@@ -1,4 +1,4 @@
-@(workers: List[spark.deploy.master.WorkerInfo])
+@(workers: Array[spark.deploy.master.WorkerInfo])
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
@@ -14,4 +14,4 @@
@worker_row(w)
}
</tbody>
-</table> \ No newline at end of file
+</table>
diff --git a/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html b/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
index c3842dbf85..ea9542461e 100644
--- a/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
@@ -1,20 +1,20 @@
@(executor: spark.deploy.worker.ExecutorRunner)
+@import spark.Utils
+
<tr>
<td>@executor.execId</td>
<td>@executor.cores</td>
- <td>@executor.memory</td>
+ <td>@Utils.memoryMegabytesToString(executor.memory)</td>
<td>
<ul class="unstyled">
<li><strong>ID:</strong> @executor.jobId</li>
<li><strong>Name:</strong> @executor.jobDesc.name</li>
<li><strong>User:</strong> @executor.jobDesc.user</li>
- <li><strong>Cores:</strong> @executor.jobDesc.cores </li>
- <li><strong>Memory per Slave:</strong> @executor.jobDesc.memoryPerSlave</li>
</ul>
</td>
<td>
<a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stdout">stdout</a>
<a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stderr">stderr</a>
</td>
-</tr> \ No newline at end of file
+</tr>
diff --git a/core/src/main/twirl/spark/deploy/worker/index.scala.html b/core/src/main/twirl/spark/deploy/worker/index.scala.html
index 69746ed02c..b247307dab 100644
--- a/core/src/main/twirl/spark/deploy/worker/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/index.scala.html
@@ -1,5 +1,7 @@
@(worker: spark.deploy.WorkerState)
+@import spark.Utils
+
@spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) {
<!-- Worker Details -->
@@ -12,8 +14,8 @@
(WebUI at <a href="@worker.masterWebUiUrl">@worker.masterWebUiUrl</a>)
</li>
<li><strong>Cores:</strong> @worker.cores (@worker.coresUsed Used)</li>
- <li><strong>Memory:</strong> @{spark.Utils.memoryMegabytesToString(worker.memory)}
- (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
+ <li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(worker.memory)}
+ (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
</ul>
</div>
</div>
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 0e78228134..a2d5e39859 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -57,6 +57,32 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
+ test("StorageLevel object caching") {
+ val level1 = new StorageLevel(false, false, false, 3)
+ val level2 = new StorageLevel(false, false, false, 3)
+ val bytes1 = spark.Utils.serialize(level1)
+ val level1_ = spark.Utils.deserialize[StorageLevel](bytes1)
+ val bytes2 = spark.Utils.serialize(level2)
+ val level2_ = spark.Utils.deserialize[StorageLevel](bytes2)
+ assert(level1_ === level1, "Deserialized level1 not same as original level1")
+ assert(level2_ === level2, "Deserialized level2 not same as original level2")
+ assert(level1_ === level2_, "Deserialized level1 not same as deserialized level2")
+ assert(level2_.eq(level1_), "Deserialized level2 not the same object as deserialized level1")
+ }
+
+ test("BlockManagerId object caching") {
+ val id1 = new StorageLevel(false, false, false, 3)
+ val id2 = new StorageLevel(false, false, false, 3)
+ val bytes1 = spark.Utils.serialize(id1)
+ val id1_ = spark.Utils.deserialize[StorageLevel](bytes1)
+ val bytes2 = spark.Utils.serialize(id2)
+ val id2_ = spark.Utils.deserialize[StorageLevel](bytes2)
+ assert(id1_ === id1, "Deserialized id1 not same as original id1")
+ assert(id2_ === id2, "Deserialized id2 not same as original id2")
+ assert(id1_ === id2_, "Deserialized id1 not same as deserialized id2")
+ assert(id2_.eq(id1_), "Deserialized id2 not the same object as deserialized id1")
+ }
+
test("master + 1 manager interaction") {
store = new BlockManager(master, serializer, 2000)
val a1 = new Array[Byte](400)
diff --git a/docs/quick-start.md b/docs/quick-start.md
index defdb34836..dbc232b6e0 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -6,7 +6,7 @@ title: Quick Start
* This will become a table of contents (this text will be scraped).
{:toc}
-This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you will need much for this), then show how to write standalone jobs in Scala and Java. See the [programming guide](scala-programming-guide.html) for a fuller reference.
+This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you will not need much for this), then show how to write standalone jobs in Scala and Java. See the [programming guide](scala-programming-guide.html) for a more complete reference.
To follow along with this guide, you only need to have successfully built Spark on one machine. Simply go into your Spark directory and run:
@@ -60,7 +60,7 @@ scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a
res4: Long = 16
{% endhighlight %}
-This first maps a line to an integer value, creating a new RDD. `reduce` is called on that RDD to find the largest line count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use `Math.max()` function to make this code easier to understand:
+This first maps a line to an integer value, creating a new RDD. `reduce` is called on that RDD to find the largest line count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use the `Math.max()` function to make this code easier to understand:
{% highlight scala %}
scala> import java.lang.Math
@@ -98,10 +98,10 @@ scala> linesWithSpark.count()
res9: Long = 15
{% endhighlight %}
-It may seem silly to use a Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark).
+It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark).
# A Standalone Job in Scala
-Now say we wanted to write a standalone job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you using other build systems, consider using the Spark assembly JAR described in the developer guide.
+Now say we wanted to write a standalone job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
We'll create a very simple Spark job in Scala. So simple, in fact, that it's named `SimpleJob.scala`:
@@ -112,7 +112,7 @@ import SparkContext._
object SimpleJob extends Application {
val logFile = "/var/log/syslog" // Should be some file on your system
- val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
+ val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
"target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar")
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
@@ -139,10 +139,10 @@ resolvers ++= Seq(
"Spray Repository" at "http://repo.spray.cc/")
{% endhighlight %}
-Of course, for sbt to work correctly, we'll need to layout `SimpleJob.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the job's code, then use `sbt run` to execute our example job.
+Of course, for sbt to work correctly, we'll need to lay out `SimpleJob.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the job's code, then use `sbt run` to execute our example job.
{% highlight bash %}
-$ find .
+$ find .
.
./simple.sbt
./src
@@ -159,7 +159,7 @@ Lines with a: 8422, Lines with b: 1836
This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS.
# A Standalone Job In Java
-Now say we wanted to write a standalone job using the Java API. We will walk through doing this with Maven. If you using other build systems, consider using the Spark assembly JAR described in the developer guide.
+Now say we wanted to write a standalone job using the Java API. We will walk through doing this with Maven. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
We'll create a very simple Spark job, `SimpleJob.java`:
@@ -171,7 +171,7 @@ import spark.api.java.function.Function;
public class SimpleJob {
public static void main(String[] args) {
String logFile = "/var/log/syslog"; // Should be some file on your system
- JavaSparkContext sc = new JavaSparkContext("local", "Simple Job",
+ JavaSparkContext sc = new JavaSparkContext("local", "Simple Job",
"$YOUR_SPARK_HOME", "target/simple-project-1.0.jar");
JavaRDD<String> logData = sc.textFile(logFile).cache();
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 97564d7426..f4a3eb667c 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -15,7 +15,7 @@ Spark can run on private clusters managed by the [Apache Mesos](http://incubator
6. Copy Spark and Mesos to the _same_ paths on all the nodes in the cluster (or, for Mesos, `make install` on every node).
7. Configure Mesos for deployment:
* On your master node, edit `<prefix>/var/mesos/deploy/masters` to list your master and `<prefix>/var/mesos/deploy/slaves` to list the slaves, where `<prefix>` is the prefix where you installed Mesos (`/usr/local` by default).
- * On all nodes, edit `<prefix>/var/mesos/deploy/mesos.conf` and add the line `master=HOST:5050`, where HOST is your master node.
+ * On all nodes, edit `<prefix>/var/mesos/conf/mesos.conf` and add the line `master=HOST:5050`, where HOST is your master node.
* Run `<prefix>/sbin/mesos-start-cluster.sh` on your master to start Mesos. If all goes well, you should see Mesos's web UI on port 8080 of the master machine.
* See Mesos's README file for more information on deploying it.
8. To run a Spark job against the cluster, when you create your `SparkContext`, pass the string `mesos://HOST:5050` as the first parameter, where `HOST` is the machine running your Mesos master. In addition, pass the location of Spark on your nodes as the third parameter, and a list of JAR files containing your JAR's code as the fourth (these will automatically get copied to the workers). For example:
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 73f8b123be..7350eca837 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -19,7 +19,7 @@ This guide shows each of these features and walks through some samples. It assum
To write a Spark application, you will need to add both Spark and its dependencies to your CLASSPATH. If you use sbt or Maven, Spark is available through Maven Central at:
- groupId = org.spark_project
+ groupId = org.spark-project
artifactId = spark-core_{{site.SCALA_VERSION}}
version = {{site.SPARK_VERSION}}
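
For sbt users the same coordinates can be expressed as a single dependency line; a sketch, using the documentation's version placeholder (the `%%` operator appends the Scala version, matching the `artifactId` above):

```scala
// sbt equivalent of the Maven coordinates above (sketch; substitute the actual release)
libraryDependencies += "org.spark-project" %% "spark-core" % "{{site.SPARK_VERSION}}"
```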
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 6a3647b218..2ab11dbd34 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -19,7 +19,6 @@
from __future__ import with_statement
-import boto
import logging
import os
import random
@@ -32,7 +31,7 @@ import urllib2
from optparse import OptionParser
from sys import stderr
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
-
+from boto import ec2
# A static URL from which to figure out the latest Mesos EC2 AMI
LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.6"
@@ -61,7 +60,9 @@ def parse_args():
parser.add_option("-r", "--region", default="us-east-1",
help="EC2 region zone to launch instances in")
parser.add_option("-z", "--zone", default="",
- help="Availability zone to launch instances in")
+ help="Availability zone to launch instances in, or 'all' to spread " +
+ "slaves across multiple (an additional $0.01/Gb for bandwidth" +
+ "between zones applies)")
parser.add_option("-a", "--ami", default="latest",
help="Amazon Machine Image ID to use, or 'latest' to use latest " +
"available AMI (default: latest)")
@@ -97,14 +98,20 @@ def parse_args():
if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
sys.exit(1)
- if os.getenv('AWS_ACCESS_KEY_ID') == None:
- print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
- "must be set")
- sys.exit(1)
- if os.getenv('AWS_SECRET_ACCESS_KEY') == None:
- print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
- "must be set")
- sys.exit(1)
+
+ # Boto config check
+ # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
+ home_dir = os.getenv('HOME')
+ if home_dir == None or not os.path.isfile(home_dir + '/.boto'):
+ if not os.path.isfile('/etc/boto.cfg'):
+ if os.getenv('AWS_ACCESS_KEY_ID') == None:
+ print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
+ "must be set")
+ sys.exit(1)
+ if os.getenv('AWS_SECRET_ACCESS_KEY') == None:
+ print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
+ "must be set")
+ sys.exit(1)
return (opts, action, cluster_name)
@@ -180,16 +187,12 @@ def launch_cluster(conn, opts, cluster_name):
zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0')
# Check if instances are already running in our groups
- print "Checking for running cluster..."
- reservations = conn.get_all_instances()
- for res in reservations:
- group_names = [g.id for g in res.groups]
- if master_group.name in group_names or slave_group.name in group_names or zoo_group.name in group_names:
- active = [i for i in res.instances if is_active(i)]
- if len(active) > 0:
- print >> stderr, ("ERROR: There are already instances running in " +
- "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
- sys.exit(1)
+ active_nodes = get_existing_cluster(conn, opts, cluster_name,
+ die_on_error=False)
+ if any(active_nodes):
+ print >> stderr, ("ERROR: There are already instances running in " +
+ "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
+ sys.exit(1)
# Figure out the latest AMI from our static URL
if opts.ami == "latest":
@@ -221,55 +224,83 @@ def launch_cluster(conn, opts, cluster_name):
# Launch spot instances with the requested price
print ("Requesting %d slaves as spot instances with price $%.3f" %
(opts.slaves, opts.spot_price))
- slave_reqs = conn.request_spot_instances(
- price = opts.spot_price,
- image_id = opts.ami,
- launch_group = "launch-group-%s" % cluster_name,
- placement = opts.zone,
- count = opts.slaves,
- key_name = opts.key_pair,
- security_groups = [slave_group],
- instance_type = opts.instance_type,
- block_device_map = block_map)
- my_req_ids = [req.id for req in slave_reqs]
+ zones = get_zones(conn, opts)
+ num_zones = len(zones)
+ i = 0
+ my_req_ids = []
+ for zone in zones:
+ num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
+ slave_reqs = conn.request_spot_instances(
+ price = opts.spot_price,
+ image_id = opts.ami,
+ launch_group = "launch-group-%s" % cluster_name,
+ placement = zone,
+ count = num_slaves_this_zone,
+ key_name = opts.key_pair,
+ security_groups = [slave_group],
+ instance_type = opts.instance_type,
+ block_device_map = block_map)
+ my_req_ids += [req.id for req in slave_reqs]
+ i += 1
+
print "Waiting for spot instances to be granted..."
- while True:
- time.sleep(10)
- reqs = conn.get_all_spot_instance_requests()
- id_to_req = {}
- for r in reqs:
- id_to_req[r.id] = r
- active = 0
- instance_ids = []
- for i in my_req_ids:
- if id_to_req[i].state == "active":
- active += 1
- instance_ids.append(id_to_req[i].instance_id)
- if active == opts.slaves:
- print "All %d slaves granted" % opts.slaves
- reservations = conn.get_all_instances(instance_ids)
- slave_nodes = []
- for r in reservations:
- slave_nodes += r.instances
- break
- else:
- print "%d of %d slaves granted, waiting longer" % (active, opts.slaves)
+ try:
+ while True:
+ time.sleep(10)
+ reqs = conn.get_all_spot_instance_requests()
+ id_to_req = {}
+ for r in reqs:
+ id_to_req[r.id] = r
+ active_instance_ids = []
+ for i in my_req_ids:
+ if i in id_to_req and id_to_req[i].state == "active":
+ active_instance_ids.append(id_to_req[i].instance_id)
+ if len(active_instance_ids) == opts.slaves:
+ print "All %d slaves granted" % opts.slaves
+ reservations = conn.get_all_instances(active_instance_ids)
+ slave_nodes = []
+ for r in reservations:
+ slave_nodes += r.instances
+ break
+ else:
+ print "%d of %d slaves granted, waiting longer" % (
+ len(active_instance_ids), opts.slaves)
+ except:
+ print "Canceling spot instance requests"
+ conn.cancel_spot_instance_requests(my_req_ids)
+ # Log a warning if any of these requests actually launched instances:
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ conn, opts, cluster_name, die_on_error=False)
+ running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
+ if running:
+ print >> stderr, ("WARNING: %d instances are still running" % running)
+ sys.exit(0)
else:
# Launch non-spot instances
- slave_res = image.run(key_name = opts.key_pair,
- security_groups = [slave_group],
- instance_type = opts.instance_type,
- placement = opts.zone,
- min_count = opts.slaves,
- max_count = opts.slaves,
- block_device_map = block_map)
- slave_nodes = slave_res.instances
- print "Launched slaves, regid = " + slave_res.id
+ zones = get_zones(conn, opts)
+ num_zones = len(zones)
+ i = 0
+ slave_nodes = []
+ for zone in zones:
+ num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
+ slave_res = image.run(key_name = opts.key_pair,
+ security_groups = [slave_group],
+ instance_type = opts.instance_type,
+ placement = zone,
+ min_count = num_slaves_this_zone,
+ max_count = num_slaves_this_zone,
+ block_device_map = block_map)
+ slave_nodes += slave_res.instances
+ print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
+ zone, slave_res.id)
+ i += 1
# Launch masters
master_type = opts.master_instance_type
if master_type == "":
master_type = opts.instance_type
+ if opts.zone == 'all':
+ opts.zone = random.choice(conn.get_all_zones()).name
master_res = image.run(key_name = opts.key_pair,
security_groups = [master_group],
instance_type = master_type,
@@ -278,7 +309,7 @@ def launch_cluster(conn, opts, cluster_name):
max_count = 1,
block_device_map = block_map)
master_nodes = master_res.instances
- print "Launched master, regid = " + master_res.id
+ print "Launched master in %s, regid = %s" % (zone, master_res.id)
zoo_nodes = []
@@ -468,9 +499,30 @@ def ssh(host, opts, command):
(opts.identity_file, opts.user, host, command), shell=True)
+# Gets a list of zones to launch instances in
+def get_zones(conn, opts):
+ if opts.zone == 'all':
+ zones = [z.name for z in conn.get_all_zones()]
+ else:
+ zones = [opts.zone]
+ return zones
+
+
+# Gets the number of items in a partition
+def get_partition(total, num_partitions, current_partitions):
+ num_slaves_this_zone = total / num_partitions
+ if (total % num_partitions) - current_partitions > 0:
+ num_slaves_this_zone += 1
+ return num_slaves_this_zone
+
+
def main():
(opts, action, cluster_name) = parse_args()
- conn = boto.ec2.connect_to_region(opts.region)
+ try:
+ conn = ec2.connect_to_region(opts.region)
+ except Exception as e:
+ print >> stderr, (e)
+ sys.exit(1)
# Select an AZ at random if it was not specified.
if opts.zone == "":
@@ -503,6 +555,20 @@ def main():
print "Terminating zoo..."
for inst in zoo_nodes:
inst.terminate()
+ # Delete security groups as well
+ group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
+ groups = conn.get_all_security_groups()
+ for group in groups:
+ if group.name in group_names:
+ print "Deleting security group " + group.name
+ # Delete individual rules before deleting group to remove dependencies
+ for rule in group.rules:
+ for grant in rule.grants:
+ group.revoke(ip_protocol=rule.ip_protocol,
+ from_port=rule.from_port,
+ to_port=rule.to_port,
+ src_group=grant)
+ conn.delete_security_group(group.name)
elif action == "login":
(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala
index adce551322..6375961390 100644
--- a/examples/src/main/scala/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala
@@ -15,14 +15,13 @@ object SparkKMeans {
return new Vector(line.split(' ').map(_.toDouble))
}
- def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+ def closestPoint(p: Vector, centers: Array[Vector]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity
- for (i <- 1 to centers.size) {
- val vCurr = centers.get(i).get
- val tempDist = p.squaredDist(vCurr)
+ for (i <- 0 until centers.length) {
+ val tempDist = p.squaredDist(centers(i))
if (tempDist < closest) {
closest = tempDist
bestIndex = i
@@ -43,32 +42,28 @@ object SparkKMeans {
val K = args(2).toInt
val convergeDist = args(3).toDouble
- var points = data.takeSample(false, K, 42)
- var kPoints = new HashMap[Int, Vector]
+ var kPoints = data.takeSample(false, K, 42).toArray
var tempDist = 1.0
-
- for (i <- 1 to points.size) {
- kPoints.put(i, points(i-1))
- }
while(tempDist > convergeDist) {
var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
- var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
+ var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
- var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collect()
+ var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
tempDist = 0.0
- for (pair <- newPoints) {
- tempDist += kPoints.get(pair._1).get.squaredDist(pair._2)
+ for (i <- 0 until K) {
+ tempDist += kPoints(i).squaredDist(newPoints(i))
}
for (newP <- newPoints) {
- kPoints.put(newP._1, newP._2)
+ kPoints(newP._1) = newP._2
}
}
- println("Final centers: " + kPoints)
+ println("Final centers:")
+ kPoints.foreach(println)
System.exit(0)
}
}
diff --git a/run2.cmd b/run2.cmd
index 097718b526..333d0506b0 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -63,5 +63,5 @@ if "%SPARK_LAUNCH_WITH_SCALA%" NEQ 1 goto java_runner
set EXTRA_ARGS=%JAVA_OPTS%
:run_spark
-%RUNNER% -cp "%CLASSPATH%" %EXTRA_ARGS% %*
+"%RUNNER%" -cp "%CLASSPATH%" %EXTRA_ARGS% %*
:exit
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index c76c73b35a..85106b3ad8 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -17,6 +17,26 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
+/**
+ * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
+ * for more details on RDDs). DStreams can either be created from live data (such as data from
+ * HDFS, Kafka or Flume) or they can be generated by transforming existing DStreams using operations
+ * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
+ * DStream periodically generates an RDD, either from live data or by transforming the RDD generated
+ * by a parent DStream.
+ *
+ * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
+ * `window`. In addition, [[spark.streaming.PairDStreamFunctions]] contains operations available
+ * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
+ * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
+ * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ *
+ * Internally, each DStream is characterized by a few basic properties:
+ * - A list of other DStreams that the DStream depends on
+ * - A time interval at which the DStream generates an RDD
+ * - A function that is used to generate an RDD after each time interval
+ */
case class DStreamCheckpointData(rdds: HashMap[Time, Any])
@@ -31,7 +51,7 @@ extends Serializable with Logging {
* ----------------------------------------------
*/
- // Time by which the window slides in this DStream
+ // Time interval at which the DStream generates an RDD
def slideTime: Time
// List of parent DStreams on which this DStream depends on
@@ -129,6 +149,8 @@ extends Serializable with Logging {
}
protected[streaming] def validate() {
+ assert(rememberDuration != null, "Remember duration is set to null")
+
assert(
!mustCheckpoint || checkpointInterval != null,
"The checkpoint interval for " + this.getClass.getSimpleName + " has not been set. " +
@@ -163,13 +185,25 @@ extends Serializable with Logging {
checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
)
+ val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
+ logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
+ assert(
+ metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
+ "It seems you are doing some DStream window operation or setting a checkpoint interval " +
+ "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
+ "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
+ "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
+ "the Java property 'spark.cleaner.delay' to more than " +
+ math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
+ )
+
dependencies.foreach(_.validate())
logInfo("Slide time = " + slideTime)
logInfo("Storage level = " + storageLevel)
logInfo("Checkpoint interval = " + checkpointInterval)
logInfo("Remember duration = " + rememberDuration)
- logInfo("Initialized " + this)
+ logInfo("Initialized and validated " + this)
}
protected[streaming] def setContext(s: StreamingContext) {
@@ -189,12 +223,12 @@ extends Serializable with Logging {
dependencies.foreach(_.setGraph(graph))
}
- protected[streaming] def setRememberDuration(duration: Time) {
+ protected[streaming] def remember(duration: Time) {
if (duration != null && duration > rememberDuration) {
rememberDuration = duration
logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
}
- dependencies.foreach(_.setRememberDuration(parentRememberDuration))
+ dependencies.foreach(_.remember(parentRememberDuration))
}
/** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
@@ -331,7 +365,8 @@ extends Serializable with Logging {
}
}
}
- logInfo("Updated checkpoint data for time " + currentTime)
+ logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.size + " checkpoints, "
+ + "[" + checkpointData.mkString(",") + "]")
}
/**
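
The scaladoc above describes the DStream operations and the implicit conversion that enables pair operations; a hedged, self-contained sketch (editorial, not part of the patch; host, port and durations are placeholders) showing them together with the new `(master, name, batchDuration)` StreamingContext constructor:

```scala
// Sketch only: assumes something is writing lines to localhost:9999 (e.g. `nc -lk 9999`).
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._   // brings in the PairDStreamFunctions implicits

object DStreamSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "DStreamSketch", Seconds(1))
    val lines = ssc.networkTextStream("localhost", 9999)

    // Plain DStream operations: flatMap / filter
    val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)

    // Pair-DStream operation made available by the implicit conversion imported above
    val counts = words.map(w => (w, 1)).reduceByKeyAndWindow(_ + _, Seconds(10), Seconds(2))
    counts.print()

    ssc.start()
  }
}
```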
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index bd8c033eab..d0a9ade61d 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -22,7 +22,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
zeroTime = time
outputStreams.foreach(_.initialize(zeroTime))
- outputStreams.foreach(_.setRememberDuration(rememberDuration))
+ outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validate)
inputStreams.par.foreach(_.start())
}
@@ -50,7 +50,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
batchDuration = duration
}
- private[streaming] def setRememberDuration(duration: Time) {
+ private[streaming] def remember(duration: Time) {
this.synchronized {
if (rememberDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 4c42692295..73ba877085 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -58,7 +58,7 @@ class NetworkInputTracker(
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
- logInfo("Registered receiver for network stream " + streamId)
+ logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
sender ! true
}
case AddBlocks(streamId, blockIds, metadata) => {
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index e09d27d34f..720e63bba0 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -4,6 +4,7 @@ import spark.streaming.StreamingContext._
import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.SparkContext._
+import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
@@ -115,7 +116,10 @@ extends Serializable {
slideTime: Time,
partitioner: Partitioner
): DStream[(K, V)] = {
- self.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner)
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ self.reduceByKey(cleanedReduceFunc, partitioner)
+ .window(windowTime, slideTime)
+ .reduceByKey(cleanedReduceFunc, partitioner)
}
// This method is the efficient sliding window reduce operation,
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 8b484e6acf..f63a9e0011 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -118,7 +118,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
if (seqOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
- throw new Exception("Neither previous window has value for key, nor new values found")
+ throw new Exception("Neither previous window has value for key, nor new values found. " +
+ "Are you sure your key class hashes consistently?")
}
// Reduce the new values
newValues.reduce(reduceF) // return
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index e2dca91179..014021be61 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -17,7 +17,7 @@ extends Logging {
val graph = ssc.graph
- val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt
+ val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) {
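
The renamed setting is a plain Java system property; an assumed usage sketch, which must run before the StreamingContext (and hence the Scheduler) is created:

```scala
// Allow two batch jobs to be processed concurrently (illustrative value).
System.setProperty("spark.streaming.concurrentJobs", "2")
```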
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 5e11e6d734..8153dd4567 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -17,20 +17,41 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
-
-final class StreamingContext (
+import spark.util.MetadataCleaner
+
+/**
+ * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
+ * information (such as cluster URL and job name) needed to internally create a SparkContext, it provides
+ * methods used to create DStreams from various input sources.
+ */
+class StreamingContext private (
sc_ : SparkContext,
- cp_ : Checkpoint
+ cp_ : Checkpoint,
+ batchDur_ : Time
) extends Logging {
- def this(sparkContext: SparkContext) = this(sparkContext, null)
-
- def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) =
- this(new SparkContext(master, frameworkName, sparkHome, jars), null)
+ /**
+ * Creates a StreamingContext using an existing SparkContext.
+ * @param sparkContext Existing SparkContext
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration)
- def this(path: String) = this(null, CheckpointReader.read(path))
+ /**
+ * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param frameworkName A name for your job, to display on the cluster web UI
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(master: String, frameworkName: String, batchDuration: Time) =
+ this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
- def this(cp_ : Checkpoint) = this(null, cp_)
+ /**
+ * Recreates the StreamingContext from a checkpoint file.
+ * @param path Path either to the directory that was specified as the checkpoint directory, or
+ * to the checkpoint file 'graph' or 'graph.bk'.
+ */
+ def this(path: String) = this(null, CheckpointReader.read(path), null)
initLogging()
@@ -57,7 +78,10 @@ final class StreamingContext (
cp_.graph.restoreCheckpointData()
cp_.graph
} else {
- new DStreamGraph()
+ assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
+ val newGraph = new DStreamGraph()
+ newGraph.setBatchDuration(batchDur_)
+ newGraph
}
}
@@ -77,12 +101,8 @@ final class StreamingContext (
private[streaming] var receiverJobThread: Thread = null
private[streaming] var scheduler: Scheduler = null
- def setBatchDuration(duration: Time) {
- graph.setBatchDuration(duration)
- }
-
- def setRememberDuration(duration: Time) {
- graph.setRememberDuration(duration)
+ def remember(duration: Time) {
+ graph.remember(duration)
}
def checkpoint(dir: String, interval: Time = null) {
@@ -195,6 +215,10 @@ final class StreamingContext (
inputStream
}
+ def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
+ new UnionDStream[T](streams.toArray)
+ }
+
/**
* This function registers a InputDStream as an input stream that will be
* started (InputDStream.start() called) to get the input data streams.
@@ -220,11 +244,8 @@ final class StreamingContext (
"Checkpoint directory has been set, but the graph checkpointing interval has " +
"not been set. Please use StreamingContext.checkpoint() to set the interval."
)
-
-
}
-
/**
* This function starts the execution of the streams.
*/
@@ -271,6 +292,17 @@ final class StreamingContext (
object StreamingContext {
+
+ def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
+
+ // Set the default cleaner delay to an hour if not already set.
+ // This should be sufficient for even 1 second interval.
+ if (MetadataCleaner.getDelaySeconds < 0) {
+ MetadataCleaner.setDelaySeconds(60)
+ }
+ new SparkContext(master, frameworkName)
+ }
+
implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
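
A hedged sketch of the reworked entry points (editorial, not part of the patch; paths and durations are placeholders): batch duration now comes in through the constructor, `remember` replaces `setRememberDuration`, and a context can be rebuilt from a checkpoint path.

```scala
import spark.streaming.{Seconds, StreamingContext}

object StreamingContextSketch {
  def main(args: Array[String]) {
    // Batch duration is a constructor argument instead of a setBatchDuration() call.
    val ssc = new StreamingContext("local[2]", "Sketch", Seconds(2))
    ssc.remember(Seconds(60))                              // replaces setRememberDuration()
    ssc.checkpoint("/tmp/sketch-checkpoints", Seconds(10)) // graph checkpoint dir and interval

    ssc.textFileStream("/tmp/sketch-input").print()
    ssc.start()

    // After a driver restart, the whole context can be rebuilt from the checkpoint path:
    // val recovered = new StreamingContext("/tmp/sketch-checkpoints")
  }
}
```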
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index ce89a3f99b..e4d2a634f5 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming
import spark.RDD
import spark.rdd.UnionRDD
+import spark.storage.StorageLevel
class WindowedDStream[T: ClassManifest](
@@ -18,6 +19,8 @@ class WindowedDStream[T: ClassManifest](
throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
+ parent.persist(StorageLevel.MEMORY_ONLY_SER)
+
def windowTime: Time = _windowTime
override def dependencies = List(parent)
diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
deleted file mode 100644
index d2fdabd659..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-import spark.streaming._
-import spark.streaming.StreamingContext._
-
-object CountRaw {
- def main(args: Array[String]) {
- if (args.length != 5) {
- System.err.println("Usage: CountRaw <master> <numStreams> <host> <port> <batchMillis>")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
-
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "CountRaw")
- ssc.setBatchDuration(Milliseconds(batchMillis))
-
- // Make sure some tasks have started on each node
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
-
- val rawStreams = (1 to numStreams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
- val union = new UnionDStream(rawStreams)
- union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString))
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
index d68611abd6..81938d30d4 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
@@ -14,10 +14,9 @@ object FileStream {
System.exit(1)
}
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "FileStream")
- ssc.setBatchDuration(Seconds(2))
-
+ // Create the context
+ val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
+
// Create the new directory
val directory = new Path(args(1))
val fs = directory.getFileSystem(new Configuration())
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
index 21a83c0fde..b7bc15a1d5 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
@@ -32,9 +32,8 @@ object FileStreamWithCheckpoint {
if (!fs.exists(directory)) fs.mkdirs(directory)
// Create new streaming context
- val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint")
- ssc_.setBatchDuration(Seconds(1))
- ssc_.checkpoint(checkpointDir, Seconds(1))
+ val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1))
+ ssc_.checkpoint(checkpointDir)
// Setup the streaming computation
val inputStream = ssc_.textFileStream(directory.toString)
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index ffbea6e55d..6cb2b4c042 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -16,9 +16,10 @@ object GrepRaw {
val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "GrepRaw")
- ssc.setBatchDuration(Milliseconds(batchMillis))
+ // Create the context
+ val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
warmUp(ssc.sc)
diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
index 2af51bad28..2a265d021d 100644
--- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
@@ -1,9 +1,8 @@
package spark.streaming.examples
import spark.RDD
-import spark.streaming.StreamingContext
+import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._
-import spark.streaming.Seconds
import scala.collection.mutable.SynchronizedQueue
@@ -15,10 +14,9 @@ object QueueStream {
System.exit(1)
}
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "QueueStream")
- ssc.setBatchDuration(Seconds(1))
-
+ // Create the context
+ val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
+
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
val rddQueue = new SynchronizedQueue[RDD[Int]]()
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index 0411bde1a7..fe4c2bf155 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -20,12 +20,11 @@ object TopKWordCountRaw {
val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
val k = 10
- // Create the context, set the batch size and checkpoint directory.
+ // Create the context, and set the checkpoint directory.
// Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
// periodically to HDFS
- val ssc = new StreamingContext(master, "TopKWordCountRaw")
- ssc.setBatchDuration(Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+ val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
// Warm up the JVMs on master and slave for JIT compilation to kick in
/*warmUp(ssc.sc)*/
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
index 591cb141c3..867a8f42c4 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
@@ -10,9 +10,8 @@ object WordCountHdfs {
System.exit(1)
}
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountHdfs")
- ssc.setBatchDuration(Seconds(2))
+ // Create the context
+ val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
index ba1bd1de7c..eadda60563 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
@@ -6,13 +6,13 @@ import spark.streaming.StreamingContext._
object WordCountNetwork {
def main(args: Array[String]) {
if (args.length < 2) {
- System.err.println("Usage: WordCountNetwork <master> <hostname> <port>")
+ System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
// Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountNetwork")
- ssc.setBatchDuration(Seconds(2))
+ val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index 571428c0fe..a29c81d437 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -19,12 +19,11 @@ object WordCountRaw {
val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
- // Create the context, set the batch size and checkpoint directory.
+ // Create the context, and set the checkpoint directory.
// Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
// periodically to HDFS
- val ssc = new StreamingContext(master, "WordCountRaw")
- ssc.setBatchDuration(Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+ val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
// Warm up the JVMs on master and slave for JIT compilation to kick in
warmUp(ssc.sc)
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 1a51fb66cd..68be6b7893 100644
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -23,9 +23,8 @@ object PageViewStream {
val host = args(1)
val port = args(2).toInt
- // Create the context and set the batch size
- val ssc = new StreamingContext("local[2]", "PageViewStream")
- ssc.setBatchDuration(Seconds(1))
+ // Create the context
+ val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.networkTextStream(host, port)
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 33774b463d..02fe16866e 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,5 +1,5 @@
# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
+log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index d0aaac0f2e..dc38ef4912 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -175,7 +175,7 @@ class BasicOperationsSuite extends TestSuiteBase {
}
val ssc = setupStreams(input, operation _)
- ssc.setRememberDuration(rememberDuration)
+ ssc.remember(rememberDuration)
runStreams[(Int, Int)](ssc, input.size, input.size / 2)
val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
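
The setRememberDuration call becomes the shorter remember, taking the same single duration argument as in the test above. A minimal sketch with a hypothetical duration:

    import spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "RememberSketch", Seconds(1))
    // Keep the RDDs generated over the last 60 seconds around, instead of
    // discarding them once no output operation needs them.
    ssc.remember(Seconds(60))
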
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 3e99440226..e98c096725 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -40,8 +40,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
@@ -89,8 +88,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- var ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ var ssc = new StreamingContext(master, framework, batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
@@ -137,8 +135,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
val filestream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
@@ -198,8 +195,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Set up the streaming context and input streams
- var ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ var ssc = new StreamingContext(master, framework, batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
val filestream = ssc.textFileStream(testDir.toString)
var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 5fb5cc504c..8cc2f8ccfc 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -76,8 +76,7 @@ trait TestSuiteBase extends FunSuite with Logging {
): StreamingContext = {
// Create StreamingContext
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir, checkpointInterval)
}
@@ -98,8 +97,7 @@ trait TestSuiteBase extends FunSuite with Logging {
): StreamingContext = {
// Create StreamingContext
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir, checkpointInterval)
}