aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorJey Kottalam <jey@cs.berkeley.edu>2013-05-13 08:53:47 -0700
committerJey Kottalam <jey@cs.berkeley.edu>2013-06-21 12:14:16 -0400
commitedb18ca928c988a713b9228bb74af1737f2b614b (patch)
tree3d767d112e4b97809a20fb1718a6fbedcda82fca /core/src
parent62c4781400dd908c2fccdcebf0dc816ff0cb8ed4 (diff)
downloadspark-edb18ca928c988a713b9228bb74af1737f2b614b.tar.gz
spark-edb18ca928c988a713b9228bb74af1737f2b614b.tar.bz2
spark-edb18ca928c988a713b9228bb74af1737f2b614b.zip
Rename PythonWorker to PythonWorkerFactory
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala8
-rw-r--r--core/src/main/scala/spark/api/python/PythonRDD.scala2
-rw-r--r--core/src/main/scala/spark/api/python/PythonWorkerFactory.scala (renamed from core/src/main/scala/spark/api/python/PythonWorker.scala)22
3 files changed, 17 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 5b55d45212..0a23c45658 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -12,7 +12,7 @@ import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
import spark.serializer.{Serializer, SerializerManager}
import spark.util.AkkaUtils
-import spark.api.python.PythonWorker
+import spark.api.python.PythonWorkerFactory
/**
@@ -41,7 +41,7 @@ class SparkEnv (
// If executorId is NOT found, return defaultHostPort
var executorIdToHostPort: Option[(String, String) => String]) {
- private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorker]()
+ private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
@@ -57,9 +57,9 @@ class SparkEnv (
actorSystem.awaitTermination()
}
- def getPythonWorker(pythonExec: String, envVars: Map[String, String]): PythonWorker = {
+ def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
- pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorker(pythonExec, envVars))
+ pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorkerFactory(pythonExec, envVars)).create
}
}
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index e5acc54c01..3c48071b3f 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -38,8 +38,8 @@ private[spark] class PythonRDD[T: ClassManifest](
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
- val worker = SparkEnv.get.getPythonWorker(pythonExec, envVars.toMap).create
val env = SparkEnv.get
+ val worker = env.createPythonWorker(pythonExec, envVars.toMap)
// Start a thread to feed the process input from our parent's iterator
new Thread("stdin writer for " + pythonExec) {
diff --git a/core/src/main/scala/spark/api/python/PythonWorker.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
index 74c8c6d37a..ebbd226b3e 100644
--- a/core/src/main/scala/spark/api/python/PythonWorker.scala
+++ b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
@@ -1,13 +1,13 @@
package spark.api.python
-import java.io.DataInputStream
+import java.io.{DataInputStream, IOException}
import java.net.{Socket, SocketException, InetAddress}
import scala.collection.JavaConversions._
import spark._
-private[spark] class PythonWorker(pythonExec: String, envVars: Map[String, String])
+private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
extends Logging {
var daemon: Process = null
val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
@@ -56,14 +56,16 @@ private[spark] class PythonWorker(pythonExec: String, envVars: Map[String, Strin
// Redirect the stderr to ours
new Thread("stderr reader for " + pythonExec) {
override def run() {
- // FIXME HACK: We copy the stream on the level of bytes to
- // attempt to dodge encoding problems.
- val in = daemon.getErrorStream
- var buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- System.err.write(buf, 0, len)
- len = in.read(buf)
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ // FIXME HACK: We copy the stream on the level of bytes to
+ // attempt to dodge encoding problems.
+ val in = daemon.getErrorStream
+ var buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ System.err.write(buf, 0, len)
+ len = in.read(buf)
+ }
}
}
}.start()