| | | |
|---|---|---|
| author | Jey Kottalam <jey@cs.berkeley.edu> | 2013-05-06 16:34:30 -0700 |
| committer | Jey Kottalam <jey@cs.berkeley.edu> | 2013-06-21 12:14:16 -0400 |
| commit | c79a6078c34c207ad9f9910252f5849424828bf1 (patch) | |
| tree | 608039ead195b19fe20a49128e723b885d2f65c2 | /core/src |
| parent | 40afe0d2a5562738ef2ff37ed1d448ae2d0cc927 (diff) | |
Prefork Python worker processes
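
This commit replaces the per-task launch of `pyspark/worker.py` with a long-lived daemon per executor: the daemon is started once, announces the port it listens on via its stdout, and each task then connects to it over a loopback socket. The sketch below illustrates that handshake in isolation; it is a minimal Scala sketch, and the object name, method names, and script path handling are illustrative rather than code from this patch.

```scala
import java.io.DataInputStream
import java.net.{InetAddress, Socket}

import scala.collection.JavaConverters._

object DaemonHandshakeSketch {
  // Launch the daemon once; its first output is the port it listens on,
  // written to stdout as a single 4-byte big-endian int.
  def startDaemon(pythonExec: String, daemonScript: String,
                  envVars: Map[String, String]): (Process, Int) = {
    val pb = new ProcessBuilder(pythonExec, daemonScript)
    pb.environment().putAll(envVars.asJava)
    val daemon = pb.start()
    val port = new DataInputStream(daemon.getInputStream).readInt()
    (daemon, port)
  }

  // Each task then opens a cheap loopback socket instead of forking a new interpreter.
  def connect(port: Int): Socket =
    new Socket(InetAddress.getByAddress(Array[Byte](127, 0, 0, 1)), port)
}
```

Paying the interpreter startup cost once per executor, rather than once per task, is the point of the prefork; the per-task cost drops to a local socket connect.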
Diffstat (limited to 'core/src')
| | file | lines changed |
|---|---|---|
| -rw-r--r-- | core/src/main/scala/spark/SparkEnv.scala | 11 |
| -rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 66 |
| -rw-r--r-- | core/src/main/scala/spark/api/python/PythonWorker.scala | 89 |
3 files changed, 125 insertions, 41 deletions
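
In the first file of the diff below, `SparkEnv` gains a map that caches one `PythonWorker` per `(pythonExec, envVars)` pair, so concurrent tasks in the same JVM share a single preforked daemon. A minimal stand-alone sketch of that caching pattern, with a hypothetical `Worker` placeholder standing in for the real `PythonWorker`:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the daemon handle this commit adds as PythonWorker.
class Worker(val pythonExec: String, val envVars: Map[String, String])

class WorkerRegistry {
  // One handle per (interpreter, environment) pair, shared by all tasks on this JVM.
  private val workers = mutable.HashMap[(String, Map[String, String]), Worker]()

  // Synchronized so concurrent tasks racing on the same key create only one daemon.
  def get(pythonExec: String, envVars: Map[String, String]): Worker = synchronized {
    workers.getOrElseUpdate((pythonExec, envVars), new Worker(pythonExec, envVars))
  }
}
```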
```diff
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index be1a04d619..5691e24c32 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -1,5 +1,8 @@
 package spark
 
+import collection.mutable
+import serializer.Serializer
+
 import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
 import akka.remote.RemoteActorRefProvider
 
@@ -9,6 +12,7 @@ import spark.storage.BlockManagerMaster
 import spark.network.ConnectionManager
 import spark.serializer.{Serializer, SerializerManager}
 import spark.util.AkkaUtils
+import spark.api.python.PythonWorker
 
 
 /**
@@ -37,6 +41,8 @@ class SparkEnv (
     // If executorId is NOT found, return defaultHostPort
     var executorIdToHostPort: Option[(String, String) => String]) {
 
+  private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorker]()
+
   def stop() {
     httpFileServer.stop()
     mapOutputTracker.stop()
@@ -50,6 +56,11 @@ class SparkEnv (
     actorSystem.awaitTermination()
   }
 
+  def getPythonWorker(pythonExec: String, envVars: Map[String, String]): PythonWorker = {
+    synchronized {
+      pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorker(pythonExec, envVars))
+    }
+  }
 
   def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = {
     val env = SparkEnv.get
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index e9978d713f..e5acc54c01 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -2,10 +2,9 @@ package spark.api.python
 
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Collections}
+import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
-import scala.io.Source
 
 import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import spark.broadcast.Broadcast
@@ -16,7 +15,7 @@ import spark.rdd.PipedRDD
 private[spark] class PythonRDD[T: ClassManifest](
     parent: RDD[T],
     command: Seq[String],
-    envVars: java.util.Map[String, String],
+    envVars: JMap[String, String],
     preservePartitoning: Boolean,
     pythonExec: String,
     broadcastVars: JList[Broadcast[Array[Byte]]],
@@ -25,7 +24,7 @@ private[spark] class PythonRDD[T: ClassManifest](
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
   // using a standard StringTokenizer (i.e. by spaces)
-  def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
+  def this(parent: RDD[T], command: String, envVars: JMap[String, String],
       preservePartitoning: Boolean, pythonExec: String,
       broadcastVars: JList[Broadcast[Array[Byte]]],
       accumulator: Accumulator[JList[Array[Byte]]]) =
@@ -36,36 +35,18 @@ private[spark] class PythonRDD[T: ClassManifest](
 
   override val partitioner = if (preservePartitoning) parent.partitioner else None
 
-  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
-    val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
-
-    val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/python/pyspark/worker.py"))
-    // Add the environmental variables to the process.
-    val currentEnvVars = pb.environment()
-
-    for ((variable, value) <- envVars) {
-      currentEnvVars.put(variable, value)
-    }
+  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
-    val proc = pb.start()
+    val worker = SparkEnv.get.getPythonWorker(pythonExec, envVars.toMap).create
     val env = SparkEnv.get
 
-    // Start a thread to print the process's stderr to ours
-    new Thread("stderr reader for " + pythonExec) {
-      override def run() {
-        for (line <- Source.fromInputStream(proc.getErrorStream).getLines) {
-          System.err.println(line)
-        }
-      }
-    }.start()
-
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for " + pythonExec) {
       override def run() {
         SparkEnv.set(env)
-        val out = new PrintWriter(proc.getOutputStream)
-        val dOut = new DataOutputStream(proc.getOutputStream)
+        val out = new PrintWriter(worker.getOutputStream)
+        val dOut = new DataOutputStream(worker.getOutputStream)
         // Partition index
         dOut.writeInt(split.index)
         // sparkFilesDir
@@ -89,16 +70,21 @@ private[spark] class PythonRDD[T: ClassManifest](
         }
         dOut.flush()
         out.flush()
-        proc.getOutputStream.close()
+        worker.shutdownOutput()
       }
     }.start()
 
     // Return an iterator that read lines from the process's stdout
-    val stream = new DataInputStream(proc.getInputStream)
+    val stream = new DataInputStream(worker.getInputStream)
     return new Iterator[Array[Byte]] {
       def next(): Array[Byte] = {
        val obj = _nextObj
-        _nextObj = read()
+        if (hasNext) {
+          // FIXME: can deadlock if worker is waiting for us to
+          // respond to current message (currently irrelevant because
+          // output is shutdown before we read any input)
+          _nextObj = read()
+        }
        obj
       }
 
@@ -110,7 +96,7 @@ private[spark] class PythonRDD[T: ClassManifest](
            stream.readFully(obj)
            obj
          case -3 =>
-           // Timing data from child
+           // Timing data from worker
            val bootTime = stream.readLong()
            val initTime = stream.readLong()
            val finishTime = stream.readLong()
@@ -127,23 +113,21 @@ private[spark] class PythonRDD[T: ClassManifest](
            stream.readFully(obj)
            throw new PythonException(new String(obj))
          case -1 =>
-           // We've finished the data section of the output, but we can still read some
-           // accumulator updates; let's do that, breaking when we get EOFException
-           while (true) {
-             val len2 = stream.readInt()
+           // We've finished the data section of the output, but we can still
+           // read some accumulator updates; let's do that, breaking when we
+           // get a negative length record.
+           var len2 = stream.readInt
+           while (len2 >= 0) {
              val update = new Array[Byte](len2)
              stream.readFully(update)
              accumulator += Collections.singletonList(update)
+             len2 = stream.readInt
            }
            new Array[Byte](0)
        }
      } catch {
        case eof: EOFException => {
-         val exitStatus = proc.waitFor()
-         if (exitStatus != 0) {
-           throw new Exception("Subprocess exited with status " + exitStatus)
-         }
-         new Array[Byte](0)
+         throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
        }
        case e => throw e
      }
@@ -171,7 +155,7 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   override def compute(split: Partition, context: TaskContext) =
     prev.iterator(split, context).grouped(2).map {
       case Seq(a, b) => (a, b)
-      case x => throw new Exception("PairwiseRDD: unexpected value: " + x)
+      case x => throw new SparkException("PairwiseRDD: unexpected value: " + x)
     }
   val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
 }
@@ -227,7 +211,7 @@ private[spark] object PythonRDD {
       dOut.write(s)
       dOut.writeByte(Pickle.STOP)
     } else {
-      throw new Exception("Unexpected RDD type")
+      throw new SparkException("Unexpected RDD type")
     }
   }
 
diff --git a/core/src/main/scala/spark/api/python/PythonWorker.scala b/core/src/main/scala/spark/api/python/PythonWorker.scala
new file mode 100644
index 0000000000..8ee3c6884f
--- /dev/null
+++ b/core/src/main/scala/spark/api/python/PythonWorker.scala
@@ -0,0 +1,89 @@
+package spark.api.python
+
+import java.io.DataInputStream
+import java.net.{Socket, SocketException, InetAddress}
+
+import scala.collection.JavaConversions._
+
+import spark._
+
+private[spark] class PythonWorker(pythonExec: String, envVars: Map[String, String])
+    extends Logging {
+  var daemon: Process = null
+  val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
+  var daemonPort: Int = 0
+
+  def create(): Socket = {
+    synchronized {
+      // Start the daemon if it hasn't been started
+      startDaemon
+
+      // Attempt to connect, restart and retry once if it fails
+      try {
+        new Socket(daemonHost, daemonPort)
+      } catch {
+        case exc: SocketException => {
+          logWarning("Python daemon unexpectedly quit, attempting to restart")
+          stopDaemon
+          startDaemon
+          new Socket(daemonHost, daemonPort)
+        }
+        case e => throw e
+      }
+    }
+  }
+
+  private def startDaemon() {
+    synchronized {
+      // Is it already running?
+      if (daemon != null) {
+        return
+      }
+
+      try {
+        // Create and start the daemon
+        val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
+        val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+        val workerEnv = pb.environment()
+        workerEnv.putAll(envVars)
+        daemon = pb.start()
+        daemonPort = new DataInputStream(daemon.getInputStream).readInt
+
+        // Redirect the stderr to ours
+        new Thread("stderr reader for " + pythonExec) {
+          override def run() {
+            // FIXME HACK: We copy the stream on the level of bytes to
+            // attempt to dodge encoding problems.
+            val in = daemon.getErrorStream
+            var buf = new Array[Byte](1024)
+            var len = in.read(buf)
+            while (len != -1) {
+              System.err.write(buf, 0, len)
+              len = in.read(buf)
+            }
+          }
+        }.start()
+      } catch {
+        case e => {
+          stopDaemon
+          throw e
+        }
+      }
+
+      // Important: don't close daemon's stdin (daemon.getOutputStream) so it can correctly
+      // detect our disappearance.
+    }
+  }
+
+  private def stopDaemon() {
+    synchronized {
+      // Request shutdown of existing daemon by sending SIGTERM
+      if (daemon != null) {
+        daemon.destroy
+      }
+
+      daemon = null
+      daemonPort = 0
+    }
+  }
+}
```
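
For context on the reader loop in `PythonRDD` above: each record from the worker is framed by a 4-byte length, and negative codes carry control information (timing data for `-3`, a forwarded Python exception, and `-1` to end the data section, after which accumulator updates follow until a negative length appears). The sketch below exercises just that length-prefixed framing; the object and helper names are invented for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object FramedRecordsSketch {
  // Read records framed as [int length][length bytes] until a negative length ends the section.
  def readRecords(in: DataInputStream): Seq[Array[Byte]] = {
    val records = Seq.newBuilder[Array[Byte]]
    var length = in.readInt()
    while (length >= 0) {
      val buf = new Array[Byte](length)
      in.readFully(buf)
      records += buf
      length = in.readInt()
    }
    records.result()
  }

  def main(args: Array[String]): Unit = {
    // Build a small in-memory stream to exercise the reader.
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    for (payload <- Seq("update-1", "update-2")) {
      out.writeInt(payload.length)
      out.write(payload.getBytes("UTF-8"))
    }
    out.writeInt(-1) // end-of-section marker
    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    readRecords(in).foreach(r => println(new String(r, "UTF-8")))
  }
}
```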