author     Matei Zaharia <matei@eecs.berkeley.edu>    2010-09-28 23:22:07 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2010-09-28 23:54:29 -0700
commit     7090dea44bf9cc671bceb4b3dbee4ac3670c26b2 (patch)
tree       ebaad2c323b7197cc1094f1a0b7820540e9f2700
parent     516248aa66752e55f83d37c8fd3382975e05f700 (diff)
Changed printlns to log statements and fixed a bug in run that was causing it to fail on a Mesos cluster
-rwxr-xr-x  run                                              35
-rw-r--r--  src/scala/spark/Broadcast.scala                  80
-rw-r--r--  src/scala/spark/ClosureCleaner.scala              8
-rw-r--r--  src/scala/spark/Executor.scala                   16
-rw-r--r--  src/scala/spark/LocalScheduler.scala             20
-rw-r--r--  src/scala/spark/MesosScheduler.scala             36
-rw-r--r--  src/scala/spark/RDD.scala                        16
-rw-r--r--  src/scala/spark/SparkContext.scala                6
-rw-r--r--  src/scala/spark/repl/ClassServer.scala            5
-rw-r--r--  src/scala/spark/repl/ExecutorClassLoader.scala    2
-rw-r--r--  src/scala/spark/repl/SparkInterpreter.scala      13
11 files changed, 126 insertions, 111 deletions
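
Nearly every class touched below gains a "with Logging" mixin and replaces timestamped printlns with logInfo/logError/logDebug calls. The Logging trait itself is not part of this diff; what follows is a minimal sketch of what such a trait might look like, assuming an slf4j backend (the slf4j-api and slf4j-log4j12 jars are already on the classpath that run builds) and only the method names the patch actually calls:

    import org.slf4j.{Logger, LoggerFactory}

    trait Logging {
      // One logger per concrete class, created on first use
      private lazy val log: Logger =
        LoggerFactory.getLogger(this.getClass.getName)

      def logInfo(msg: => String) { if (log.isInfoEnabled) log.info(msg) }
      def logDebug(msg: => String) { if (log.isDebugEnabled) log.debug(msg) }
      def logError(msg: => String) { log.error(msg) }
      def logError(msg: => String, e: Throwable) { log.error(msg, e) }
    }

With a mixin along these lines, the old pattern println(System.currentTimeMillis + ": " + msg) collapses to logInfo(msg), and the logging backend supplies the timestamp.
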
diff --git a/run b/run
index fe4b929500..f28b39af9b 100755
--- a/run
+++ b/run
@@ -28,25 +28,24 @@ fi
export JAVA_OPTS
# Build up classpath
-SPARK_CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes"
-SPARK_CLASSPATH+=:$FWDIR/conf
-SPARK_CLASSPATH+=:$FWDIR/third_party/mesos.jar
-SPARK_CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
-SPARK_CLASSPATH+=:$FWDIR/third_party/colt.jar
-SPARK_CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
-SPARK_CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
-SPARK_CLASSPATH+=:third_party/scalatest-1.2/scalatest-1.2.jar
-SPARK_CLASSPATH+=:third_party/scalacheck_2.8.0-1.7.jar
-SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
-SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
-SPARK_CLASSPATH+=:third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
-SPARK_CLASSPATH+=:third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
-SPARK_CLASSPATH+=:third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
+CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes"
+CLASSPATH+=:$FWDIR/conf
+CLASSPATH+=:$FWDIR/third_party/mesos.jar
+CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
+CLASSPATH+=:$FWDIR/third_party/colt.jar
+CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
+CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
+CLASSPATH+=:$FWDIR/third_party/scalatest-1.2/scalatest-1.2.jar
+CLASSPATH+=:$FWDIR/third_party/scalacheck_2.8.0-1.7.jar
+CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
+CLASSPATH+=:$FWDIR/third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
+CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
+CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
- SPARK_CLASSPATH+=:$jar
+ CLASSPATH+=:$jar
done
-export SPARK_CLASSPATH
-export CLASSPATH=$SPARK_CLASSPATH # Needed for spark-shell
+export CLASSPATH # Needed for spark-shell
if [ -n "$SCALA_HOME" ]; then
SCALA=${SCALA_HOME}/bin/scala
@@ -54,4 +53,4 @@ else
SCALA=scala
fi
-exec $SCALA -cp $SPARK_CLASSPATH $@
+exec $SCALA -cp $CLASSPATH $@
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 7fe84da47c..fc37635983 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -33,7 +33,7 @@ trait BroadcastRecipe {
// TODO: Right now, no parallelization between multiple broadcasts
@serializable
class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
- extends BroadcastRecipe {
+extends BroadcastRecipe with Logging {
def value = value_
@@ -92,7 +92,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
val time = (System.nanoTime - start) / 1e9
- println( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
+ logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")
}
}
}
@@ -149,7 +149,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
@serializable
class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
- extends BroadcastRecipe {
+extends BroadcastRecipe with Logging {
def value = value_
@@ -179,7 +179,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
fileIn.close
val time = (System.nanoTime - start) / 1e9
- println( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
+ logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")
}
}
}
@@ -188,7 +188,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
@serializable
case class SourceInfo (val hostAddress: String, val listenPort: Int,
val totalBlocks: Int, val totalBytes: Int, val replicaID: Int)
- extends Comparable [SourceInfo]{
+extends Comparable[SourceInfo]{
var currentLeechers = 0
var receptionFailed = false
@@ -231,7 +231,7 @@ private object Broadcast {
}
}
-private object BroadcastCS {
+private object BroadcastCS extends Logging {
val values = new MapMaker ().softValues ().makeMap[UUID, Any]
// val valueInfos = new MapMaker ().softValues ().makeMap[UUID, Any]
@@ -286,15 +286,15 @@ private object BroadcastCS {
guideMR = new GuideMultipleRequests
guideMR.setDaemon (true)
guideMR.start
- println (System.currentTimeMillis + ": " + "GuideMultipleRequests started")
+ logInfo("GuideMultipleRequests started")
}
serveMR = new ServeMultipleRequests
serveMR.setDaemon (true)
serveMR.start
- println (System.currentTimeMillis + ": " + "ServeMultipleRequests started")
+ logInfo("ServeMultipleRequests started")
- println (System.currentTimeMillis + ": " + "BroadcastCS object has been initialized")
+ logInfo("BroadcastCS object has been initialized")
initialized = true
}
@@ -352,7 +352,7 @@ private object BroadcastCS {
// Connect to Master and send this worker's Information
val clientSocketToMaster =
new Socket(BroadcastCS.masterHostAddress, BroadcastCS.masterListenPort)
- println (System.currentTimeMillis + ": " + "Connected to Master's guiding object")
+ logInfo("Connected to Master's guiding object")
// TODO: Guiding object connection is reusable
val oisMaster =
new ObjectInputStream (clientSocketToMaster.getInputStream)
@@ -371,11 +371,11 @@ private object BroadcastCS {
}
totalBytes = sourceInfo.totalBytes
- println (System.currentTimeMillis + ": " + "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
+ logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
retByteArray = receiveSingleTransmission (sourceInfo)
- println (System.currentTimeMillis + ": " + "I got this from receiveSingleTransmission: " + retByteArray)
+ logInfo("I got this from receiveSingleTransmission: " + retByteArray)
// TODO: Update sourceInfo to add error notifications for Master
if (retByteArray == null) { sourceInfo.receptionFailed = true }
@@ -414,8 +414,8 @@ private object BroadcastCS {
oisSource =
new ObjectInputStream (clientSocketToSource.getInputStream)
- println (System.currentTimeMillis + ": " + "Inside receiveSingleTransmission")
- println (System.currentTimeMillis + ": " + "totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
+ logInfo("Inside receiveSingleTransmission")
+ logInfo("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
retByteArray = new Array[Byte] (totalBytes)
for (i <- 0 until totalBlocks) {
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
@@ -426,14 +426,14 @@ private object BroadcastCS {
hasBlocksLock.synchronized {
hasBlocksLock.notifyAll
}
- println (System.currentTimeMillis + ": " + "Received block: " + i + " " + bcBlock)
+ logInfo("Received block: " + i + " " + bcBlock)
}
assert (hasBlocks == totalBlocks)
- println (System.currentTimeMillis + ": " + "After the receive loop")
+ logInfo("After the receive loop")
} catch {
case e: Exception => {
retByteArray = null
- println (System.currentTimeMillis + ": " + "receiveSingleTransmission had a " + e)
+ logInfo("receiveSingleTransmission had a " + e)
}
} finally {
if (oisSource != null) { oisSource.close }
@@ -446,13 +446,13 @@ private object BroadcastCS {
return retByteArray
}
- class TrackMultipleValues extends Thread {
+ class TrackMultipleValues extends Thread with Logging {
override def run = {
var threadPool = Executors.newCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (BroadcastCS.masterListenPort)
- println (System.currentTimeMillis + ": " + "TrackMultipleVariables" + serverSocket + " " + listenPort)
+ logInfo("TrackMultipleVariables" + serverSocket + " " + listenPort)
var keepAccepting = true
try {
@@ -463,11 +463,11 @@ private object BroadcastCS {
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
- println ("TrackMultipleValues Timeout. Stopping listening...")
+ logInfo("TrackMultipleValues Timeout. Stopping listening...")
keepAccepting = false
}
}
- println (System.currentTimeMillis + ": " + "TrackMultipleValues:Got new request:" + clientSocket)
+ logInfo("TrackMultipleValues:Got new request:" + clientSocket)
if (clientSocket != null) {
try {
threadPool.execute (new Runnable {
@@ -506,14 +506,14 @@ private object BroadcastCS {
}
- class GuideMultipleRequests extends Thread {
+ class GuideMultipleRequests extends Thread with Logging {
override def run = {
var threadPool = Executors.newCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (BroadcastCS.masterListenPort)
// listenPort = BroadcastCS.masterListenPort
- println (System.currentTimeMillis + ": " + "GuideMultipleRequests" + serverSocket + " " + listenPort)
+ logInfo("GuideMultipleRequests" + serverSocket + " " + listenPort)
var keepAccepting = true
try {
@@ -524,12 +524,12 @@ private object BroadcastCS {
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
- println ("GuideMultipleRequests Timeout. Stopping listening...")
+ logInfo("GuideMultipleRequests Timeout. Stopping listening...")
keepAccepting = false
}
}
if (clientSocket != null) {
- println (System.currentTimeMillis + ": " + "Guide:Accepted new client connection:" + clientSocket)
+ logInfo("Guide:Accepted new client connection:" + clientSocket)
try {
threadPool.execute (new GuideSingleRequest (clientSocket))
} catch {
@@ -543,7 +543,8 @@ private object BroadcastCS {
}
}
- class GuideSingleRequest (val clientSocket: Socket) extends Runnable {
+ class GuideSingleRequest (val clientSocket: Socket)
+ extends Runnable with Logging {
private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
private val ois = new ObjectInputStream (clientSocket.getInputStream)
@@ -552,21 +553,21 @@ private object BroadcastCS {
def run = {
try {
- println (System.currentTimeMillis + ": " + "new GuideSingleRequest is running")
+ logInfo("new GuideSingleRequest is running")
// Connecting worker is sending in its hostAddress and listenPort it will
// be listening to. ReplicaID is 0 and other fields are invalid (-1)
var sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
// Select a suitable source and send it back to the worker
selectedSourceInfo = selectSuitableSource (sourceInfo)
- println (System.currentTimeMillis + ": " + "Sending selectedSourceInfo:" + selectedSourceInfo)
+ logInfo("Sending selectedSourceInfo:" + selectedSourceInfo)
oos.writeObject (selectedSourceInfo)
oos.flush
// Add this new (if it can finish) source to the PQ of sources
thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress,
sourceInfo.listenPort, totalBlocks, totalBytes, 0)
- println (System.currentTimeMillis + ": " + "Adding possible new source to pqOfSources: " + thisWorkerInfo)
+ logInfo("Adding possible new source to pqOfSources: " + thisWorkerInfo)
pqOfSources.synchronized {
pqOfSources.add (thisWorkerInfo)
}
@@ -642,14 +643,14 @@ private object BroadcastCS {
}
}
- class ServeMultipleRequests extends Thread {
+ class ServeMultipleRequests extends Thread with Logging {
override def run = {
var threadPool = Executors.newCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (0)
listenPort = serverSocket.getLocalPort
- println (System.currentTimeMillis + ": " + "ServeMultipleRequests" + serverSocket + " " + listenPort)
+ logInfo("ServeMultipleRequests" + serverSocket + " " + listenPort)
listenPortLock.synchronized {
listenPortLock.notifyAll
@@ -664,12 +665,12 @@ private object BroadcastCS {
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
- println ("ServeMultipleRequests Timeout. Stopping listening...")
+ logInfo("ServeMultipleRequests Timeout. Stopping listening...")
keepAccepting = false
}
}
if (clientSocket != null) {
- println (System.currentTimeMillis + ": " + "Serve:Accepted new client connection:" + clientSocket)
+ logInfo("Serve:Accepted new client connection:" + clientSocket)
try {
threadPool.execute (new ServeSingleRequest (clientSocket))
} catch {
@@ -683,23 +684,24 @@ private object BroadcastCS {
}
}
- class ServeSingleRequest (val clientSocket: Socket) extends Runnable {
+ class ServeSingleRequest (val clientSocket: Socket)
+ extends Runnable with Logging {
private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
private val ois = new ObjectInputStream (clientSocket.getInputStream)
def run = {
try {
- println (System.currentTimeMillis + ": " + "new ServeSingleRequest is running")
+ logInfo("new ServeSingleRequest is running")
sendObject
} catch {
// TODO: Need to add better exception handling here
// If something went wrong, e.g., the worker at the other end died etc.
// then close everything up
case e: Exception => {
- println (System.currentTimeMillis + ": " + "ServeSingleRequest had a " + e)
+ logInfo("ServeSingleRequest had a " + e)
}
} finally {
- println (System.currentTimeMillis + ": " + "ServeSingleRequest is closing streams and sockets")
+ logInfo("ServeSingleRequest is closing streams and sockets")
ois.close
oos.close
clientSocket.close
@@ -726,7 +728,7 @@ private object BroadcastCS {
} catch {
case e: Exception => { }
}
- println (System.currentTimeMillis + ": " + "Send block: " + i + " " + arrayOfBlocks(i))
+ logInfo("Send block: " + i + " " + arrayOfBlocks(i))
}
}
}
@@ -734,7 +736,7 @@ private object BroadcastCS {
}
}
-private object BroadcastCH {
+private object BroadcastCH extends Logging {
val values = new MapMaker ().softValues ().makeMap[UUID, Any]
private var initialized = false
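
The Broadcast.scala hunks above convert the chained streaming broadcast's diagnostics to log calls without changing its control flow: a worker connects to the master's guiding object, sends its own SourceInfo, receives a selected source back, and then pulls totalBlocks blocks from that source. An illustrative, simplified sketch of that worker-side handshake (not code from this patch; it reuses the SourceInfo case class shown above):

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import java.net.Socket

    def askGuideForSource(masterHost: String, masterPort: Int,
                          myInfo: SourceInfo): SourceInfo = {
      val socket = new Socket(masterHost, masterPort)
      try {
        // Open the output stream first and flush its header; opening the
        // ObjectInputStream first on both ends can deadlock in its constructor
        val oos = new ObjectOutputStream(socket.getOutputStream)
        oos.flush()
        val ois = new ObjectInputStream(socket.getInputStream)
        oos.writeObject(myInfo)     // tell the guide our host and listen port
        oos.flush()
        // The guide replies with the source this worker should fetch from
        ois.readObject.asInstanceOf[SourceInfo]
      } finally socket.close()
    }
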
diff --git a/src/scala/spark/ClosureCleaner.scala b/src/scala/spark/ClosureCleaner.scala
index 8037434c38..0e0b3954d4 100644
--- a/src/scala/spark/ClosureCleaner.scala
+++ b/src/scala/spark/ClosureCleaner.scala
@@ -8,7 +8,7 @@ import org.objectweb.asm.commons.EmptyVisitor
import org.objectweb.asm.Opcodes._
-object ClosureCleaner {
+object ClosureCleaner extends Logging {
private def getClassReader(cls: Class[_]): ClassReader = {
new ClassReader(cls.getResourceAsStream(
cls.getName.replaceFirst("^.*\\.", "") + ".class"))
@@ -72,13 +72,13 @@ object ClosureCleaner {
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
val value = field.get(obj)
- //println("1: Setting " + fieldName + " on " + cls + " to " + value);
+ //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
field.set(outer, value)
}
}
if (outer != null) {
- //println("2: Setting $outer on " + func.getClass + " to " + outer);
+ //logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
val field = func.getClass.getDeclaredField("$outer")
field.setAccessible(true)
field.set(func, outer)
@@ -101,7 +101,7 @@ object ClosureCleaner {
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
val obj = newCtor.newInstance().asInstanceOf[AnyRef];
if (outer != null) {
- //println("3: Setting $outer on " + cls + " to " + outer);
+ //logInfo("3: Setting $outer on " + cls + " to " + outer);
val field = cls.getDeclaredField("$outer")
field.setAccessible(true)
field.set(obj, outer)
diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala
index 58b20b41dc..be73aae541 100644
--- a/src/scala/spark/Executor.scala
+++ b/src/scala/spark/Executor.scala
@@ -5,10 +5,14 @@ import java.util.concurrent.{Executors, ExecutorService}
import mesos.{ExecutorArgs, ExecutorDriver, MesosExecutorDriver}
import mesos.{TaskDescription, TaskState, TaskStatus}
-object Executor {
+/**
+ * The Mesos executor for Spark.
+ */
+object Executor extends Logging {
def main(args: Array[String]) {
System.loadLibrary("mesos")
+ // Create a new Executor implementation that will run our tasks
val exec = new mesos.Executor() {
var classLoader: ClassLoader = null
var threadPool: ExecutorService = null
@@ -27,7 +31,7 @@ object Executor {
classLoader = this.getClass.getClassLoader
val classUri = System.getProperty("spark.repl.class.uri")
if (classUri != null) {
- println("Using REPL class URI: " + classUri)
+ logInfo("Using REPL class URI: " + classUri)
classLoader = new repl.ExecutorClassLoader(classUri, classLoader)
}
Thread.currentThread.setContextClassLoader(classLoader)
@@ -43,7 +47,7 @@ object Executor {
val arg = desc.getArg
threadPool.execute(new Runnable() {
def run() = {
- println("Running task ID " + taskId)
+ logInfo("Running task ID " + taskId)
try {
Accumulators.clear
val task = Utils.deserialize[Task[Any]](arg, classLoader)
@@ -52,12 +56,11 @@ object Executor {
val result = new TaskResult(value, accumUpdates)
d.sendStatusUpdate(new TaskStatus(
taskId, TaskState.TASK_FINISHED, Utils.serialize(result)))
- println("Finished task ID " + taskId)
+ logInfo("Finished task ID " + taskId)
} catch {
case e: Exception => {
// TODO: Handle errors in tasks less dramatically
- System.err.println("Exception in task ID " + taskId + ":")
- e.printStackTrace
+ logError("Exception in task ID " + taskId, e)
System.exit(1)
}
}
@@ -66,6 +69,7 @@ object Executor {
}
}
+ // Start it running and connect it to the slave
new MesosExecutorDriver(exec).run()
}
}
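
Executor.scala above deserializes each incoming task with the REPL-aware class loader and serializes the TaskResult before sending the status update. Utils itself is not shown in this diff; a hypothetical sketch of that round trip using plain Java serialization, with resolveClass overridden so task classes are looked up through the supplied loader (which is why the REPL class URI matters on executors):

    import java.io._

    object SerUtil {  // hypothetical stand-in for the Utils methods used above
      def serialize(o: AnyRef): Array[Byte] = {
        val bos = new ByteArrayOutputStream
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(o)
        oos.close()
        bos.toByteArray
      }

      def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
          // Resolve classes through the given loader so that classes defined
          // in the REPL (and served over HTTP) can be found on the executor
          override def resolveClass(desc: ObjectStreamClass) =
            Class.forName(desc.getName, false, loader)
        }
        ois.readObject().asInstanceOf[T]
      }
    }
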
diff --git a/src/scala/spark/LocalScheduler.scala b/src/scala/spark/LocalScheduler.scala
index a3e0fae065..20954a1224 100644
--- a/src/scala/spark/LocalScheduler.scala
+++ b/src/scala/spark/LocalScheduler.scala
@@ -4,8 +4,10 @@ import java.util.concurrent._
import scala.collection.mutable.Map
-// A simple Scheduler implementation that runs tasks locally in a thread pool.
-private class LocalScheduler(threads: Int) extends Scheduler {
+/**
+ * A simple Scheduler implementation that runs tasks locally in a thread pool.
+ */
+private class LocalScheduler(threads: Int) extends Scheduler with Logging {
var threadPool: ExecutorService =
Executors.newFixedThreadPool(threads, DaemonThreadFactory)
@@ -20,25 +22,24 @@ private class LocalScheduler(threads: Int) extends Scheduler {
for (i <- 0 until tasks.length) {
futures(i) = threadPool.submit(new Callable[TaskResult[T]]() {
def call(): TaskResult[T] = {
- println("Running task " + i)
+ logInfo("Running task " + i)
try {
// Serialize and deserialize the task so that accumulators are
// changed to thread-local ones; this adds a bit of unnecessary
// overhead but matches how the Nexus Executor works
Accumulators.clear
val bytes = Utils.serialize(tasks(i))
- println("Size of task " + i + " is " + bytes.size + " bytes")
+ logInfo("Size of task " + i + " is " + bytes.size + " bytes")
val task = Utils.deserialize[Task[T]](
bytes, currentThread.getContextClassLoader)
val value = task.run
val accumUpdates = Accumulators.values
- println("Finished task " + i)
+ logInfo("Finished task " + i)
new TaskResult[T](value, accumUpdates)
} catch {
case e: Exception => {
// TODO: Do something nicer here
- System.err.println("Exception in task " + i + ":")
- e.printStackTrace()
+ logError("Exception in task " + i, e)
System.exit(1)
null
}
@@ -58,7 +59,10 @@ private class LocalScheduler(threads: Int) extends Scheduler {
override def numCores() = threads
}
-// A ThreadFactory that creates daemon threads
+
+/**
+ * A ThreadFactory that creates daemon threads
+ */
private object DaemonThreadFactory extends ThreadFactory {
override def newThread(r: Runnable): Thread = {
val t = new Thread(r);
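
The DaemonThreadFactory hunk is cut off by the diff context. For completeness, a factory like this one plausibly finishes by marking the thread as a daemon before returning it (a sketch of the likely continuation, not the verbatim code):

    import java.util.concurrent.ThreadFactory

    private object DaemonThreadFactory extends ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r)
        t.setDaemon(true)  // daemon threads don't keep the JVM alive on shutdown
        t
      }
    }
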
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 42ad45a56f..38e8ab2cd1 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -23,7 +23,7 @@ import mesos._
// all the offers to the ParallelOperation and have it load-balance.
private class MesosScheduler(
master: String, frameworkName: String, execArg: Array[Byte])
-extends NScheduler with spark.Scheduler
+extends NScheduler with spark.Scheduler with Logging
{
// Lock used by runTasks to ensure only one thread can be in it
val runTasksMutex = new Object()
@@ -101,7 +101,7 @@ extends NScheduler with spark.Scheduler
}
override def registered(d: SchedulerDriver, frameworkId: String) {
- println("Registered as framework ID " + frameworkId)
+ logInfo("Registered as framework ID " + frameworkId)
registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
@@ -157,7 +157,7 @@ extends NScheduler with spark.Scheduler
activeOps(opId).statusUpdate(status)
}
case None =>
- println("TID " + status.getTaskId + " already finished")
+ logInfo("TID " + status.getTaskId + " already finished")
}
} catch {
@@ -177,8 +177,7 @@ extends NScheduler with spark.Scheduler
}
}
} else {
- val msg = "Mesos error: %s (error code: %d)".format(message, code)
- System.err.println(msg)
+ logError("Mesos error: %s (error code: %d)".format(message, code))
System.exit(1)
}
}
@@ -205,7 +204,7 @@ trait ParallelOperation {
class SimpleParallelOperation[T: ClassManifest](
sched: MesosScheduler, tasks: Array[Task[T]], val opId: Int)
-extends ParallelOperation
+extends ParallelOperation with Logging
{
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "1000").toLong
@@ -273,7 +272,7 @@ extends ParallelOperation
params.put("cpus", "" + desiredCpus)
params.put("mem", "" + desiredMem)
val serializedTask = Utils.serialize(tasks(i))
- println("... Serialized size: " + serializedTask.size)
+ //logInfo("Serialized size: " + serializedTask.size)
return Some(new TaskDescription(taskId, offer.getSlaveId,
"task_" + taskId, params, serializedTask))
}
@@ -298,36 +297,37 @@ extends ParallelOperation
def taskFinished(status: TaskStatus) {
val tid = status.getTaskId
- print("Finished opId " + opId + " TID " + tid)
- if (!finished(tidToIndex(tid))) {
+ logInfo("Finished opId " + opId + " TID " + tid)
+ val index = tidToIndex(tid)
+ if (!finished(index)) {
// Deserialize task result
val result = Utils.deserialize[TaskResult[T]](status.getData)
- results(tidToIndex(tid)) = result.value
+ results(index) = result.value
// Update accumulators
Accumulators.add(callingThread, result.accumUpdates)
// Mark finished and stop if we've finished all the tasks
- finished(tidToIndex(tid)) = true
+ finished(index) = true
// Remove TID -> opId mapping from sched
sched.taskIdToOpId.remove(tid)
tasksFinished += 1
-
- println(", finished " + tasksFinished + "/" + numTasks)
+ logInfo("Progress: " + tasksFinished + "/" + numTasks)
if (tasksFinished == numTasks)
setAllFinished()
} else {
- printf("... Task %s had already finished, so ignoring it\n", tidToIndex(tid))
+ logInfo("Task " + index + " has already finished, so ignoring it")
}
}
def taskLost(status: TaskStatus) {
val tid = status.getTaskId
- println("Lost opId " + opId + " TID " + tid)
- if (!finished(tidToIndex(tid))) {
- launched(tidToIndex(tid)) = false
+ logInfo("Lost opId " + opId + " TID " + tid)
+ val index = tidToIndex(tid)
+ if (!finished(index)) {
+ launched(index) = false
sched.taskIdToOpId.remove(tid)
tasksLaunched -= 1
} else {
- printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid))
+ logInfo("Task " + index + " has already finished, so ignoring it")
}
}
diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala
index f8d1ab7f57..4d0c8c6711 100644
--- a/src/scala/spark/RDD.scala
+++ b/src/scala/spark/RDD.scala
@@ -93,27 +93,27 @@ extends Task[U] {
class ForeachTask[T: ClassManifest](
rdd: RDD[T], split: Split, func: T => Unit)
-extends RDDTask[Unit, T](rdd, split) {
+extends RDDTask[Unit, T](rdd, split) with Logging {
override def run() {
- println("Processing " + split)
+ logInfo("Processing " + split)
rdd.iterator(split).foreach(func)
}
}
class CollectTask[T](
rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
-extends RDDTask[Array[T], T](rdd, split) {
+extends RDDTask[Array[T], T](rdd, split) with Logging {
override def run(): Array[T] = {
- println("Processing " + split)
+ logInfo("Processing " + split)
rdd.iterator(split).toArray(m)
}
}
class ReduceTask[T: ClassManifest](
rdd: RDD[T], split: Split, f: (T, T) => T)
-extends RDDTask[Option[T], T](rdd, split) {
+extends RDDTask[Option[T], T](rdd, split) with Logging {
override def run(): Option[T] = {
- println("Processing " + split)
+ logInfo("Processing " + split)
val iter = rdd.iterator(split)
if (iter.hasNext)
Some(iter.reduceLeft(f))
@@ -183,7 +183,7 @@ extends RDD[T](prev.sparkContext) {
class CachedRDD[T](
prev: RDD[T])(implicit m: ClassManifest[T])
-extends RDD[T](prev.sparkContext) {
+extends RDD[T](prev.sparkContext) with Logging {
val id = CachedRDD.newId()
@transient val cacheLocs = Map[Split, List[String]]()
@@ -217,7 +217,7 @@ extends RDD[T](prev.sparkContext) {
}
}
// If we got here, we have to load the split
- println("Loading and caching " + split)
+ logInfo("Loading and caching " + split)
val array = prev.iterator(split).toArray(m)
cache.put(key, array)
loading.synchronized {
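
The CachedRDD hunk above is the tail of a load-once pattern: a loading set guards each split so that concurrent tasks compute it at most once, with waiters parked on the set's monitor until notifyAll. A self-contained sketch of that pattern with hypothetical names (not the patch's code; error handling omitted, and a robust version would also clear the loading set if compute fails):

    import scala.collection.mutable.{HashMap, HashSet}

    object SplitCache {
      private val cache = new HashMap[String, AnyRef]
      private val loading = new HashSet[String]

      def getOrCompute[T <: AnyRef](key: String)(compute: => T): T = {
        loading.synchronized {
          while (loading.contains(key))
            loading.wait()                  // another task is computing this key
          cache.get(key) match {
            case Some(v) => return v.asInstanceOf[T]
            case None    => loading.add(key)  // we will compute it ourselves
          }
        }
        val value = compute                 // run the expensive load outside the lock
        loading.synchronized {
          cache.put(key, value)
          loading.remove(key)
          loading.notifyAll()               // wake any tasks waiting for this key
        }
        value
      }
    }
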
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index d5d4db4678..e00eda9aa2 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -6,7 +6,7 @@ import java.util.UUID
import scala.collection.mutable.ArrayBuffer
import scala.actors.Actor._
-class SparkContext(master: String, frameworkName: String) {
+class SparkContext(master: String, frameworkName: String) extends Logging {
Broadcast.initialize(true)
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int) =
@@ -56,10 +56,10 @@ class SparkContext(master: String, frameworkName: String) {
private[spark] def runTaskObjects[T: ClassManifest](tasks: Seq[Task[T]])
: Array[T] = {
- println("Running " + tasks.length + " tasks in parallel")
+ logInfo("Running " + tasks.length + " tasks in parallel")
val start = System.nanoTime
val result = scheduler.runTasks(tasks.toArray)
- println("Tasks finished in " + (System.nanoTime - start) / 1e9 + " s")
+ logInfo("Tasks finished in " + (System.nanoTime - start) / 1e9 + " s")
return result
}
diff --git a/src/scala/spark/repl/ClassServer.scala b/src/scala/spark/repl/ClassServer.scala
index 14ab2fe2a3..6a40d92765 100644
--- a/src/scala/spark/repl/ClassServer.scala
+++ b/src/scala/spark/repl/ClassServer.scala
@@ -8,6 +8,8 @@ import org.eclipse.jetty.server.handler.DefaultHandler
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.server.handler.ResourceHandler
+import spark.Logging
+
/**
* Exception type thrown by ClassServer when it is in the wrong state
@@ -21,7 +23,7 @@ class ServerStateException(message: String) extends Exception(message)
* class files created as the user types in lines of code. This is just a
* wrapper around a Jetty embedded HTTP server.
*/
-class ClassServer(classDir: File) {
+class ClassServer(classDir: File) extends Logging {
private var server: Server = null
private var port: Int = -1
@@ -37,6 +39,7 @@ class ClassServer(classDir: File) {
server.setHandler(handlerList)
server.start()
port = server.getConnectors()(0).getLocalPort()
+ logDebug("ClassServer started at " + uri)
}
}
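
ClassServer.scala wraps an embedded Jetty server (7.1.6, per the jars the run script adds) around the REPL's class output directory. A minimal sketch of that wrapper pattern, assuming the Jetty 7 API and mirroring the handler setup visible in the hunk above:

    import java.io.File
    import org.eclipse.jetty.server.{Handler, Server}
    import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, ResourceHandler}

    def startClassServer(classDir: File): Int = {
      val server = new Server(0)            // port 0: let Jetty pick a free port
      val resources = new ResourceHandler
      resources.setResourceBase(classDir.getAbsolutePath)
      val handlerList = new HandlerList
      handlerList.setHandlers(Array[Handler](resources, new DefaultHandler))
      server.setHandler(handlerList)
      server.start()
      server.getConnectors()(0).getLocalPort  // the port Jetty actually bound
    }
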
diff --git a/src/scala/spark/repl/ExecutorClassLoader.scala b/src/scala/spark/repl/ExecutorClassLoader.scala
index 0833b319f7..13d81ec1cf 100644
--- a/src/scala/spark/repl/ExecutorClassLoader.scala
+++ b/src/scala/spark/repl/ExecutorClassLoader.scala
@@ -31,7 +31,6 @@ extends ClassLoader(parent) {
override def findClass(name: String): Class[_] = {
try {
- //println("repl.ExecutorClassLoader resolving " + name)
val pathInDirectory = name.replace('.', '/') + ".class"
val inputStream = {
if (fileSystem != null)
@@ -92,7 +91,6 @@ extends ClassAdapter(cv) {
// This is the constructor, time to clean it; just output some new
// instructions to mv that create the object and set the static MODULE$
// field in the class to point to it, but do nothing otherwise.
- //println("Cleaning constructor of " + className)
mv.visitCode()
mv.visitVarInsn(ALOAD, 0) // load this
mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V")
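
On the executor side, ExecutorClassLoader (trimmed above) resolves REPL-generated classes from the URI stored in spark.repl.class.uri. A hypothetical, HTTP-only sketch of that idea: fetch the .class bytes from the class server and define the class. The default loadClass logic tries the parent loader first, so findClass is only reached for REPL-defined classes:

    import java.io.ByteArrayOutputStream
    import java.net.URL

    class HttpClassLoader(classUri: String, parent: ClassLoader)
    extends ClassLoader(parent) {
      override def findClass(name: String): Class[_] = {
        val path = name.replace('.', '/') + ".class"
        try {
          val in = new URL(classUri + "/" + path).openStream()
          try {
            // Read the class file fully into memory
            val out = new ByteArrayOutputStream
            val buf = new Array[Byte](4096)
            var n = in.read(buf)
            while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
            val bytes = out.toByteArray
            defineClass(name, bytes, 0, bytes.length)
          } finally in.close()
        } catch {
          case e: Exception => throw new ClassNotFoundException(name, e)
        }
      }
    }
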
diff --git a/src/scala/spark/repl/SparkInterpreter.scala b/src/scala/spark/repl/SparkInterpreter.scala
index 29a4420950..ae2e7e8a68 100644
--- a/src/scala/spark/repl/SparkInterpreter.scala
+++ b/src/scala/spark/repl/SparkInterpreter.scala
@@ -90,6 +90,7 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
+  /** Local directory to save .class files to */
val outputDir = {
val rootDir = new File(System.getProperty("spark.repl.classdir",
System.getProperty("java.io.tmpdir")))
@@ -108,22 +109,26 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
dir = null
} catch { case e: IOException => ; }
}
- if (SPARK_DEBUG_REPL)
+ if (SPARK_DEBUG_REPL) {
println("Output directory: " + dir)
+ }
dir
}
- /** directory to save .class files to */
+ /** Scala compiler virtual directory for outputDir */
//val virtualDirectory = new VirtualDirectory("(memory)", None)
val virtualDirectory = new PlainFile(outputDir)
/** Jetty server that will serve our classes to worker nodes */
val classServer = new ClassServer(outputDir)
- // Start the classServer and remember its URI in a spark system property */
+ // Start the classServer and store its URI in a spark system property
+ // (which will be passed to executors so that they can connect to it)
classServer.start()
- println("ClassServer started, URI = " + classServer.uri)
System.setProperty("spark.repl.class.uri", classServer.uri)
+ if (SPARK_DEBUG_REPL) {
+ println("ClassServer started, URI = " + classServer.uri)
+ }
/** reporter */
object reporter extends ConsoleReporter(settings, null, out) {