diff options
author | Jey Kottalam <jey@cs.berkeley.edu> | 2013-05-13 08:53:47 -0700 |
---|---|---|
committer | Jey Kottalam <jey@cs.berkeley.edu> | 2013-06-21 12:14:16 -0400 |
commit | edb18ca928c988a713b9228bb74af1737f2b614b (patch) | |
tree | 3d767d112e4b97809a20fb1718a6fbedcda82fca | |
parent | 62c4781400dd908c2fccdcebf0dc816ff0cb8ed4 (diff) | |
download | spark-edb18ca928c988a713b9228bb74af1737f2b614b.tar.gz spark-edb18ca928c988a713b9228bb74af1737f2b614b.tar.bz2 spark-edb18ca928c988a713b9228bb74af1737f2b614b.zip |
Rename PythonWorker to PythonWorkerFactory
-rw-r--r-- | core/src/main/scala/spark/SparkEnv.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonWorkerFactory.scala (renamed from core/src/main/scala/spark/api/python/PythonWorker.scala) | 22 |
3 files changed, 17 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 5b55d45212..0a23c45658 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -12,7 +12,7 @@ import spark.storage.BlockManagerMaster import spark.network.ConnectionManager import spark.serializer.{Serializer, SerializerManager} import spark.util.AkkaUtils -import spark.api.python.PythonWorker +import spark.api.python.PythonWorkerFactory /** @@ -41,7 +41,7 @@ class SparkEnv ( // If executorId is NOT found, return defaultHostPort var executorIdToHostPort: Option[(String, String) => String]) { - private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorker]() + private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() def stop() { pythonWorkers.foreach { case(key, worker) => worker.stop() } @@ -57,9 +57,9 @@ class SparkEnv ( actorSystem.awaitTermination() } - def getPythonWorker(pythonExec: String, envVars: Map[String, String]): PythonWorker = { + def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = { synchronized { - pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorker(pythonExec, envVars)) + pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorkerFactory(pythonExec, envVars)).create } } diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index e5acc54c01..3c48071b3f 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -38,8 +38,8 @@ private[spark] class PythonRDD[T: ClassManifest]( override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val startTime = System.currentTimeMillis - val worker = SparkEnv.get.getPythonWorker(pythonExec, envVars.toMap).create val env = SparkEnv.get + val worker = env.createPythonWorker(pythonExec, envVars.toMap) // Start a thread to feed the process input from our parent's iterator new Thread("stdin writer for " + pythonExec) { diff --git a/core/src/main/scala/spark/api/python/PythonWorker.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala index 74c8c6d37a..ebbd226b3e 100644 --- a/core/src/main/scala/spark/api/python/PythonWorker.scala +++ b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala @@ -1,13 +1,13 @@ package spark.api.python -import java.io.DataInputStream +import java.io.{DataInputStream, IOException} import java.net.{Socket, SocketException, InetAddress} import scala.collection.JavaConversions._ import spark._ -private[spark] class PythonWorker(pythonExec: String, envVars: Map[String, String]) +private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String]) extends Logging { var daemon: Process = null val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1)) @@ -56,14 +56,16 @@ private[spark] class PythonWorker(pythonExec: String, envVars: Map[String, Strin // Redirect the stderr to ours new Thread("stderr reader for " + pythonExec) { override def run() { - // FIXME HACK: We copy the stream on the level of bytes to - // attempt to dodge encoding problems. - val in = daemon.getErrorStream - var buf = new Array[Byte](1024) - var len = in.read(buf) - while (len != -1) { - System.err.write(buf, 0, len) - len = in.read(buf) + scala.util.control.Exception.ignoring(classOf[IOException]) { + // FIXME HACK: We copy the stream on the level of bytes to + // attempt to dodge encoding problems. + val in = daemon.getErrorStream + var buf = new Array[Byte](1024) + var len = in.read(buf) + while (len != -1) { + System.err.write(buf, 0, len) + len = in.read(buf) + } } } }.start() |