diff options
-rw-r--r-- | bagel/pom.xml | 11 | ||||
-rw-r--r-- | core/pom.xml | 11 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 39 | ||||
-rw-r--r-- | examples/pom.xml | 11 | ||||
-rw-r--r-- | pom.xml | 11 | ||||
-rw-r--r-- | python/pyspark/worker.py | 10 | ||||
-rw-r--r-- | repl-bin/pom.xml | 11 | ||||
-rw-r--r-- | repl/pom.xml | 11 | ||||
-rw-r--r-- | streaming/pom.xml | 11 |
9 files changed, 32 insertions, 94 deletions
diff --git a/bagel/pom.xml b/bagel/pom.xml index 5f58347204..a8256a6e8b 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -45,11 +45,6 @@ <profiles> <profile> <id>hadoop1</id> - <activation> - <property> - <name>!hadoopVersion</name> - </property> - </activation> <dependencies> <dependency> <groupId>org.spark-project</groupId> @@ -77,12 +72,6 @@ </profile> <profile> <id>hadoop2</id> - <activation> - <property> - <name>hadoopVersion</name> - <value>2</value> - </property> - </activation> <dependencies> <dependency> <groupId>org.spark-project</groupId> diff --git a/core/pom.xml b/core/pom.xml index 862d3ec37a..873e8a1d0f 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -163,11 +163,6 @@ <profiles> <profile> <id>hadoop1</id> - <activation> - <property> - <name>!hadoopVersion</name> - </property> - </activation> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> @@ -220,12 +215,6 @@ </profile> <profile> <id>hadoop2</id> - <activation> - <property> - <name>hadoopVersion</name> - <value>2</value> - </property> - </activation> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index f43a152ca7..39758e94f4 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -103,21 +103,27 @@ private[spark] class PythonRDD[T: ClassManifest]( private def read(): Array[Byte] = { try { - val length = stream.readInt() - if (length != -1) { - val obj = new Array[Byte](length) - stream.readFully(obj) - obj - } else { - // We've finished the data section of the output, but we can still read some - // accumulator updates; let's do that, breaking when we get EOFException - while (true) { - val len2 = stream.readInt() - val update = new Array[Byte](len2) - stream.readFully(update) - accumulator += Collections.singletonList(update) - } - new Array[Byte](0) + stream.readInt() match { + case length if length > 0 => + val obj = new Array[Byte](length) + stream.readFully(obj) + obj + case -2 => + // Signals that an exception has been thrown in python + val exLength = stream.readInt() + val obj = new Array[Byte](exLength) + stream.readFully(obj) + throw new PythonException(new String(obj)) + case -1 => + // We've finished the data section of the output, but we can still read some + // accumulator updates; let's do that, breaking when we get EOFException + while (true) { + val len2 = stream.readInt() + val update = new Array[Byte](len2) + stream.readFully(update) + accumulator += Collections.singletonList(update) + } + new Array[Byte](0) } } catch { case eof: EOFException => { @@ -140,6 +146,9 @@ private[spark] class PythonRDD[T: ClassManifest]( val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) } +/** Thrown for exceptions in user Python code. */ +private class PythonException(msg: String) extends Exception(msg) + /** * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python. * This is used by PySpark's shuffle operations. diff --git a/examples/pom.xml b/examples/pom.xml index 4d43103475..f43af670c6 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -50,11 +50,6 @@ <profiles> <profile> <id>hadoop1</id> - <activation> - <property> - <name>!hadoopVersion</name> - </property> - </activation> <dependencies> <dependency> <groupId>org.spark-project</groupId> @@ -88,12 +83,6 @@ </profile> <profile> <id>hadoop2</id> - <activation> - <property> - <name>hadoopVersion</name> - <value>2</value> - </property> - </activation> <dependencies> <dependency> <groupId>org.spark-project</groupId> @@ -499,11 +499,6 @@ <profiles> <profile> <id>hadoop1</id> - <activation> - <property> - <name>!hadoopVersion</name> - </property> - </activation> <properties> <hadoop.major.version>1</hadoop.major.version> @@ -521,12 +516,6 @@ <profile> <id>hadoop2</id> - <activation> - <property> - <name>hadoopVersion</name> - <value>2</value> - </property> - </activation> <properties> <hadoop.major.version>2</hadoop.major.version> </properties> diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index d33d6dd15f..9622e0cfe4 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -2,6 +2,7 @@ Worker that receives input from Piped RDD. """ import sys +import traceback from base64 import standard_b64decode # CloudPickler needs to be imported so that depicklers are registered using the # copy_reg module. @@ -40,8 +41,13 @@ def main(): else: dumps = dump_pickle iterator = read_from_pickle_file(sys.stdin) - for obj in func(split_index, iterator): - write_with_length(dumps(obj), old_stdout) + try: + for obj in func(split_index, iterator): + write_with_length(dumps(obj), old_stdout) + except Exception as e: + write_int(-2, old_stdout) + write_with_length(traceback.format_exc(), old_stdout) + sys.exit(-1) # Mark the beginning of the accumulators section of the output write_int(-1, old_stdout) for aid, accum in _accumulatorRegistry.items(): diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index da91c0f3ab..0667b71cc7 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -70,11 +70,6 @@ <profiles> <profile> <id>hadoop1</id> - <activation> - <property> - <name>!hadoopVersion</name> - </property> - </activation> <properties> <classifier>hadoop1</classifier> </properties> @@ -115,12 +110,6 @@ </profile> <profile> <id>hadoop2</id> - <activation> - <property> - <name>hadoopVersion</name> - <value>2</value> - </property> - </activation> <properties> <classifier>hadoop2</classifier> </properties> diff --git a/repl/pom.xml b/repl/pom.xml index 2dc96beaf5..4a296fa630 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -72,11 +72,6 @@ <profiles> <profile> <id>hadoop1</id> - <activation> - <property> - <name>!hadoopVersion</name> - </property> - </activation> <properties> <classifier>hadoop1</classifier> </properties> @@ -128,12 +123,6 @@ </profile> <profile> <id>hadoop2</id> - <activation> - <property> - <name>hadoopVersion</name> - <value>2</value> - </property> - </activation> <properties> <classifier>hadoop2</classifier> </properties> diff --git a/streaming/pom.xml b/streaming/pom.xml index 3dae815e1a..6ee7e59df3 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -83,11 +83,6 @@ <profiles> <profile> <id>hadoop1</id> - <activation> - <property> - <name>!hadoopVersion</name> - </property> - </activation> <dependencies> <dependency> <groupId>org.spark-project</groupId> @@ -115,12 +110,6 @@ </profile> <profile> <id>hadoop2</id> - <activation> - <property> - <name>hadoopVersion</name> - <value>2</value> - </property> - </activation> <dependencies> <dependency> <groupId>org.spark-project</groupId> |