aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
authorJey Kottalam <jey@cs.berkeley.edu>2013-03-10 13:54:46 -0700
committerJey Kottalam <jey@cs.berkeley.edu>2013-06-21 12:14:16 -0400
commit40afe0d2a5562738ef2ff37ed1d448ae2d0cc927 (patch)
tree026ec3ad62e855f70eb38f633a41b2a3c365a65e /python/pyspark/worker.py
parent1057fccf2ac980501501cc27faaf42770a7de9a0 (diff)
downloadspark-40afe0d2a5562738ef2ff37ed1d448ae2d0cc927.tar.gz
spark-40afe0d2a5562738ef2ff37ed1d448ae2d0cc927.tar.bz2
spark-40afe0d2a5562738ef2ff37ed1d448ae2d0cc927.zip
Add Python timing instrumentation
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--python/pyspark/worker.py16
1 files changed, 15 insertions, 1 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 812e7a9da5..4c33ae49dc 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1,6 +1,8 @@
"""
Worker that receives input from Piped RDD.
"""
+import time
+preboot_time = time.time()
import os
import sys
import traceback
@@ -12,7 +14,7 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.cloudpickle import CloudPickler
from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, read_with_length, write_int, \
- read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
+ read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
# Redirect stdout to stderr so that users must return values from functions.
@@ -24,7 +26,16 @@ def load_obj():
return load_pickle(standard_b64decode(sys.stdin.readline().strip()))
+def report_times(preboot, boot, init, finish):
+ write_int(-3, old_stdout)
+ write_long(1000 * preboot, old_stdout)
+ write_long(1000 * boot, old_stdout)
+ write_long(1000 * init, old_stdout)
+ write_long(1000 * finish, old_stdout)
+
+
def main():
+ boot_time = time.time()
split_index = read_int(sys.stdin)
spark_files_dir = load_pickle(read_with_length(sys.stdin))
SparkFiles._root_directory = spark_files_dir
@@ -41,6 +52,7 @@ def main():
dumps = lambda x: x
else:
dumps = dump_pickle
+ init_time = time.time()
iterator = read_from_pickle_file(sys.stdin)
try:
for obj in func(split_index, iterator):
@@ -49,6 +61,8 @@ def main():
write_int(-2, old_stdout)
write_with_length(traceback.format_exc(), old_stdout)
sys.exit(-1)
+ finish_time = time.time()
+ report_times(preboot_time, boot_time, init_time, finish_time)
# Mark the beginning of the accumulators section of the output
write_int(-1, old_stdout)
for aid, accum in _accumulatorRegistry.items():