aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/accumulators.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/accumulators.py')
-rw-r--r--python/pyspark/accumulators.py9
1 files changed, 7 insertions, 2 deletions
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index ccbca67656..7271809e43 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -54,7 +54,7 @@
... def zero(self, value):
... return [0.0] * len(value)
... def addInPlace(self, val1, val2):
-... for i in xrange(len(val1)):
+... for i in range(len(val1)):
... val1[i] += val2[i]
... return val1
>>> va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
@@ -86,9 +86,13 @@ Traceback (most recent call last):
Exception:...
"""
+import sys
import select
import struct
-import SocketServer
+if sys.version < '3':
+ import SocketServer
+else:
+ import socketserver as SocketServer
import threading
from pyspark.cloudpickle import CloudPickler
from pyspark.serializers import read_int, PickleSerializer
@@ -247,6 +251,7 @@ class AccumulatorServer(SocketServer.TCPServer):
def shutdown(self):
self.server_shutdown = True
SocketServer.TCPServer.shutdown(self)
+ self.server_close()
def _start_update_server():