From c75ae3622eeed068c44b1f823ef4d87d01a720fd Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 20 Jan 2013 15:12:54 -0800
Subject: Make AccumulatorParam an abstract base class.

---
 python/pyspark/accumulators.py | 29 ++++++++++++++++++++++++++---
 python/pyspark/context.py      | 15 +++++----------
 2 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 8011779ddc..5a9269f9bb 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -61,6 +61,7 @@ Traceback (most recent call last):
 Exception:...
 """
 
+from abc import ABCMeta, abstractmethod
 import struct
 import SocketServer
 import threading
@@ -90,8 +91,7 @@ class Accumulator(object):
 
     While C{SparkContext} supports accumulators for primitive data types like C{int} and
     C{float}, users can also define accumulators for custom types by providing a custom
-    C{AccumulatorParam} object with a C{zero} and C{addInPlace} method. Refer to the doctest
-    of this module for an example.
+    L{AccumulatorParam} object. Refer to the doctest of this module for an example.
     """
 
     def __init__(self, aid, value, accum_param):
@@ -134,7 +134,30 @@
         return "Accumulator<id=%i, value=%s>" % (self.aid, self._value)
 
 
-class AddingAccumulatorParam(object):
+class AccumulatorParam(object):
+    """
+    Helper object that defines how to accumulate values of a given type.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def zero(self, value):
+        """
+        Provide a "zero value" for the type, compatible in dimensions with the
+        provided C{value} (e.g., a zero vector)
+        """
+        return
+
+    @abstractmethod
+    def addInPlace(self, value1, value2):
+        """
+        Add two values of the accumulator's data type, returning a new value;
+        for efficiency, can also update C{value1} in place and return it.
+        """
+        return
+
+
+class AddingAccumulatorParam(AccumulatorParam):
     """
     An AccumulatorParam that uses the + operators to add values. Designed for simple types
     such as integers, floats, and lists. Requires the zero value for the underlying type
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index dcbed37270..a17e7a4ad1 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -148,16 +148,11 @@ class SparkContext(object):
 
     def accumulator(self, value, accum_param=None):
         """
-        Create an C{Accumulator} with the given initial value, using a given
-        AccumulatorParam helper object to define how to add values of the data
-        type if provided. Default AccumulatorParams are used for integers and
-        floating-point numbers if you do not provide one. For other types, the
-        AccumulatorParam must implement two methods:
-        - C{zero(value)}: provide a "zero value" for the type, compatible in
-          dimensions with the provided C{value} (e.g., a zero vector).
-        - C{addInPlace(val1, val2)}: add two values of the accumulator's data
-          type, returning a new value; for efficiency, can also update C{val1}
-          in place and return it.
+        Create an L{Accumulator} with the given initial value, using a given
+        L{AccumulatorParam} helper object to define how to add values of the
+        data type if provided. Default AccumulatorParams are used for integers
+        and floating-point numbers if you do not provide one. For other types,
+        a custom AccumulatorParam can be used.
""" if accum_param == None: if isinstance(value, int): -- cgit v1.2.3 From b47d054cfc5ef45b92a1c970388722ffa0283e66 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Jan 2013 11:18:25 -0800 Subject: Remove use of abc.ABCMeta due to cloudpickle issue. cloudpickle runs into issues while pickling subclasses of AccumulatorParam, which may be related to this Python issue: http://bugs.python.org/issue7689 This seems hard to fix and the ABCMeta wasn't necessary, so I removed it. --- python/pyspark/accumulators.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'python') diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 5a9269f9bb..61fcbbd376 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -25,7 +25,8 @@ >>> a.value 13 ->>> class VectorAccumulatorParam(object): +>>> from pyspark.accumulators import AccumulatorParam +>>> class VectorAccumulatorParam(AccumulatorParam): ... def zero(self, value): ... return [0.0] * len(value) ... def addInPlace(self, val1, val2): @@ -61,7 +62,6 @@ Traceback (most recent call last): Exception:... """ -from abc import ABCMeta, abstractmethod import struct import SocketServer import threading @@ -138,23 +138,20 @@ class AccumulatorParam(object): """ Helper object that defines how to accumulate values of a given type. """ - __metaclass__ = ABCMeta - @abstractmethod def zero(self, value): """ Provide a "zero value" for the type, compatible in dimensions with the provided C{value} (e.g., a zero vector) """ - return + raise NotImplementedError - @abstractmethod def addInPlace(self, value1, value2): """ Add two values of the accumulator's data type, returning a new value; for efficiency, can also update C{value1} in place and return it. """ - return + raise NotImplementedError class AddingAccumulatorParam(AccumulatorParam): -- cgit v1.2.3