-rw-r--r--  bagel/pom.xml                                          11
-rw-r--r--  core/pom.xml                                           11
-rw-r--r--  core/src/main/scala/spark/api/python/PythonRDD.scala   39
-rw-r--r--  examples/pom.xml                                       11
-rw-r--r--  pom.xml                                                11
-rw-r--r--  python/pyspark/tests.py                                 9
-rw-r--r--  python/pyspark/worker.py                               15
-rw-r--r--  repl-bin/pom.xml                                       11
-rw-r--r--  repl/pom.xml                                           11
-rw-r--r--  streaming/pom.xml                                      11
10 files changed, 44 insertions(+), 96 deletions(-)
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 5f58347204..a8256a6e8b 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -45,11 +45,6 @@
   <profiles>
     <profile>
       <id>hadoop1</id>
-      <activation>
-        <property>
-          <name>!hadoopVersion</name>
-        </property>
-      </activation>
       <dependencies>
         <dependency>
           <groupId>org.spark-project</groupId>
@@ -77,12 +72,6 @@
     </profile>
     <profile>
       <id>hadoop2</id>
-      <activation>
-        <property>
-          <name>hadoopVersion</name>
-          <value>2</value>
-        </property>
-      </activation>
       <dependencies>
         <dependency>
           <groupId>org.spark-project</groupId>
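Note: with the <activation> blocks gone, the hadoop1 profile is no longer selected by default (it used to activate whenever the hadoopVersion property was unset, with -DhadoopVersion=2 switching to hadoop2). Builds now have to pick a profile explicitly, e.g. `mvn -Phadoop1 package` or `mvn -Phadoop2 package` (standard Maven profile selection; the exact goal shown is illustrative). The same activation removal repeats in every module's pom.xml below.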
diff --git a/core/pom.xml b/core/pom.xml
index 862d3ec37a..873e8a1d0f 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -163,11 +163,6 @@
   <profiles>
     <profile>
       <id>hadoop1</id>
-      <activation>
-        <property>
-          <name>!hadoopVersion</name>
-        </property>
-      </activation>
       <dependencies>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
@@ -220,12 +215,6 @@
     </profile>
     <profile>
       <id>hadoop2</id>
-      <activation>
-        <property>
-          <name>hadoopVersion</name>
-          <value>2</value>
-        </property>
-      </activation>
       <dependencies>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index f43a152ca7..39758e94f4 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -103,21 +103,27 @@ private[spark] class PythonRDD[T: ClassManifest](
 
       private def read(): Array[Byte] = {
         try {
-          val length = stream.readInt()
-          if (length != -1) {
-            val obj = new Array[Byte](length)
-            stream.readFully(obj)
-            obj
-          } else {
-            // We've finished the data section of the output, but we can still read some
-            // accumulator updates; let's do that, breaking when we get EOFException
-            while (true) {
-              val len2 = stream.readInt()
-              val update = new Array[Byte](len2)
-              stream.readFully(update)
-              accumulator += Collections.singletonList(update)
-            }
-            new Array[Byte](0)
+          stream.readInt() match {
+            case length if length > 0 =>
+              val obj = new Array[Byte](length)
+              stream.readFully(obj)
+              obj
+            case -2 =>
+              // Signals that an exception has been thrown in python
+              val exLength = stream.readInt()
+              val obj = new Array[Byte](exLength)
+              stream.readFully(obj)
+              throw new PythonException(new String(obj))
+            case -1 =>
+              // We've finished the data section of the output, but we can still read some
+              // accumulator updates; let's do that, breaking when we get EOFException
+              while (true) {
+                val len2 = stream.readInt()
+                val update = new Array[Byte](len2)
+                stream.readFully(update)
+                accumulator += Collections.singletonList(update)
+              }
+              new Array[Byte](0)
           }
         } catch {
           case eof: EOFException => {
@@ -140,6 +146,9 @@ private[spark] class PythonRDD[T: ClassManifest](
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 }
 
+/** Thrown for exceptions in user Python code. */
+private class PythonException(msg: String) extends Exception(msg)
+
 /**
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
  * This is used by PySpark's shuffle operations.
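Note: the match above reads a simple length-prefixed protocol from the Python worker: a positive int frames a pickled record, -2 frames a serialized Python traceback (now rethrown as PythonException), and -1 ends the data section before accumulator updates. A minimal Python sketch of the same reader logic, for illustration only (the helper names are hypothetical; ints are big-endian to match java.io.DataInputStream):

```python
import struct

def read_int(stream):
    # The JVM side writes big-endian 32-bit ints via DataOutputStream
    buf = stream.read(4)
    if len(buf) < 4:
        raise EOFError
    return struct.unpack("!i", buf)[0]

def read_frames(stream):
    """Yield data records until the -1 marker; raise on the -2 marker."""
    while True:
        length = read_int(stream)
        if length == -2:
            # An exception was raised in user code: the next frame
            # carries the formatted traceback.
            raise RuntimeError(stream.read(read_int(stream)).decode("utf-8", "replace"))
        if length == -1:
            # End of the data section; accumulator updates follow,
            # framed the same way, until EOF.
            return
        yield stream.read(length)
```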
diff --git a/examples/pom.xml b/examples/pom.xml
index 4d43103475..f43af670c6 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -50,11 +50,6 @@
   <profiles>
     <profile>
       <id>hadoop1</id>
-      <activation>
-        <property>
-          <name>!hadoopVersion</name>
-        </property>
-      </activation>
       <dependencies>
         <dependency>
           <groupId>org.spark-project</groupId>
@@ -88,12 +83,6 @@
     </profile>
     <profile>
       <id>hadoop2</id>
-      <activation>
-        <property>
-          <name>hadoopVersion</name>
-          <value>2</value>
-        </property>
-      </activation>
       <dependencies>
         <dependency>
           <groupId>org.spark-project</groupId>
diff --git a/pom.xml b/pom.xml
index 3ea989a082..c6b9012dc6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -499,11 +499,6 @@
   <profiles>
     <profile>
       <id>hadoop1</id>
-      <activation>
-        <property>
-          <name>!hadoopVersion</name>
-        </property>
-      </activation>
       <properties>
         <hadoop.major.version>1</hadoop.major.version>
       </properties>
@@ -521,12 +516,6 @@
     </profile>
     <profile>
       <id>hadoop2</id>
-      <activation>
-        <property>
-          <name>hadoopVersion</name>
-          <value>2</value>
-        </property>
-      </activation>
       <properties>
         <hadoop.major.version>2</hadoop.major.version>
       </properties>
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index df7235756d..52297d44e6 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -108,5 +108,14 @@ class TestAddFile(PySparkTestCase):
         self.assertEqual("Hello World!", UserClass().hello())
 
 
+class TestIO(PySparkTestCase):
+
+    def test_stdout_redirection(self):
+        import subprocess
+        def func(x):
+            subprocess.check_call('ls', shell=True)
+        self.sc.parallelize([1]).foreach(func)
+
+
 if __name__ == "__main__":
     unittest.main()
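Note: this test depends on the worker change in the next file. subprocess.check_call('ls', shell=True) spawns a child process that writes to file descriptor 1, so the job can only succeed if that output no longer lands in the framed result stream the JVM is parsing.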
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index d33d6dd15f..812e7a9da5 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1,7 +1,9 @@
 """
 Worker that receives input from Piped RDD.
 """
+import os
 import sys
+import traceback
 from base64 import standard_b64decode
 # CloudPickler needs to be imported so that depicklers are registered using the
 # copy_reg module.
@@ -14,8 +16,8 @@ from pyspark.serializers import write_with_length, read_with_length, write_int,
 
 
 # Redirect stdout to stderr so that users must return values from functions.
-old_stdout = sys.stdout
-sys.stdout = sys.stderr
+old_stdout = os.fdopen(os.dup(1), 'w')
+os.dup2(2, 1)
 
 
 def load_obj():
@@ -40,8 +42,13 @@ def main():
     else:
         dumps = dump_pickle
     iterator = read_from_pickle_file(sys.stdin)
-    for obj in func(split_index, iterator):
-        write_with_length(dumps(obj), old_stdout)
+    try:
+        for obj in func(split_index, iterator):
+            write_with_length(dumps(obj), old_stdout)
+    except Exception as e:
+        write_int(-2, old_stdout)
+        write_with_length(traceback.format_exc(), old_stdout)
+        sys.exit(-1)
     # Mark the beginning of the accumulators section of the output
     write_int(-1, old_stdout)
     for aid, accum in _accumulatorRegistry.items():
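Note: reassigning sys.stdout only redirects Python-level writes; child processes and C extensions still inherit file descriptor 1, so their output would corrupt the result stream. Duplicating the descriptor first and then pointing fd 1 at fd 2 (stderr) covers both cases. A self-contained sketch of the technique, outside the worker:

```python
import os
import subprocess

# Keep a private handle on the real stdout before redirecting it,
# so framed protocol data can still be written there.
real_stdout = os.fdopen(os.dup(1), "w")

# Point fd 1 at fd 2: print(), C extensions, and child processes
# now all write to stderr.
os.dup2(2, 1)

print("diagnostic output, lands on stderr")          # Python-level write
subprocess.check_call(["echo", "child output"])      # child inherits fd 1 -> stderr

real_stdout.write("only framed results belong on the real stdout\n")
real_stdout.flush()
```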
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index da91c0f3ab..0667b71cc7 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -70,11 +70,6 @@
   <profiles>
     <profile>
       <id>hadoop1</id>
-      <activation>
-        <property>
-          <name>!hadoopVersion</name>
-        </property>
-      </activation>
       <properties>
         <classifier>hadoop1</classifier>
       </properties>
@@ -115,12 +110,6 @@
     </profile>
     <profile>
       <id>hadoop2</id>
-      <activation>
-        <property>
-          <name>hadoopVersion</name>
-          <value>2</value>
-        </property>
-      </activation>
       <properties>
         <classifier>hadoop2</classifier>
       </properties>
diff --git a/repl/pom.xml b/repl/pom.xml
index 2dc96beaf5..4a296fa630 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -72,11 +72,6 @@
   <profiles>
     <profile>
       <id>hadoop1</id>
-      <activation>
-        <property>
-          <name>!hadoopVersion</name>
-        </property>
-      </activation>
       <properties>
         <classifier>hadoop1</classifier>
       </properties>
@@ -128,12 +123,6 @@
     </profile>
     <profile>
       <id>hadoop2</id>
-      <activation>
-        <property>
-          <name>hadoopVersion</name>
-          <value>2</value>
-        </property>
-      </activation>
       <properties>
         <classifier>hadoop2</classifier>
       </properties>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 3dae815e1a..6ee7e59df3 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -83,11 +83,6 @@
   <profiles>
     <profile>
       <id>hadoop1</id>
-      <activation>
-        <property>
-          <name>!hadoopVersion</name>
-        </property>
-      </activation>
       <dependencies>
         <dependency>
           <groupId>org.spark-project</groupId>
@@ -115,12 +110,6 @@
     </profile>
     <profile>
       <id>hadoop2</id>
-      <activation>
-        <property>
-          <name>hadoopVersion</name>
-          <value>2</value>
-        </property>
-      </activation>
       <dependencies>
         <dependency>
           <groupId>org.spark-project</groupId>