path: root/python/pyspark/worker.py
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--  python/pyspark/worker.py  16
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 452d6fabdc..fbdaf3a581 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -18,6 +18,7 @@
"""
Worker that receives input from Piped RDD.
"""
+from __future__ import print_function
import os
import sys
import time
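The __future__ import added here is what makes the Python 3 print() call syntax (used in the last hunk of this diff) valid under Python 2 as well. A minimal standalone illustration, not part of the patch:

from __future__ import print_function  # no-op on Python 3
import sys

# Without the import, Python 2 parses print as a statement and the
# file= keyword below is a SyntaxError; with it, both versions accept:
print("PySpark worker failed with exception:", file=sys.stderr)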
@@ -37,9 +38,9 @@ utf8_deserializer = UTF8Deserializer()
def report_times(outfile, boot, init, finish):
write_int(SpecialLengths.TIMING_DATA, outfile)
- write_long(1000 * boot, outfile)
- write_long(1000 * init, outfile)
- write_long(1000 * finish, outfile)
+ write_long(int(1000 * boot), outfile)
+ write_long(int(1000 * init), outfile)
+ write_long(int(1000 * finish), outfile)
def add_path(path):
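The int() casts above are needed because boot, init, and finish come from time.time() and are therefore floats, so 1000 * boot is still a float. Assuming write_long frames its argument with struct.pack and a 64-bit integer format (as PySpark's serializers do), Python 3 raises struct.error for a float argument where Python 2 merely warned. A rough sketch:

import struct
import time

boot = time.time()            # float seconds since the epoch
ms = 1000 * boot              # still a float

# struct.pack("!q", ms)      # Python 3: struct.error (not an integer)
struct.pack("!q", int(ms))    # portable across Python 2 and 3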
@@ -72,6 +73,9 @@ def main(infile, outfile):
for _ in range(num_python_includes):
filename = utf8_deserializer.loads(infile)
add_path(os.path.join(spark_files_dir, filename))
+ if sys.version > '3':
+ import importlib
+ importlib.invalidate_caches()
# fetch names and values of broadcast variables
num_broadcast_variables = read_int(infile)
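The invalidate_caches() call guards against a Python 3 behavior: the import machinery caches per-directory finder state for entries on sys.path, so files shipped into spark_files_dir after the interpreter has started can stay invisible to import until the caches are flushed. A hypothetical repro (the path and module name are made up, not PySpark code):

import importlib
import os
import sys

include_dir = "/tmp/spark_includes"          # hypothetical include dir
os.makedirs(include_dir, exist_ok=True)
sys.path.insert(0, include_dir)

# A file created after the directory was already scanned can be masked
# by a stale finder cache; invalidating forces a fresh directory scan.
with open(os.path.join(include_dir, "shipped_mod.py"), "w") as f:
    f.write("VALUE = 42\n")
importlib.invalidate_caches()

import shipped_mod
print(shipped_mod.VALUE)                     # 42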
@@ -106,14 +110,14 @@ def main(infile, outfile):
except Exception:
try:
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
- write_with_length(traceback.format_exc(), outfile)
+ write_with_length(traceback.format_exc().encode("utf-8"), outfile)
except IOError:
# the JVM closed the socket
pass
except Exception:
# Write the error to stderr if it happened while serializing
- print >> sys.stderr, "PySpark worker failed with exception:"
- print >> sys.stderr, traceback.format_exc()
+ print("PySpark worker failed with exception:", file=sys.stderr)
+ print(traceback.format_exc(), file=sys.stderr)
exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
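Finally, the .encode("utf-8") in the last hunk follows from write_with_length writing raw bytes to the socket: on Python 3, traceback.format_exc() returns str, which can neither be measured in bytes nor written to a binary stream. A sketch of the assumed framing (length prefix plus payload; the real helper lives in pyspark.serializers):

import io
import struct
import traceback

def write_with_length(obj, stream):
    # assumed wire format: 4-byte big-endian length, then the payload
    stream.write(struct.pack("!i", len(obj)))
    stream.write(obj)                  # bytes required on Python 3

out = io.BytesIO()
try:
    raise RuntimeError("boom")
except Exception:
    # format_exc() is str on Python 3; encode before framing
    write_with_length(traceback.format_exc().encode("utf-8"), out)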