From 90d5754212425d55f992c939a2bc7d9ac6ef92b8 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 23 Sep 2016 09:44:30 +0100 Subject: [SPARK-16861][PYSPARK][CORE] Refactor PySpark accumulator API on top of Accumulator V2 ## What changes were proposed in this pull request? Move the internals of the PySpark accumulator API from the old deprecated API on top of the new accumulator API. ## How was this patch tested? The existing PySpark accumulator tests (both unit tests and doc tests at the start of accumulator.py). Author: Holden Karau Closes #14467 from holdenk/SPARK-16861-refactor-pyspark-accumulator-api. --- python/pyspark/context.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'python/pyspark') diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 7a7f59cb50..a3dd1950a5 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -173,9 +173,8 @@ class SparkContext(object): # they will be passed back to us through a TCP server self._accumulatorServer = accumulators._start_update_server() (host, port) = self._accumulatorServer.server_address - self._javaAccumulator = self._jsc.accumulator( - self._jvm.java.util.ArrayList(), - self._jvm.PythonAccumulatorParam(host, port)) + self._javaAccumulator = self._jvm.PythonAccumulatorV2(host, port) + self._jsc.sc().register(self._javaAccumulator) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') self.pythonVer = "%d.%d" % sys.version_info[:2] -- cgit v1.2.3