aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-05-07 14:35:22 -0700
committerAaron Davidson <aaron@databricks.com>2014-05-07 14:35:22 -0700
commit5200872243aa5906dc8a06772e61d75f19557aac (patch)
treee076fb482f587f864dfde6b711e28a8b1eb5896c
parentd00981a95185229fd1594d5c030a00f219fb1a14 (diff)
downloadspark-5200872243aa5906dc8a06772e61d75f19557aac.tar.gz
spark-5200872243aa5906dc8a06772e61d75f19557aac.tar.bz2
spark-5200872243aa5906dc8a06772e61d75f19557aac.zip
[SPARK-1688] Propagate PySpark worker stderr to driver
When at least one of the following conditions is true, PySpark cannot be loaded: 1. PYTHONPATH is not set 2. PYTHONPATH does not contain the python directory (or jar, in the case of YARN) 3. The jar does not contain pyspark files (YARN) 4. The jar does not contain py4j files (YARN) However, we currently throw the same random `java.io.EOFException` for all of the above cases, when trying to read from the python daemon's output. This message is super unhelpful. This PR includes the python stderr and the PYTHONPATH in the exception propagated to the driver. Now, the exception message looks something like: ``` Error from python worker: : No module named pyspark PYTHONPATH was: /path/to/spark/python:/path/to/some/jar java.io.EOFException <stack trace> ``` whereas before it was just ``` java.io.EOFException <stack trace> ``` Author: Andrew Or <andrewor14@gmail.com> Closes #603 from andrewor14/pyspark-exception and squashes the following commits: 10d65d3 [Andrew Or] Throwable -> Exception, worker -> daemon 862d1d7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception a5ed798 [Andrew Or] Use block string and interpolation instead of var (minor) cc09c45 [Andrew Or] Account for the fact that the python daemon may not have terminated yet 444f019 [Andrew Or] Use the new RedirectThread + include system PYTHONPATH aab00ae [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception 0cc2402 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception 783efe2 [Andrew Or] Make python daemon stderr indentation consistent 9524172 [Andrew Or] Avoid potential NPE / error stream contention + Move things around 29f9688 [Andrew Or] Add back original exception type e92d36b [Andrew Or] Include python worker stderr in the exception propagated to the driver 7c69360 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception cdbc185 [Andrew Or] Fix python attribute not found exception when PYTHONPATH is not set dcc0353 [Andrew Or] Check both python and system environment variables for PYTHONPATH 6c09c21 [Andrew Or] Validate PYTHONPATH and PySpark modules before starting python workers
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala136
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala37
4 files changed, 123 insertions, 101 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index cf69fa1d53..6d3e257c4d 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.api.python
-import java.io.File
+import java.io.{File, InputStream, IOException, OutputStream}
import scala.collection.mutable.ArrayBuffer
@@ -40,3 +40,28 @@ private[spark] object PythonUtils {
paths.filter(_ != "").mkString(File.pathSeparator)
}
}
+
+
+/**
+ * A utility class to redirect the child process's stdout or stderr.
+ */
+private[spark] class RedirectThread(
+ in: InputStream,
+ out: OutputStream,
+ name: String)
+ extends Thread(name) {
+
+ setDaemon(true)
+ override def run() {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ out.write(buf, 0, len)
+ out.flush()
+ len = in.read(buf)
+ }
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index b0bf4e052b..002f2acd94 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -17,15 +17,18 @@
package org.apache.spark.api.python
-import java.io.{DataInputStream, File, IOException, OutputStreamWriter}
+import java.io.{DataInputStream, InputStream, OutputStreamWriter}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
import scala.collection.JavaConversions._
import org.apache.spark._
+import org.apache.spark.util.Utils
private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
- extends Logging {
+ extends Logging {
+
+ import PythonWorkerFactory._
// Because forking processes from Java is expensive, we prefer to launch a single Python daemon
// (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
@@ -38,7 +41,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
var daemonPort: Int = 0
val pythonPath = PythonUtils.mergePythonPaths(
- PythonUtils.sparkPythonPath, envVars.getOrElse("PYTHONPATH", ""))
+ PythonUtils.sparkPythonPath,
+ envVars.getOrElse("PYTHONPATH", ""),
+ sys.env.getOrElse("PYTHONPATH", ""))
def create(): Socket = {
if (useDaemon) {
@@ -61,12 +66,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
new Socket(daemonHost, daemonPort)
} catch {
- case exc: SocketException => {
+ case exc: SocketException =>
logWarning("Python daemon unexpectedly quit, attempting to restart")
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
- }
case e: Throwable => throw e
}
}
@@ -87,39 +91,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()
- // Redirect the worker's stderr to ours
- new Thread("stderr reader for " + pythonExec) {
- setDaemon(true)
- override def run() {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- val in = worker.getErrorStream
- val buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- System.err.write(buf, 0, len)
- len = in.read(buf)
- }
- }
- }
- }.start()
-
- // Redirect worker's stdout to our stderr
- new Thread("stdout reader for " + pythonExec) {
- setDaemon(true)
- override def run() {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- val in = worker.getInputStream
- val buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- System.err.write(buf, 0, len)
- len = in.read(buf)
- }
- }
- }
- }.start()
+ // Redirect worker stdout and stderr
+ redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
// Tell the worker our port
val out = new OutputStreamWriter(worker.getOutputStream)
@@ -142,10 +115,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
null
}
- def stop() {
- stopDaemon()
- }
-
private def startDaemon() {
synchronized {
// Is it already running?
@@ -161,46 +130,38 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()
- // Redirect the stderr to ours
- new Thread("stderr reader for " + pythonExec) {
- setDaemon(true)
- override def run() {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- val in = daemon.getErrorStream
- val buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- System.err.write(buf, 0, len)
- len = in.read(buf)
- }
- }
- }
- }.start()
-
val in = new DataInputStream(daemon.getInputStream)
daemonPort = in.readInt()
- // Redirect further stdout output to our stderr
- new Thread("stdout reader for " + pythonExec) {
- setDaemon(true)
- override def run() {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- val buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- System.err.write(buf, 0, len)
- len = in.read(buf)
- }
- }
- }
- }.start()
+ // Redirect daemon stdout and stderr
+ redirectStreamsToStderr(in, daemon.getErrorStream)
+
} catch {
- case e: Throwable => {
+ case e: Exception =>
+
+ // If the daemon exists, wait for it to finish and get its stderr
+ val stderr = Option(daemon)
+ .flatMap { d => Utils.getStderr(d, PROCESS_WAIT_TIMEOUT_MS) }
+ .getOrElse("")
+
stopDaemon()
- throw e
- }
+
+ if (stderr != "") {
+ val formattedStderr = stderr.replace("\n", "\n ")
+ val errorMessage = s"""
+ |Error from python worker:
+ | $formattedStderr
+ |PYTHONPATH was:
+ | $pythonPath
+ |$e"""
+
+ // Append error message from python daemon, but keep original stack trace
+ val wrappedException = new SparkException(errorMessage.stripMargin)
+ wrappedException.setStackTrace(e.getStackTrace)
+ throw wrappedException
+ } else {
+ throw e
+ }
}
// Important: don't close daemon's stdin (daemon.getOutputStream) so it can correctly
@@ -208,6 +169,19 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
}
+ /**
+ * Redirect the given streams to our stderr in separate threads.
+ */
+ private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream) {
+ try {
+ new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
+ new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
+ } catch {
+ case e: Exception =>
+ logError("Exception in redirecting streams", e)
+ }
+ }
+
private def stopDaemon() {
synchronized {
// Request shutdown of existing daemon by sending SIGTERM
@@ -219,4 +193,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
daemonPort = 0
}
}
+
+ def stop() {
+ stopDaemon()
+ }
+}
+
+private object PythonWorkerFactory {
+ val PROCESS_WAIT_TIMEOUT_MS = 10000
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index f2e7c7a508..e20d4486c8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -17,13 +17,10 @@
package org.apache.spark.deploy
-import java.io.{IOException, File, InputStream, OutputStream}
-
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
-import org.apache.spark.SparkContext
-import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.api.python.{PythonUtils, RedirectThread}
/**
* A main class used by spark-submit to launch Python applications. It executes python as a
@@ -62,23 +59,4 @@ object PythonRunner {
System.exit(process.waitFor())
}
-
- /**
- * A utility class to redirect the child process's stdout or stderr
- */
- class RedirectThread(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
- setDaemon(true)
- override def run() {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- val buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- out.write(buf, 0, len)
- out.flush()
- len = in.read(buf)
- }
- }
- }
- }
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 202bd46956..3f0ed61c5b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1088,4 +1088,41 @@ private[spark] object Utils extends Logging {
def stripDirectory(path: String): String = {
path.split(File.separator).last
}
+
+ /**
+ * Wait for a process to terminate for at most the specified duration.
+ * Return whether the process actually terminated after the given timeout.
+ */
+ def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
+ var terminated = false
+ val startTime = System.currentTimeMillis
+ while (!terminated) {
+ try {
+ process.exitValue
+ terminated = true
+ } catch {
+ case e: IllegalThreadStateException =>
+ // Process not terminated yet
+ if (System.currentTimeMillis - startTime > timeoutMs) {
+ return false
+ }
+ Thread.sleep(100)
+ }
+ }
+ true
+ }
+
+ /**
+ * Return the stderr of a process after waiting for the process to terminate.
+ * If the process does not terminate within the specified timeout, return None.
+ */
+ def getStderr(process: Process, timeoutMs: Long): Option[String] = {
+ val terminated = Utils.waitForProcess(process, timeoutMs)
+ if (terminated) {
+ Some(Source.fromInputStream(process.getErrorStream).getLines().mkString("\n"))
+ } else {
+ None
+ }
+ }
+
}