-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala | 1
-rw-r--r--  core/src/main/scala/spark/Utils.scala | 8
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala | 6
-rw-r--r--  core/src/main/scala/spark/deploy/WebUI.scala | 30
-rw-r--r--  core/src/main/scala/spark/deploy/client/Client.scala | 14
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobInfo.scala | 23
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala | 96
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterArguments.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterWebUI.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/master/WorkerInfo.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerArguments.scala | 4
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala | 15
-rw-r--r--  core/src/main/scala/spark/network/ConnectionManager.scala | 3
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala | 6
-rw-r--r--  core/src/main/scala/spark/storage/MemoryStore.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/ThreadingTest.scala | 91
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala | 7
-rw-r--r--  core/src/main/scala/spark/util/Vector.scala | 3
-rw-r--r--  core/src/main/twirl/spark/deploy/master/index.scala.html | 15
-rw-r--r--  core/src/main/twirl/spark/deploy/master/job_row.scala.html | 18
-rw-r--r--  core/src/main/twirl/spark/deploy/master/job_table.scala.html | 9
-rw-r--r--  core/src/main/twirl/spark/deploy/master/worker_row.scala.html | 6
-rw-r--r--  core/src/main/twirl/spark/deploy/master/worker_table.scala.html | 4
-rw-r--r--  core/src/main/twirl/spark/deploy/worker/executor_row.scala.html | 8
-rw-r--r--  core/src/main/twirl/spark/deploy/worker/index.scala.html | 6
-rw-r--r--  docs/quick-start.md | 18
-rw-r--r--  docs/running-on-mesos.md | 2
-rw-r--r--  docs/scala-programming-guide.md | 2
-rwxr-xr-x  ec2/spark_ec2.py | 194
-rw-r--r--  examples/src/main/scala/spark/examples/SparkKMeans.scala | 27
-rw-r--r--  run2.cmd | 2
-rw-r--r--  streaming/src/test/resources/log4j.properties | 2
34 files changed, 467 insertions, 169 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 4c6ec6cc6e..9f2b0c42c7 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -68,7 +68,6 @@ object SparkEnv extends Logging {
isMaster: Boolean,
isLocal: Boolean
) : SparkEnv = {
-
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
// Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 1bdde25896..06fa559fb6 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -199,7 +199,13 @@ private object Utils extends Logging {
/**
* Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
*/
- def localIpAddress(): String = InetAddress.getLocalHost.getHostAddress
+ def localIpAddress(): String = {
+ val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
+ if (defaultIpOverride != null)
+ defaultIpOverride
+ else
+ InetAddress.getLocalHost.getHostAddress
+ }
private var customHostname: Option[String] = None
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index d2b63d6e0d..7a1089c816 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -67,8 +67,8 @@ private[spark] case object RequestMasterState
// Master to MasterWebUI
private[spark]
-case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
- completedJobs: List[JobInfo])
+case class MasterState(uri: String, workers: Array[WorkerInfo], activeJobs: Array[JobInfo],
+ completedJobs: Array[JobInfo])
// WorkerWebUI to Worker
private[spark] case object RequestWorkerState
@@ -78,4 +78,4 @@ private[spark] case object RequestWorkerState
private[spark]
case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
- coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
\ No newline at end of file
+ coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
diff --git a/core/src/main/scala/spark/deploy/WebUI.scala b/core/src/main/scala/spark/deploy/WebUI.scala
new file mode 100644
index 0000000000..ad1a1092b2
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/WebUI.scala
@@ -0,0 +1,30 @@
+package spark.deploy
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Utilities used throughout the web UI.
+ */
+private[spark] object WebUI {
+ val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
+ def formatDate(date: Date): String = DATE_FORMAT.format(date)
+
+ def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
+
+ def formatDuration(milliseconds: Long): String = {
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return "%.0f s".format(seconds)
+ }
+ val minutes = seconds / 60
+ if (minutes < 10) {
+ return "%.1f min".format(minutes)
+ } else if (minutes < 60) {
+ return "%.0f min".format(minutes)
+ }
+ val hours = minutes / 60
+ return "%.1f h".format(hours)
+ }
+}
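
As a quick sanity check of the thresholds above, a minimal usage sketch (not part of the patch; the demo object is hypothetical) showing which branch of `formatDuration` each representative input hits:

```scala
package spark.deploy

// Hypothetical demo object; it lives in spark.deploy so it can reach the private[spark] WebUI helper.
object WebUIFormatDemo {
  def main(args: Array[String]) {
    println(WebUI.formatDuration(45 * 1000L))        // "45 s"    (< 60 seconds)
    println(WebUI.formatDuration(5 * 60 * 1000L))    // "5.0 min" (< 10 minutes, one decimal)
    println(WebUI.formatDuration(45 * 60 * 1000L))   // "45 min"  (< 60 minutes, no decimal)
    println(WebUI.formatDuration(3 * 3600 * 1000L))  // "3.0 h"   (an hour or more)
  }
}
```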
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index e51b0c5c15..c57a1d33e9 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -35,6 +35,7 @@ private[spark] class Client(
class ClientActor extends Actor with Logging {
var master: ActorRef = null
+ var masterAddress: Address = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
override def preStart() {
@@ -43,6 +44,7 @@ private[spark] class Client(
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
+ masterAddress = master.path.address
master ! RegisterJob(jobDescription)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
@@ -72,7 +74,17 @@ private[spark] class Client(
listener.executorRemoved(fullId, message.getOrElse(""))
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case Terminated(actor_) if actor_ == master =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case RemoteClientDisconnected(transport, address) if address == masterAddress =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case RemoteClientShutdown(transport, address) if address == masterAddress =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)
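
The point of the three guarded cases above is that only lifecycle events whose address matches the recorded `masterAddress` shut the client down; events about other remote peers are now ignored. A standalone sketch of that guard pattern in plain Scala (not Akka code; all names are hypothetical):

```scala
object GuardedMatchSketch {
  case class Address(host: String, port: Int)
  sealed trait LifeCycleEvent
  case class Disconnected(address: Address) extends LifeCycleEvent
  case class Shutdown(address: Address) extends LifeCycleEvent

  // Captured when the client registers, mirroring masterAddress in ClientActor.
  val masterAddress = Address("master-host", 7077)

  def handle(event: LifeCycleEvent) {
    event match {
      case Disconnected(address) if address == masterAddress =>
        println("Connection to master failed; stopping client")
      case Shutdown(address) if address == masterAddress =>
        println("Connection to master failed; stopping client")
      case _ =>
        // Events from other remote peers no longer kill the client.
    }
  }

  def main(args: Array[String]) {
    handle(Disconnected(Address("some-worker", 7078)))  // ignored
    handle(Disconnected(masterAddress))                 // takes the shutdown path
  }
}
```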
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 8795c09cc1..130b031a2a 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -5,11 +5,17 @@ import java.util.Date
import akka.actor.ActorRef
import scala.collection.mutable
-private[spark]
-class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
+private[spark] class JobInfo(
+ val startTime: Long,
+ val id: String,
+ val desc: JobDescription,
+ val submitDate: Date,
+ val actor: ActorRef)
+{
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
+ var endTime = -1L
private var nextExecutorId = 0
@@ -41,4 +47,17 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
_retryCount += 1
_retryCount
}
+
+ def markFinished(endState: JobState.Value) {
+ state = endState
+ endTime = System.currentTimeMillis()
+ }
+
+ def duration: Long = {
+ if (endTime != -1) {
+ endTime - startTime
+ } else {
+ System.currentTimeMillis() - startTime
+ }
+ }
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 6010f7cff2..7e5cd6b171 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val waitingJobs = new ArrayBuffer[JobInfo]
val completedJobs = new ArrayBuffer[JobInfo]
+ // As a temporary workaround before better ways of configuring memory, we allow users to set
+ // a flag that will perform round-robin scheduling across the nodes (spreading out each job
+ // among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
+ val spreadOutJobs = System.getProperty("spark.deploy.spreadOut", "false").toBoolean
+
override def preStart() {
logInfo("Starting Spark master at spark://" + ip + ":" + port)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -123,28 +128,62 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
case RequestMasterState => {
- sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
+ sender ! MasterState(ip + ":" + port, workers.toArray, jobs.toArray, completedJobs.toArray)
}
}
/**
+ * Can a job use the given worker? True if the worker has enough memory and we haven't already
+ * launched an executor for the job on it (right now the standalone backend doesn't like having
+ * two executors on the same worker).
+ */
+ def canUse(job: JobInfo, worker: WorkerInfo): Boolean = {
+ worker.memoryFree >= job.desc.memoryPerSlave && !worker.hasExecutor(job)
+ }
+
+ /**
* Schedule the currently available resources among waiting jobs. This method will be called
* every time a new job joins or resource availability changes.
*/
def schedule() {
- // Right now this is a very simple FIFO scheduler. We keep looking through the jobs
- // in order of submission time and launching the first one that fits on each node.
- for (worker <- workers if worker.coresFree > 0) {
- for (job <- waitingJobs.clone()) {
- val jobMemory = job.desc.memoryPerSlave
- if (worker.memoryFree >= jobMemory) {
- val coresToUse = math.min(worker.coresFree, job.coresLeft)
- val exec = job.addExecutor(worker, coresToUse)
- launchExecutor(worker, exec)
+ // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first job
+ // in the queue, then the second job, etc.
+ if (spreadOutJobs) {
+ // Try to spread out each job among all the nodes, until it has all its cores
+ for (job <- waitingJobs if job.coresLeft > 0) {
+ val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse
+ val numUsable = usableWorkers.length
+ val assigned = new Array[Int](numUsable) // Number of cores to give on each node
+ var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
+ var pos = 0
+ while (toAssign > 0) {
+ if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
+ toAssign -= 1
+ assigned(pos) += 1
+ }
+ pos = (pos + 1) % numUsable
}
- if (job.coresLeft == 0) {
- waitingJobs -= job
- job.state = JobState.RUNNING
+ // Now that we've decided how many cores to give on each node, let's actually give them
+ for (pos <- 0 until numUsable) {
+ if (assigned(pos) > 0) {
+ val exec = job.addExecutor(usableWorkers(pos), assigned(pos))
+ launchExecutor(usableWorkers(pos), exec)
+ job.state = JobState.RUNNING
+ }
+ }
+ }
+ } else {
+ // Pack each job into as few nodes as possible until we've assigned all its cores
+ for (worker <- workers if worker.coresFree > 0) {
+ for (job <- waitingJobs if job.coresLeft > 0) {
+ if (canUse(job, worker)) {
+ val coresToUse = math.min(worker.coresFree, job.coresLeft)
+ if (coresToUse > 0) {
+ val exec = job.addExecutor(worker, coresToUse)
+ launchExecutor(worker, exec)
+ job.state = JobState.RUNNING
+ }
+ }
}
}
}
@@ -179,8 +218,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
- val date = new Date
- val job = new JobInfo(newJobId(date), desc, date, actor)
+ val now = System.currentTimeMillis()
+ val date = new Date(now)
+ val job = new JobInfo(now, newJobId(date), desc, date, actor)
jobs += job
idToJob(job.id) = job
actorToJob(sender) = job
@@ -189,19 +229,21 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
def removeJob(job: JobInfo) {
- logInfo("Removing job " + job.id)
- jobs -= job
- idToJob -= job.id
- actorToJob -= job.actor
- addressToWorker -= job.actor.path.address
- completedJobs += job // Remember it in our history
- waitingJobs -= job
- for (exec <- job.executors.values) {
- exec.worker.removeExecutor(exec)
- exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+ if (jobs.contains(job)) {
+ logInfo("Removing job " + job.id)
+ jobs -= job
+ idToJob -= job.id
+ actorToJob -= job.actor
+ addressToWorker -= job.actor.path.address
+ completedJobs += job // Remember it in our history
+ waitingJobs -= job
+ for (exec <- job.executors.values) {
+ exec.worker.removeExecutor(exec)
+ exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+ }
+ job.markFinished(JobState.FINISHED) // TODO: Mark it as FAILED if it failed
+ schedule()
}
- job.state = JobState.FINISHED
- schedule()
}
/** Generate a new job ID given a job's submission date */
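
The heart of the new spread-out mode is the round-robin loop in `schedule()`: hand out one core at a time across the usable workers until the job's remaining cores are covered or the cluster runs out. A self-contained sketch of just that loop (hypothetical names), with a worked example:

```scala
object SpreadOutSketch {
  // Given the job's remaining core count and each usable worker's free cores,
  // return how many cores to grant on each worker, assigned round-robin.
  def assignCores(coresLeft: Int, coresFreePerWorker: Array[Int]): Array[Int] = {
    val numUsable = coresFreePerWorker.length
    val assigned = new Array[Int](numUsable)
    var toAssign = math.min(coresLeft, coresFreePerWorker.sum)
    var pos = 0
    while (toAssign > 0) {
      if (coresFreePerWorker(pos) - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % numUsable
    }
    assigned
  }

  def main(args: Array[String]) {
    // A job wanting 5 cores on workers with 4, 2 and 1 free cores gets 2, 2 and 1.
    println(assignCores(5, Array(4, 2, 1)).mkString(", "))  // 2, 2, 1
  }
}
```

The packed (non-spread-out) branch instead walks the workers in order and gives each waiting job as many cores as the current worker can spare.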
diff --git a/core/src/main/scala/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
index 1b1c3dd0ad..4ceab3fc03 100644
--- a/core/src/main/scala/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
@@ -7,7 +7,7 @@ import spark.Utils
* Command-line parser for the master.
*/
private[spark] class MasterArguments(args: Array[String]) {
- var ip = Utils.localIpAddress()
+ var ip = Utils.localHostName()
var port = 7077
var webUiPort = 8080
@@ -59,4 +59,4 @@ private[spark] class MasterArguments(args: Array[String]) {
" --webui-port PORT Port for web UI (default: 8080)")
System.exit(exitCode)
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 700a41c770..3cdd3721f5 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -36,7 +36,7 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
// A bit ugly an inefficient, but we won't have a number of jobs
// so large that it will make a significant difference.
- (masterState.activeJobs ::: masterState.completedJobs).find(_.id == jobId) match {
+ (masterState.activeJobs ++ masterState.completedJobs).find(_.id == jobId) match {
case Some(job) => spark.deploy.master.html.job_details.render(job)
case _ => null
}
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 16b3f9b653..706b1453aa 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -33,6 +33,10 @@ private[spark] class WorkerInfo(
memoryUsed -= exec.memory
}
}
+
+ def hasExecutor(job: JobInfo): Boolean = {
+ executors.values.exists(_.job == job)
+ }
def webUiAddress : String = {
"http://" + this.host + ":" + this.webUiPort
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 474c9364fd..67d41dda29 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -123,7 +123,7 @@ private[spark] class Worker(
manager.start()
coresUsed += cores_
memoryUsed += memory_
- master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)
+ master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None)
case ExecutorStateChanged(jobId, execId, state, message) =>
master ! ExecutorStateChanged(jobId, execId, state, message)
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
index 60dc107a4c..340920025b 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
@@ -9,7 +9,7 @@ import java.lang.management.ManagementFactory
* Command-line parser for the master.
*/
private[spark] class WorkerArguments(args: Array[String]) {
- var ip = Utils.localIpAddress()
+ var ip = Utils.localHostName()
var port = 0
var webUiPort = 8081
var cores = inferDefaultCores()
@@ -110,4 +110,4 @@ private[spark] class WorkerArguments(args: Array[String]) {
// Leave out 1 GB for the operating system, but don't return a negative memory size
math.max(totalMb - 1024, 512)
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index dfdb22024e..cb29a6b8b4 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -43,6 +43,21 @@ private[spark] class Executor extends Logging {
urlClassLoader = createClassLoader()
Thread.currentThread.setContextClassLoader(urlClassLoader)
+ // Make any thread terminations due to uncaught exceptions kill the entire
+ // executor process to avoid surprising stalls.
+ Thread.setDefaultUncaughtExceptionHandler(
+ new Thread.UncaughtExceptionHandler {
+ override def uncaughtException(thread: Thread, exception: Throwable) {
+ try {
+ logError("Uncaught exception in thread " + thread, exception)
+ System.exit(1)
+ } catch {
+ case t: Throwable => System.exit(2)
+ }
+ }
+ }
+ )
+
// Initialize Spark environment (using system properties read above)
env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
SparkEnv.set(env)
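
The handler above makes an uncaught exception in any executor thread fatal to the whole process instead of silently killing just that thread. A minimal standalone demonstration of the same JVM facility (not Spark code; names hypothetical):

```scala
object UncaughtHandlerDemo {
  def main(args: Array[String]) {
    Thread.setDefaultUncaughtExceptionHandler(
      new Thread.UncaughtExceptionHandler {
        override def uncaughtException(thread: Thread, exception: Throwable) {
          System.err.println("Uncaught exception in thread " + thread + ": " + exception)
          System.exit(1)
        }
      })

    // A worker thread that crashes; the default handler takes the whole JVM down with it.
    new Thread(new Runnable {
      def run() { throw new RuntimeException("worker thread failed") }
    }).start()

    Thread.sleep(1000)
    println("normally never reached: the handler exits the JVM first")
  }
}
```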
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index da39108164..642fa4b525 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -304,7 +304,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
connectionRequests += newConnection
newConnection
}
- val connection = connectionsById.getOrElse(connectionManagerId, startNewConnection())
+ val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)
+ val connection = connectionsById.getOrElse(lookupKey, startNewConnection())
message.senderAddress = id.toSocketAddress()
logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")
/*connection.send(message)*/
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index cdfe1f2563..814443fa52 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -272,7 +272,7 @@ private[spark] class MesosSchedulerBackend(
synchronized {
slaveIdsWithExecutors -= slaveId.getValue
}
- scheduler.slaveLost(slaveId.toString)
+ scheduler.slaveLost(slaveId.getValue)
}
override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 4d5ee8318c..397395a65b 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -243,6 +243,12 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " " + blockId + " "
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ // Can happen if this is from a locally cached partition on the master
+ sender ! true
+ return
+ }
+
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
logDebug("Got in updateBlockInfo 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 09769d1f7d..a222b2f3df 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -40,7 +40,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
tryToPut(blockId, elements, sizeEstimate, true)
} else {
- val entry = new Entry(bytes, bytes.limit, false)
tryToPut(blockId, bytes, bytes.limit, false)
}
}
@@ -175,6 +174,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Otherwise, the freed space may fill up before the caller puts in their new value.
*/
private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
+
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
new file mode 100644
index 0000000000..e4a5b8ffdf
--- /dev/null
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -0,0 +1,91 @@
+package spark.storage
+
+import akka.actor._
+
+import spark.KryoSerializer
+import java.util.concurrent.ArrayBlockingQueue
+import util.Random
+
+/**
+ * This class tests the BlockManager and MemoryStore for thread safety and
+ * deadlocks. It spawns a number of producer and consumer threads. Producer
+ * threads continuously push blocks into the BlockManager and consumer
+ * threads continuously retrieve the blocks from the BlockManager and test
+ * whether each block is correct or not.
+ */
+private[spark] object ThreadingTest {
+
+ val numProducers = 5
+ val numBlocksPerProducer = 20000
+
+ private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
+ val queue = new ArrayBlockingQueue[(String, Seq[Int])](100)
+
+ override def run() {
+ for (i <- 1 to numBlocksPerProducer) {
+ val blockId = "b-" + id + "-" + i
+ val blockSize = Random.nextInt(1000)
+ val block = (1 to blockSize).map(_ => Random.nextInt())
+ val level = randomLevel()
+ val startTime = System.currentTimeMillis()
+ manager.put(blockId, block.iterator, level, true)
+ println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+ queue.add((blockId, block))
+ }
+ println("Producer thread " + id + " terminated")
+ }
+
+ def randomLevel(): StorageLevel = {
+ math.abs(Random.nextInt()) % 4 match {
+ case 0 => StorageLevel.MEMORY_ONLY
+ case 1 => StorageLevel.MEMORY_ONLY_SER
+ case 2 => StorageLevel.MEMORY_AND_DISK
+ case 3 => StorageLevel.MEMORY_AND_DISK_SER
+ }
+ }
+ }
+
+ private[spark] class ConsumerThread(
+ manager: BlockManager,
+ queue: ArrayBlockingQueue[(String, Seq[Int])]
+ ) extends Thread {
+ var numBlockConsumed = 0
+
+ override def run() {
+ println("Consumer thread started")
+ while(numBlockConsumed < numBlocksPerProducer) {
+ val (blockId, block) = queue.take()
+ val startTime = System.currentTimeMillis()
+ manager.get(blockId) match {
+ case Some(retrievedBlock) =>
+ assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
+ println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+ case None =>
+ assert(false, "Block " + blockId + " could not be retrieved")
+ }
+ numBlockConsumed += 1
+ }
+ println("Consumer thread terminated")
+ }
+ }
+
+ def main(args: Array[String]) {
+ System.setProperty("spark.kryoserializer.buffer.mb", "1")
+ val actorSystem = ActorSystem("test")
+ val serializer = new KryoSerializer
+ val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
+ val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
+ val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
+ val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
+ producers.foreach(_.start)
+ consumers.foreach(_.start)
+ producers.foreach(_.join)
+ consumers.foreach(_.join)
+ blockManager.stop()
+ blockManagerMaster.stop()
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ println("Everything stopped.")
+ println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
+ }
+}
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index b466b5239c..e67cb0336d 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -25,6 +25,8 @@ private[spark] object AkkaUtils {
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
+ val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
+ val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
@@ -32,10 +34,11 @@ private[spark] object AkkaUtils {
akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d
- akka.remote.netty.connection-timeout = 1s
+ akka.remote.netty.connection-timeout = %ds
+ akka.remote.netty.message-frame-size = %d MiB
akka.remote.netty.execution-pool-size = %d
akka.actor.default-dispatcher.throughput = %d
- """.format(host, port, akkaThreads, akkaBatchSize))
+ """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
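
The two new settings feed straight into the formatted Akka config: with nothing set, `connection-timeout` renders as `20s` and `message-frame-size` as `10 MiB`. A small usage sketch (hypothetical object; the values are examples only) of overriding them before the actor system is created:

```scala
object AkkaTuningExample {
  def main(args: Array[String]) {
    System.setProperty("spark.akka.timeout", "60")    // remote connection timeout, in seconds
    System.setProperty("spark.akka.frameSize", "50")  // max remote message size, in MiB
    // ... then create the SparkContext / call AkkaUtils.createActorSystem as usual.
  }
}
```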
diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
index 4e95ac2ac6..03559751bc 100644
--- a/core/src/main/scala/spark/util/Vector.scala
+++ b/core/src/main/scala/spark/util/Vector.scala
@@ -49,7 +49,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
return ans
}
- def +=(other: Vector) {
+ def += (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
var ans = 0.0
@@ -58,6 +58,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
elements(i) += other(i)
i += 1
}
+ this
}
def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
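
`+=` previously returned `Unit`; it now returns the mutated vector, so an in-place addition can be used where a `Vector` value is expected. A small usage sketch (hypothetical demo object):

```scala
object VectorPlusEqDemo {
  def main(args: Array[String]) {
    val acc = new spark.util.Vector(Array(1.0, 2.0))
    val delta = new spark.util.Vector(Array(0.5, 0.5))
    val result = (acc += delta)               // the updated vector itself, not Unit
    println(result.elements.mkString(", "))   // 1.5, 2.5
  }
}
```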
diff --git a/core/src/main/twirl/spark/deploy/master/index.scala.html b/core/src/main/twirl/spark/deploy/master/index.scala.html
index 7562076b00..18c32e5a1f 100644
--- a/core/src/main/twirl/spark/deploy/master/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/index.scala.html
@@ -1,5 +1,6 @@
@(state: spark.deploy.MasterState)
@import spark.deploy.master._
+@import spark.Utils
@spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) {
@@ -8,9 +9,11 @@
<div class="span12">
<ul class="unstyled">
<li><strong>URL:</strong> spark://@(state.uri)</li>
- <li><strong>Number of Workers:</strong> @state.workers.size </li>
- <li><strong>Cores:</strong> @state.workers.map(_.cores).sum Total, @state.workers.map(_.coresUsed).sum Used</li>
- <li><strong>Memory:</strong> @state.workers.map(_.memory).sum Total, @state.workers.map(_.memoryUsed).sum Used</li>
+ <li><strong>Workers:</strong> @state.workers.size </li>
+ <li><strong>Cores:</strong> @{state.workers.map(_.cores).sum} Total,
+ @{state.workers.map(_.coresUsed).sum} Used</li>
+ <li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
+ @{Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
<li><strong>Jobs:</strong> @state.activeJobs.size Running, @state.completedJobs.size Completed </li>
</ul>
</div>
@@ -21,7 +24,7 @@
<div class="span12">
<h3> Cluster Summary </h3>
<br/>
- @worker_table(state.workers)
+ @worker_table(state.workers.sortBy(_.id))
</div>
</div>
@@ -32,7 +35,7 @@
<div class="span12">
<h3> Running Jobs </h3>
<br/>
- @job_table(state.activeJobs)
+ @job_table(state.activeJobs.sortBy(_.startTime).reverse)
</div>
</div>
@@ -43,7 +46,7 @@
<div class="span12">
<h3> Completed Jobs </h3>
<br/>
- @job_table(state.completedJobs)
+ @job_table(state.completedJobs.sortBy(_.endTime).reverse)
</div>
</div>
diff --git a/core/src/main/twirl/spark/deploy/master/job_row.scala.html b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
index 7c4865bb6e..7c466a6a2c 100644
--- a/core/src/main/twirl/spark/deploy/master/job_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
@@ -1,20 +1,20 @@
@(job: spark.deploy.master.JobInfo)
+@import spark.Utils
+@import spark.deploy.WebUI.formatDate
+@import spark.deploy.WebUI.formatDuration
+
<tr>
<td>
<a href="job?jobId=@(job.id)">@job.id</a>
</td>
<td>@job.desc.name</td>
<td>
- @job.coresGranted Granted
- @if(job.desc.cores == Integer.MAX_VALUE) {
-
- } else {
- , @job.coresLeft
- }
+ @job.coresGranted
</td>
- <td>@job.desc.memoryPerSlave</td>
- <td>@job.submitDate</td>
+ <td>@Utils.memoryMegabytesToString(job.desc.memoryPerSlave)</td>
+ <td>@formatDate(job.submitDate)</td>
<td>@job.desc.user</td>
<td>@job.state.toString()</td>
-</tr> \ No newline at end of file
+ <td>@formatDuration(job.duration)</td>
+</tr>
diff --git a/core/src/main/twirl/spark/deploy/master/job_table.scala.html b/core/src/main/twirl/spark/deploy/master/job_table.scala.html
index 52bad6c4b8..d267d6e85e 100644
--- a/core/src/main/twirl/spark/deploy/master/job_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_table.scala.html
@@ -1,4 +1,4 @@
-@(jobs: List[spark.deploy.master.JobInfo])
+@(jobs: Array[spark.deploy.master.JobInfo])
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
@@ -6,10 +6,11 @@
<th>JobID</th>
<th>Description</th>
<th>Cores</th>
- <th>Memory per Slave</th>
- <th>Submit Date</th>
+ <th>Memory per Node</th>
+ <th>Submit Time</th>
<th>User</th>
<th>State</th>
+ <th>Duration</th>
</tr>
</thead>
<tbody>
@@ -17,4 +18,4 @@
@job_row(j)
}
</tbody>
-</table> \ No newline at end of file
+</table>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
index 017cc4859e..3dcba3a545 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
@@ -1,11 +1,13 @@
@(worker: spark.deploy.master.WorkerInfo)
+@import spark.Utils
+
<tr>
<td>
<a href="http://@worker.host:@worker.webUiPort">@worker.id</href>
</td>
<td>@{worker.host}:@{worker.port}</td>
<td>@worker.cores (@worker.coresUsed Used)</td>
- <td>@{spark.Utils.memoryMegabytesToString(worker.memory)}
- (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
+ <td>@{Utils.memoryMegabytesToString(worker.memory)}
+ (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
</tr>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
index 2028842297..fad1af41dc 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
@@ -1,4 +1,4 @@
-@(workers: List[spark.deploy.master.WorkerInfo])
+@(workers: Array[spark.deploy.master.WorkerInfo])
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
@@ -14,4 +14,4 @@
@worker_row(w)
}
</tbody>
-</table> \ No newline at end of file
+</table>
diff --git a/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html b/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
index c3842dbf85..ea9542461e 100644
--- a/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
@@ -1,20 +1,20 @@
@(executor: spark.deploy.worker.ExecutorRunner)
+@import spark.Utils
+
<tr>
<td>@executor.execId</td>
<td>@executor.cores</td>
- <td>@executor.memory</td>
+ <td>@Utils.memoryMegabytesToString(executor.memory)</td>
<td>
<ul class="unstyled">
<li><strong>ID:</strong> @executor.jobId</li>
<li><strong>Name:</strong> @executor.jobDesc.name</li>
<li><strong>User:</strong> @executor.jobDesc.user</li>
- <li><strong>Cores:</strong> @executor.jobDesc.cores </li>
- <li><strong>Memory per Slave:</strong> @executor.jobDesc.memoryPerSlave</li>
</ul>
</td>
<td>
<a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stdout">stdout</a>
<a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stderr">stderr</a>
</td>
-</tr> \ No newline at end of file
+</tr>
diff --git a/core/src/main/twirl/spark/deploy/worker/index.scala.html b/core/src/main/twirl/spark/deploy/worker/index.scala.html
index 69746ed02c..b247307dab 100644
--- a/core/src/main/twirl/spark/deploy/worker/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/index.scala.html
@@ -1,5 +1,7 @@
@(worker: spark.deploy.WorkerState)
+@import spark.Utils
+
@spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) {
<!-- Worker Details -->
@@ -12,8 +14,8 @@
(WebUI at <a href="@worker.masterWebUiUrl">@worker.masterWebUiUrl</a>)
</li>
<li><strong>Cores:</strong> @worker.cores (@worker.coresUsed Used)</li>
- <li><strong>Memory:</strong> @{spark.Utils.memoryMegabytesToString(worker.memory)}
- (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
+ <li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(worker.memory)}
+ (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
</ul>
</div>
</div>
diff --git a/docs/quick-start.md b/docs/quick-start.md
index defdb34836..dbc232b6e0 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -6,7 +6,7 @@ title: Quick Start
* This will become a table of contents (this text will be scraped).
{:toc}
-This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you will need much for this), then show how to write standalone jobs in Scala and Java. See the [programming guide](scala-programming-guide.html) for a fuller reference.
+This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you will not need much for this), then show how to write standalone jobs in Scala and Java. See the [programming guide](scala-programming-guide.html) for a more complete reference.
To follow along with this guide, you only need to have successfully built Spark on one machine. Simply go into your Spark directory and run:
@@ -60,7 +60,7 @@ scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a
res4: Long = 16
{% endhighlight %}
-This first maps a line to an integer value, creating a new RDD. `reduce` is called on that RDD to find the largest line count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use `Math.max()` function to make this code easier to understand:
+This first maps a line to an integer value, creating a new RDD. `reduce` is called on that RDD to find the largest line count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use `Math.max()` function to make this code easier to understand:
{% highlight scala %}
scala> import java.lang.Math
@@ -98,10 +98,10 @@ scala> linesWithSpark.count()
res9: Long = 15
{% endhighlight %}
-It may seem silly to use a Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark).
+It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark).
# A Standalone Job in Scala
-Now say we wanted to write a standalone job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you using other build systems, consider using the Spark assembly JAR described in the developer guide.
+Now say we wanted to write a standalone job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
We'll create a very simple Spark job in Scala. So simple, in fact, that it's named `SimpleJob.scala`:
@@ -112,7 +112,7 @@ import SparkContext._
object SimpleJob extends Application {
val logFile = "/var/log/syslog" // Should be some file on your system
- val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
+ val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
"target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar")
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
@@ -139,10 +139,10 @@ resolvers ++= Seq(
"Spray Repository" at "http://repo.spray.cc/")
{% endhighlight %}
-Of course, for sbt to work correctly, we'll need to layout `SimpleJob.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the job's code, then use `sbt run` to execute our example job.
+Of course, for sbt to work correctly, we'll need to layout `SimpleJob.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the job's code, then use `sbt run` to execute our example job.
{% highlight bash %}
-$ find .
+$ find .
.
./simple.sbt
./src
@@ -159,7 +159,7 @@ Lines with a: 8422, Lines with b: 1836
This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS.
# A Standalone Job In Java
-Now say we wanted to write a standalone job using the Java API. We will walk through doing this with Maven. If you using other build systems, consider using the Spark assembly JAR described in the developer guide.
+Now say we wanted to write a standalone job using the Java API. We will walk through doing this with Maven. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
We'll create a very simple Spark job, `SimpleJob.java`:
@@ -171,7 +171,7 @@ import spark.api.java.function.Function;
public class SimpleJob {
public static void main(String[] args) {
String logFile = "/var/log/syslog"; // Should be some file on your system
- JavaSparkContext sc = new JavaSparkContext("local", "Simple Job",
+ JavaSparkContext sc = new JavaSparkContext("local", "Simple Job",
"$YOUR_SPARK_HOME", "target/simple-project-1.0.jar");
JavaRDD<String> logData = sc.textFile(logFile).cache();
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 97564d7426..f4a3eb667c 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -15,7 +15,7 @@ Spark can run on private clusters managed by the [Apache Mesos](http://incubator
6. Copy Spark and Mesos to the _same_ paths on all the nodes in the cluster (or, for Mesos, `make install` on every node).
7. Configure Mesos for deployment:
* On your master node, edit `<prefix>/var/mesos/deploy/masters` to list your master and `<prefix>/var/mesos/deploy/slaves` to list the slaves, where `<prefix>` is the prefix where you installed Mesos (`/usr/local` by default).
- * On all nodes, edit `<prefix>/var/mesos/deploy/mesos.conf` and add the line `master=HOST:5050`, where HOST is your master node.
+ * On all nodes, edit `<prefix>/var/mesos/conf/mesos.conf` and add the line `master=HOST:5050`, where HOST is your master node.
* Run `<prefix>/sbin/mesos-start-cluster.sh` on your master to start Mesos. If all goes well, you should see Mesos's web UI on port 8080 of the master machine.
* See Mesos's README file for more information on deploying it.
8. To run a Spark job against the cluster, when you create your `SparkContext`, pass the string `mesos://HOST:5050` as the first parameter, where `HOST` is the machine running your Mesos master. In addition, pass the location of Spark on your nodes as the third parameter, and a list of JAR files containing your JAR's code as the fourth (these will automatically get copied to the workers). For example:
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 73f8b123be..7350eca837 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -19,7 +19,7 @@ This guide shows each of these features and walks through some samples. It assum
To write a Spark application, you will need to add both Spark and its dependencies to your CLASSPATH. If you use sbt or Maven, Spark is available through Maven Central at:
- groupId = org.spark_project
+ groupId = org.spark-project
artifactId = spark-core_{{site.SCALA_VERSION}}
version = {{site.SPARK_VERSION}}
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 6a3647b218..2ab11dbd34 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -19,7 +19,6 @@
from __future__ import with_statement
-import boto
import logging
import os
import random
@@ -32,7 +31,7 @@ import urllib2
from optparse import OptionParser
from sys import stderr
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
-
+from boto import ec2
# A static URL from which to figure out the latest Mesos EC2 AMI
LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.6"
@@ -61,7 +60,9 @@ def parse_args():
parser.add_option("-r", "--region", default="us-east-1",
help="EC2 region zone to launch instances in")
parser.add_option("-z", "--zone", default="",
- help="Availability zone to launch instances in")
+ help="Availability zone to launch instances in, or 'all' to spread " +
+ "slaves across multiple (an additional $0.01/Gb for bandwidth" +
+ "between zones applies)")
parser.add_option("-a", "--ami", default="latest",
help="Amazon Machine Image ID to use, or 'latest' to use latest " +
"available AMI (default: latest)")
@@ -97,14 +98,20 @@ def parse_args():
if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
sys.exit(1)
- if os.getenv('AWS_ACCESS_KEY_ID') == None:
- print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
- "must be set")
- sys.exit(1)
- if os.getenv('AWS_SECRET_ACCESS_KEY') == None:
- print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
- "must be set")
- sys.exit(1)
+
+ # Boto config check
+ # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
+ home_dir = os.getenv('HOME')
+ if home_dir == None or not os.path.isfile(home_dir + '/.boto'):
+ if not os.path.isfile('/etc/boto.cfg'):
+ if os.getenv('AWS_ACCESS_KEY_ID') == None:
+ print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
+ "must be set")
+ sys.exit(1)
+ if os.getenv('AWS_SECRET_ACCESS_KEY') == None:
+ print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
+ "must be set")
+ sys.exit(1)
return (opts, action, cluster_name)
@@ -180,16 +187,12 @@ def launch_cluster(conn, opts, cluster_name):
zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0')
# Check if instances are already running in our groups
- print "Checking for running cluster..."
- reservations = conn.get_all_instances()
- for res in reservations:
- group_names = [g.id for g in res.groups]
- if master_group.name in group_names or slave_group.name in group_names or zoo_group.name in group_names:
- active = [i for i in res.instances if is_active(i)]
- if len(active) > 0:
- print >> stderr, ("ERROR: There are already instances running in " +
- "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
- sys.exit(1)
+ active_nodes = get_existing_cluster(conn, opts, cluster_name,
+ die_on_error=False)
+ if any(active_nodes):
+ print >> stderr, ("ERROR: There are already instances running in " +
+ "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
+ sys.exit(1)
# Figure out the latest AMI from our static URL
if opts.ami == "latest":
@@ -221,55 +224,83 @@ def launch_cluster(conn, opts, cluster_name):
# Launch spot instances with the requested price
print ("Requesting %d slaves as spot instances with price $%.3f" %
(opts.slaves, opts.spot_price))
- slave_reqs = conn.request_spot_instances(
- price = opts.spot_price,
- image_id = opts.ami,
- launch_group = "launch-group-%s" % cluster_name,
- placement = opts.zone,
- count = opts.slaves,
- key_name = opts.key_pair,
- security_groups = [slave_group],
- instance_type = opts.instance_type,
- block_device_map = block_map)
- my_req_ids = [req.id for req in slave_reqs]
+ zones = get_zones(conn, opts)
+ num_zones = len(zones)
+ i = 0
+ my_req_ids = []
+ for zone in zones:
+ num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
+ slave_reqs = conn.request_spot_instances(
+ price = opts.spot_price,
+ image_id = opts.ami,
+ launch_group = "launch-group-%s" % cluster_name,
+ placement = zone,
+ count = num_slaves_this_zone,
+ key_name = opts.key_pair,
+ security_groups = [slave_group],
+ instance_type = opts.instance_type,
+ block_device_map = block_map)
+ my_req_ids += [req.id for req in slave_reqs]
+ i += 1
+
print "Waiting for spot instances to be granted..."
- while True:
- time.sleep(10)
- reqs = conn.get_all_spot_instance_requests()
- id_to_req = {}
- for r in reqs:
- id_to_req[r.id] = r
- active = 0
- instance_ids = []
- for i in my_req_ids:
- if id_to_req[i].state == "active":
- active += 1
- instance_ids.append(id_to_req[i].instance_id)
- if active == opts.slaves:
- print "All %d slaves granted" % opts.slaves
- reservations = conn.get_all_instances(instance_ids)
- slave_nodes = []
- for r in reservations:
- slave_nodes += r.instances
- break
- else:
- print "%d of %d slaves granted, waiting longer" % (active, opts.slaves)
+ try:
+ while True:
+ time.sleep(10)
+ reqs = conn.get_all_spot_instance_requests()
+ id_to_req = {}
+ for r in reqs:
+ id_to_req[r.id] = r
+ active_instance_ids = []
+ for i in my_req_ids:
+ if i in id_to_req and id_to_req[i].state == "active":
+ active_instance_ids.append(id_to_req[i].instance_id)
+ if len(active_instance_ids) == opts.slaves:
+ print "All %d slaves granted" % opts.slaves
+ reservations = conn.get_all_instances(active_instance_ids)
+ slave_nodes = []
+ for r in reservations:
+ slave_nodes += r.instances
+ break
+ else:
+ print "%d of %d slaves granted, waiting longer" % (
+ len(active_instance_ids), opts.slaves)
+ except:
+ print "Canceling spot instance requests"
+ conn.cancel_spot_instance_requests(my_req_ids)
+ # Log a warning if any of these requests actually launched instances:
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ conn, opts, cluster_name, die_on_error=False)
+ running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
+ if running:
+ print >> stderr, ("WARNING: %d instances are still running" % running)
+ sys.exit(0)
else:
# Launch non-spot instances
- slave_res = image.run(key_name = opts.key_pair,
- security_groups = [slave_group],
- instance_type = opts.instance_type,
- placement = opts.zone,
- min_count = opts.slaves,
- max_count = opts.slaves,
- block_device_map = block_map)
- slave_nodes = slave_res.instances
- print "Launched slaves, regid = " + slave_res.id
+ zones = get_zones(conn, opts)
+ num_zones = len(zones)
+ i = 0
+ slave_nodes = []
+ for zone in zones:
+ num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
+ slave_res = image.run(key_name = opts.key_pair,
+ security_groups = [slave_group],
+ instance_type = opts.instance_type,
+ placement = zone,
+ min_count = num_slaves_this_zone,
+ max_count = num_slaves_this_zone,
+ block_device_map = block_map)
+ slave_nodes += slave_res.instances
+ print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
+ zone, slave_res.id)
+ i += 1
# Launch masters
master_type = opts.master_instance_type
if master_type == "":
master_type = opts.instance_type
+ if opts.zone == 'all':
+ opts.zone = random.choice(conn.get_all_zones()).name
master_res = image.run(key_name = opts.key_pair,
security_groups = [master_group],
instance_type = master_type,
@@ -278,7 +309,7 @@ def launch_cluster(conn, opts, cluster_name):
max_count = 1,
block_device_map = block_map)
master_nodes = master_res.instances
- print "Launched master, regid = " + master_res.id
+ print "Launched master in %s, regid = %s" % (zone, master_res.id)
zoo_nodes = []
@@ -468,9 +499,30 @@ def ssh(host, opts, command):
(opts.identity_file, opts.user, host, command), shell=True)
+# Gets a list of zones to launch instances in
+def get_zones(conn, opts):
+ if opts.zone == 'all':
+ zones = [z.name for z in conn.get_all_zones()]
+ else:
+ zones = [opts.zone]
+ return zones
+
+
+# Gets the number of items in a partition
+def get_partition(total, num_partitions, current_partitions):
+ num_slaves_this_zone = total / num_partitions
+ if (total % num_partitions) - current_partitions > 0:
+ num_slaves_this_zone += 1
+ return num_slaves_this_zone
+
+
def main():
(opts, action, cluster_name) = parse_args()
- conn = boto.ec2.connect_to_region(opts.region)
+ try:
+ conn = ec2.connect_to_region(opts.region)
+ except Exception as e:
+ print >> stderr, (e)
+ sys.exit(1)
# Select an AZ at random if it was not specified.
if opts.zone == "":
@@ -503,6 +555,20 @@ def main():
print "Terminating zoo..."
for inst in zoo_nodes:
inst.terminate()
+ # Delete security groups as well
+ group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
+ groups = conn.get_all_security_groups()
+ for group in groups:
+ if group.name in group_names:
+ print "Deleting security group " + group.name
+ # Delete individual rules before deleting group to remove dependencies
+ for rule in group.rules:
+ for grant in rule.grants:
+ group.revoke(ip_protocol=rule.ip_protocol,
+ from_port=rule.from_port,
+ to_port=rule.to_port,
+ src_group=grant)
+ conn.delete_security_group(group.name)
elif action == "login":
(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
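
The per-zone slave count comes from `get_partition`: every zone gets the integer share `total / num_partitions`, and the first `total % num_partitions` zones get one extra. The same arithmetic re-expressed as a short Scala sketch (the script itself is Python; names hypothetical), with a worked example:

```scala
object ZonePartitionSketch {
  def getPartition(total: Int, numPartitions: Int, currentPartition: Int): Int = {
    val base = total / numPartitions
    if ((total % numPartitions) - currentPartition > 0) base + 1 else base
  }

  def main(args: Array[String]) {
    // 10 slaves spread over 3 zones -> 4, 3, 3
    println((0 until 3).map(getPartition(10, 3, _)).mkString(", "))
  }
}
```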
diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala
index adce551322..6375961390 100644
--- a/examples/src/main/scala/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala
@@ -15,14 +15,13 @@ object SparkKMeans {
return new Vector(line.split(' ').map(_.toDouble))
}
- def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+ def closestPoint(p: Vector, centers: Array[Vector]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity
- for (i <- 1 to centers.size) {
- val vCurr = centers.get(i).get
- val tempDist = p.squaredDist(vCurr)
+ for (i <- 0 until centers.length) {
+ val tempDist = p.squaredDist(centers(i))
if (tempDist < closest) {
closest = tempDist
bestIndex = i
@@ -43,32 +42,28 @@ object SparkKMeans {
val K = args(2).toInt
val convergeDist = args(3).toDouble
- var points = data.takeSample(false, K, 42)
- var kPoints = new HashMap[Int, Vector]
+ var kPoints = data.takeSample(false, K, 42).toArray
var tempDist = 1.0
-
- for (i <- 1 to points.size) {
- kPoints.put(i, points(i-1))
- }
while(tempDist > convergeDist) {
var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
- var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
+ var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
- var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collect()
+ var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
tempDist = 0.0
- for (pair <- newPoints) {
- tempDist += kPoints.get(pair._1).get.squaredDist(pair._2)
+ for (i <- 0 until K) {
+ tempDist += kPoints(i).squaredDist(newPoints(i))
}
for (newP <- newPoints) {
- kPoints.put(newP._1, newP._2)
+ kPoints(newP._1) = newP._2
}
}
- println("Final centers: " + kPoints)
+ println("Final centers:")
+ kPoints.foreach(println)
System.exit(0)
}
}
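
The SparkKMeans changes replace the 1-indexed `HashMap` of centers with a plain `Array` and 0-based loops. A minimal standalone sketch of the updated `closestPoint` logic, using bare `Array[Double]` points instead of `spark.util.Vector`:

```scala
object ClosestPointSketch {
  def squaredDist(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Return the index of the nearest center to point p.
  def closestPoint(p: Array[Double], centers: Array[Array[Double]]): Int = {
    var bestIndex = 0
    var closest = Double.PositiveInfinity
    for (i <- 0 until centers.length) {
      val tempDist = squaredDist(p, centers(i))
      if (tempDist < closest) {
        closest = tempDist
        bestIndex = i
      }
    }
    bestIndex
  }

  def main(args: Array[String]) {
    val centers = Array(Array(0.0, 0.0), Array(5.0, 5.0))
    println(closestPoint(Array(4.0, 4.5), centers))  // 1: nearer to (5.0, 5.0)
  }
}
```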
diff --git a/run2.cmd b/run2.cmd
index 097718b526..333d0506b0 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -63,5 +63,5 @@ if "%SPARK_LAUNCH_WITH_SCALA%" NEQ 1 goto java_runner
set EXTRA_ARGS=%JAVA_OPTS%
:run_spark
-%RUNNER% -cp "%CLASSPATH%" %EXTRA_ARGS% %*
+"%RUNNER%" -cp "%CLASSPATH%" %EXTRA_ARGS% %*
:exit
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 33774b463d..02fe16866e 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,5 +1,5 @@
# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
+log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n