diff options
-rw-r--r-- | python/pyspark/tests.py | 6 | ||||
-rw-r--r-- | python/pyspark/worker.py | 21 |
2 files changed, 19 insertions, 8 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 63cc5e9ad9..6dee7dc66c 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -165,11 +165,17 @@ class TestAddFile(PySparkTestCase): def test_add_py_file(self): # To ensure that we're actually testing addPyFile's effects, check that # this job fails due to `userlibrary` not being on the Python path: + # disable logging in log4j temporarily + log4j = self.sc._jvm.org.apache.log4j + old_level = log4j.LogManager.getRootLogger().getLevel() + log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL) def func(x): from userlibrary import UserClass return UserClass().hello() self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first) + log4j.LogManager.getRootLogger().setLevel(old_level) + # Add the file, so the job should now succeed: path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py") self.sc.addPyFile(path) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 24d41b12d1..2770f63059 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -75,14 +75,19 @@ def main(infile, outfile): init_time = time.time() iterator = deserializer.load_stream(infile) serializer.dump_stream(func(split_index, iterator), outfile) - except Exception as e: - # Write the error to stderr in addition to trying to pass it back to - # Java, in case it happened while serializing a record - print >> sys.stderr, "PySpark worker failed with exception:" - print >> sys.stderr, traceback.format_exc() - write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile) - write_with_length(traceback.format_exc(), outfile) - sys.exit(-1) + except Exception: + try: + write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile) + write_with_length(traceback.format_exc(), outfile) + outfile.flush() + except IOError: + # JVM close the socket + pass + except Exception: + # Write the error to stderr if it happened while serializing + print >> sys.stderr, "PySpark worker failed with exception:" + print >> sys.stderr, traceback.format_exc() + exit(-1) finish_time = time.time() report_times(outfile, boot_time, init_time, finish_time) # Mark the beginning of the accumulators section of the output |