aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/pyspark/tests.py6
-rw-r--r--python/pyspark/worker.py21
2 files changed, 19 insertions, 8 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 63cc5e9ad9..6dee7dc66c 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -165,11 +165,17 @@ class TestAddFile(PySparkTestCase):
def test_add_py_file(self):
# To ensure that we're actually testing addPyFile's effects, check that
# this job fails due to `userlibrary` not being on the Python path:
+ # disable logging in log4j temporarily
+ log4j = self.sc._jvm.org.apache.log4j
+ old_level = log4j.LogManager.getRootLogger().getLevel()
+ log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
def func(x):
from userlibrary import UserClass
return UserClass().hello()
self.assertRaises(Exception,
self.sc.parallelize(range(2)).map(func).first)
+ log4j.LogManager.getRootLogger().setLevel(old_level)
+
# Add the file, so the job should now succeed:
path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
self.sc.addPyFile(path)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 24d41b12d1..2770f63059 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -75,14 +75,19 @@ def main(infile, outfile):
init_time = time.time()
iterator = deserializer.load_stream(infile)
serializer.dump_stream(func(split_index, iterator), outfile)
- except Exception as e:
- # Write the error to stderr in addition to trying to pass it back to
- # Java, in case it happened while serializing a record
- print >> sys.stderr, "PySpark worker failed with exception:"
- print >> sys.stderr, traceback.format_exc()
- write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
- write_with_length(traceback.format_exc(), outfile)
- sys.exit(-1)
+ except Exception:
+ try:
+ write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
+ write_with_length(traceback.format_exc(), outfile)
+ outfile.flush()
+ except IOError:
+        # the JVM closed the socket
+ pass
+ except Exception:
+ # Write the error to stderr if it happened while serializing
+ print >> sys.stderr, "PySpark worker failed with exception:"
+ print >> sys.stderr, traceback.format_exc()
+ exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
# Mark the beginning of the accumulators section of the output