aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/__init__.py4
-rw-r--r--python/pyspark/accumulators.py187
-rw-r--r--python/pyspark/context.py54
-rw-r--r--python/pyspark/rdd.py54
-rw-r--r--python/pyspark/serializers.py7
-rw-r--r--python/pyspark/shell.py4
-rw-r--r--python/pyspark/tests.py61
-rw-r--r--python/pyspark/worker.py7
8 files changed, 366 insertions, 12 deletions
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index c595ae0842..00666bc0a3 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -7,6 +7,10 @@ Public classes:
Main entry point for Spark functionality.
- L{RDD<pyspark.rdd.RDD>}
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
+ - L{Broadcast<pyspark.broadcast.Broadcast>}
+ A broadcast variable that gets reused across tasks.
+ - L{Accumulator<pyspark.accumulators.Accumulator>}
+ An "add-only" shared variable that tasks can only add values to.
"""
import sys
import os
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
new file mode 100644
index 0000000000..8011779ddc
--- /dev/null
+++ b/python/pyspark/accumulators.py
@@ -0,0 +1,187 @@
+"""
+>>> from pyspark.context import SparkContext
+>>> sc = SparkContext('local', 'test')
+>>> a = sc.accumulator(1)
+>>> a.value
+1
+>>> a.value = 2
+>>> a.value
+2
+>>> a += 5
+>>> a.value
+7
+
+>>> sc.accumulator(1.0).value
+1.0
+
+>>> sc.accumulator(1j).value
+1j
+
+>>> rdd = sc.parallelize([1,2,3])
+>>> def f(x):
+... global a
+... a += x
+>>> rdd.foreach(f)
+>>> a.value
+13
+
+>>> class VectorAccumulatorParam(object):
+... def zero(self, value):
+... return [0.0] * len(value)
+... def addInPlace(self, val1, val2):
+... for i in xrange(len(val1)):
+... val1[i] += val2[i]
+... return val1
+>>> va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
+>>> va.value
+[1.0, 2.0, 3.0]
+>>> def g(x):
+... global va
+... va += [x] * 3
+>>> rdd.foreach(g)
+>>> va.value
+[7.0, 8.0, 9.0]
+
+>>> rdd.map(lambda x: a.value).collect() # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+ ...
+Py4JJavaError:...
+
+>>> def h(x):
+... global a
+... a.value = 7
+>>> rdd.foreach(h) # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+ ...
+Py4JJavaError:...
+
+>>> sc.accumulator([1.0, 2.0, 3.0]) # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+ ...
+Exception:...
+"""
+
+import struct
+import SocketServer
+import threading
+from pyspark.cloudpickle import CloudPickler
+from pyspark.serializers import read_int, read_with_length, load_pickle
+
+
+# Holds accumulators registered on the current machine, keyed by ID. This is then used to send
+# the local accumulator updates back to the driver program at the end of a task.
+_accumulatorRegistry = {}
+
+
+def _deserialize_accumulator(aid, zero_value, accum_param):
+ from pyspark.accumulators import _accumulatorRegistry
+ accum = Accumulator(aid, zero_value, accum_param)
+ accum._deserialized = True
+ _accumulatorRegistry[aid] = accum
+ return accum
+
+
+class Accumulator(object):
+ """
+ A shared variable that can be accumulated, i.e., has a commutative and associative "add"
+ operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=}
+ operator, but only the driver program is allowed to access its value, using C{value}.
+ Updates from the workers get propagated automatically to the driver program.
+
+ While C{SparkContext} supports accumulators for primitive data types like C{int} and
+ C{float}, users can also define accumulators for custom types by providing a custom
+ C{AccumulatorParam} object with a C{zero} and C{addInPlace} method. Refer to the doctest
+ of this module for an example.
+ """
+
+ def __init__(self, aid, value, accum_param):
+ """Create a new Accumulator with a given initial value and AccumulatorParam object"""
+ from pyspark.accumulators import _accumulatorRegistry
+ self.aid = aid
+ self.accum_param = accum_param
+ self._value = value
+ self._deserialized = False
+ _accumulatorRegistry[aid] = self
+
+ def __reduce__(self):
+ """Custom serialization; saves the zero value from our AccumulatorParam"""
+ param = self.accum_param
+ return (_deserialize_accumulator, (self.aid, param.zero(self._value), param))
+
+ @property
+ def value(self):
+ """Get the accumulator's value; only usable in driver program"""
+ if self._deserialized:
+ raise Exception("Accumulator.value cannot be accessed inside tasks")
+ return self._value
+
+ @value.setter
+ def value(self, value):
+ """Sets the accumulator's value; only usable in driver program"""
+ if self._deserialized:
+ raise Exception("Accumulator.value cannot be accessed inside tasks")
+ self._value = value
+
+ def __iadd__(self, term):
+ """The += operator; adds a term to this accumulator's value"""
+ self._value = self.accum_param.addInPlace(self._value, term)
+ return self
+
+ def __str__(self):
+ return str(self._value)
+
+ def __repr__(self):
+ return "Accumulator<id=%i, value=%s>" % (self.aid, self._value)
+
+
+class AddingAccumulatorParam(object):
+ """
+ An AccumulatorParam that uses the + operators to add values. Designed for simple types
+ such as integers, floats, and lists. Requires the zero value for the underlying type
+ as a parameter.
+ """
+
+ def __init__(self, zero_value):
+ self.zero_value = zero_value
+
+ def zero(self, value):
+ return self.zero_value
+
+ def addInPlace(self, value1, value2):
+ value1 += value2
+ return value1
+
+
+# Singleton accumulator params for some standard types
+INT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0)
+FLOAT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)
+COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
+
+
+class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
+ def handle(self):
+ from pyspark.accumulators import _accumulatorRegistry
+ num_updates = read_int(self.rfile)
+ for _ in range(num_updates):
+ (aid, update) = load_pickle(read_with_length(self.rfile))
+ _accumulatorRegistry[aid] += update
+ # Write a byte in acknowledgement
+ self.wfile.write(struct.pack("!b", 1))
+
+
+def _start_update_server():
+ """Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
+ server = SocketServer.TCPServer(("localhost", 0), _UpdateRequestHandler)
+ thread = threading.Thread(target=server.serve_forever)
+ thread.daemon = True
+ thread.start()
+ return server
+
+
+def _test():
+ import doctest
+ doctest.testmod()
+
+
+if __name__ == "__main__":
+ _test()
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index e486f206b0..dcbed37270 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -2,6 +2,8 @@ import os
import atexit
from tempfile import NamedTemporaryFile
+from pyspark import accumulators
+from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import dump_pickle, write_with_length, batched
@@ -22,6 +24,7 @@ class SparkContext(object):
_readRDDFromPickleFile = jvm.PythonRDD.readRDDFromPickleFile
_writeIteratorToPickleFile = jvm.PythonRDD.writeIteratorToPickleFile
_takePartition = jvm.PythonRDD.takePartition
+ _next_accum_id = 0
def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024):
@@ -52,6 +55,14 @@ class SparkContext(object):
self._jsc = self.jvm.JavaSparkContext(master, jobName, sparkHome,
empty_string_array)
+ # Create a single Accumulator in Java that we'll send all our updates through;
+ # they will be passed back to us through a TCP server
+ self._accumulatorServer = accumulators._start_update_server()
+ (host, port) = self._accumulatorServer.server_address
+ self._javaAccumulator = self._jsc.accumulator(
+ self.jvm.java.util.ArrayList(),
+ self.jvm.PythonAccumulatorParam(host, port))
+
self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
# Broadcast's __reduce__ method stores Broadcast instances here.
# This allows other code to determine which Broadcast instances have
@@ -74,6 +85,8 @@ class SparkContext(object):
def __del__(self):
if self._jsc:
self._jsc.stop()
+ if self._accumulatorServer:
+ self._accumulatorServer.shutdown()
def stop(self):
"""
@@ -110,6 +123,10 @@ class SparkContext(object):
jrdd = self._jsc.textFile(name, minSplits)
return RDD(jrdd, self)
+ def _checkpointFile(self, name):
+ jrdd = self._jsc.checkpointFile(name)
+ return RDD(jrdd, self)
+
def union(self, rdds):
"""
Build the union of a list of RDDs.
@@ -129,6 +146,31 @@ class SparkContext(object):
return Broadcast(jbroadcast.id(), value, jbroadcast,
self._pickled_broadcast_vars)
+ def accumulator(self, value, accum_param=None):
+ """
+ Create an C{Accumulator} with the given initial value, using a given
+ AccumulatorParam helper object to define how to add values of the data
+ type if provided. Default AccumulatorParams are used for integers and
+ floating-point numbers if you do not provide one. For other types, the
+ AccumulatorParam must implement two methods:
+ - C{zero(value)}: provide a "zero value" for the type, compatible in
+ dimensions with the provided C{value} (e.g., a zero vector).
+ - C{addInPlace(val1, val2)}: add two values of the accumulator's data
+ type, returning a new value; for efficiency, can also update C{val1}
+ in place and return it.
+ """
+ if accum_param == None:
+ if isinstance(value, int):
+ accum_param = accumulators.INT_ACCUMULATOR_PARAM
+ elif isinstance(value, float):
+ accum_param = accumulators.FLOAT_ACCUMULATOR_PARAM
+ elif isinstance(value, complex):
+ accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM
+ else:
+ raise Exception("No default accumulator param for type %s" % type(value))
+ SparkContext._next_accum_id += 1
+ return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)
+
def addFile(self, path):
"""
Add a file to be downloaded into the working directory of this Spark
@@ -157,3 +199,15 @@ class SparkContext(object):
filename = path.split("/")[-1]
os.environ["PYTHONPATH"] = \
"%s:%s" % (filename, os.environ["PYTHONPATH"])
+
+ def setCheckpointDir(self, dirName, useExisting=False):
+ """
+ Set the directory under which RDDs are going to be checkpointed. The
+ directory must be a HDFS path if running on a cluster.
+
+ If the directory does not exist, it will be created. If the directory
+ exists and C{useExisting} is set to true, then the exisiting directory
+ will be used. Otherwise an exception will be thrown to prevent
+ accidental overriding of checkpoint files in the existing directory.
+ """
+ self._jsc.sc().setCheckpointDir(dirName, useExisting)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1d36da42b0..d53355a8f1 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -32,7 +32,9 @@ class RDD(object):
def __init__(self, jrdd, ctx):
self._jrdd = jrdd
self.is_cached = False
+ self.is_checkpointed = False
self.ctx = ctx
+ self._partitionFunc = None
@property
def context(self):
@@ -49,6 +51,34 @@ class RDD(object):
self._jrdd.cache()
return self
+ def checkpoint(self):
+ """
+ Mark this RDD for checkpointing. It will be saved to a file inside the
+ checkpoint directory set with L{SparkContext.setCheckpointDir()} and
+ all references to its parent RDDs will be removed. This function must
+ be called before any job has been executed on this RDD. It is strongly
+ recommended that this RDD is persisted in memory, otherwise saving it
+ on a file will require recomputation.
+ """
+ self.is_checkpointed = True
+ self._jrdd.rdd().checkpoint()
+
+ def isCheckpointed(self):
+ """
+ Return whether this RDD has been checkpointed or not
+ """
+ return self._jrdd.rdd().isCheckpointed()
+
+ def getCheckpointFile(self):
+ """
+ Gets the name of the file to which this RDD was checkpointed
+ """
+ checkpointFile = self._jrdd.rdd().getCheckpointFile()
+ if checkpointFile.isDefined():
+ return checkpointFile.get()
+ else:
+ return None
+
# TODO persist(self, storageLevel)
def map(self, f, preservesPartitioning=False):
@@ -497,7 +527,7 @@ class RDD(object):
return python_right_outer_join(self, other, numSplits)
# TODO: add option to control map-side combining
- def partitionBy(self, numSplits, hashFunc=hash):
+ def partitionBy(self, numSplits, partitionFunc=hash):
"""
Return a copy of the RDD partitioned using the specified partitioner.
@@ -514,17 +544,21 @@ class RDD(object):
def add_shuffle_key(split, iterator):
buckets = defaultdict(list)
for (k, v) in iterator:
- buckets[hashFunc(k) % numSplits].append((k, v))
+ buckets[partitionFunc(k) % numSplits].append((k, v))
for (split, items) in buckets.iteritems():
yield str(split)
yield dump_pickle(Batch(items))
keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
pairRDD = self.ctx.jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
- partitioner = self.ctx.jvm.spark.api.python.PythonPartitioner(numSplits)
- jrdd = pairRDD.partitionBy(partitioner)
- jrdd = jrdd.map(self.ctx.jvm.ExtractValue())
- return RDD(jrdd, self.ctx)
+ partitioner = self.ctx.jvm.PythonPartitioner(numSplits,
+ id(partitionFunc))
+ jrdd = pairRDD.partitionBy(partitioner).values()
+ rdd = RDD(jrdd, self.ctx)
+ # This is required so that id(partitionFunc) remains unique, even if
+ # partitionFunc is a lambda:
+ rdd._partitionFunc = partitionFunc
+ return rdd
# TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
@@ -662,7 +696,7 @@ class PipelinedRDD(RDD):
20
"""
def __init__(self, prev, func, preservesPartitioning=False):
- if isinstance(prev, PipelinedRDD) and not prev.is_cached:
+ if isinstance(prev, PipelinedRDD) and prev._is_pipelinable():
prev_func = prev.func
def pipeline_func(split, iterator):
return func(split, prev_func(split, iterator))
@@ -675,6 +709,7 @@ class PipelinedRDD(RDD):
self.preservesPartitioning = preservesPartitioning
self._prev_jrdd = prev._jrdd
self.is_cached = False
+ self.is_checkpointed = False
self.ctx = prev.ctx
self.prev = prev
self._jrdd_val = None
@@ -703,10 +738,13 @@ class PipelinedRDD(RDD):
env = MapConverter().convert(env, self.ctx.gateway._gateway_client)
python_rdd = self.ctx.jvm.PythonRDD(self._prev_jrdd.rdd(),
pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec,
- broadcast_vars, class_manifest)
+ broadcast_vars, self.ctx._javaAccumulator, class_manifest)
self._jrdd_val = python_rdd.asJavaRDD()
return self._jrdd_val
+ def _is_pipelinable(self):
+ return not (self.is_cached or self.is_checkpointed)
+
def _test():
import doctest
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 9a5151ea00..115cf28cc2 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -52,8 +52,13 @@ def read_int(stream):
raise EOFError
return struct.unpack("!i", length)[0]
+
+def write_int(value, stream):
+ stream.write(struct.pack("!i", value))
+
+
def write_with_length(obj, stream):
- stream.write(struct.pack("!i", len(obj)))
+ write_int(len(obj), stream)
stream.write(obj)
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 7e6ad3aa76..f6328c561f 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -1,7 +1,7 @@
"""
An interactive shell.
-This fle is designed to be launched as a PYTHONSTARTUP script.
+This file is designed to be launched as a PYTHONSTARTUP script.
"""
import os
from pyspark.context import SparkContext
@@ -14,4 +14,4 @@ print "Spark context avaiable as sc."
# which allows us to execute the user's PYTHONSTARTUP file:
_pythonstartup = os.environ.get('OLD_PYTHONSTARTUP')
if _pythonstartup and os.path.isfile(_pythonstartup):
- execfile(_pythonstartup)
+ execfile(_pythonstartup)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
new file mode 100644
index 0000000000..b0a403b580
--- /dev/null
+++ b/python/pyspark/tests.py
@@ -0,0 +1,61 @@
+"""
+Unit tests for PySpark; additional tests are implemented as doctests in
+individual modules.
+"""
+import os
+import shutil
+from tempfile import NamedTemporaryFile
+import time
+import unittest
+
+from pyspark.context import SparkContext
+
+
+class TestCheckpoint(unittest.TestCase):
+
+ def setUp(self):
+ self.sc = SparkContext('local[4]', 'TestPartitioning', batchSize=2)
+ self.checkpointDir = NamedTemporaryFile(delete=False)
+ os.unlink(self.checkpointDir.name)
+ self.sc.setCheckpointDir(self.checkpointDir.name)
+
+ def tearDown(self):
+ self.sc.stop()
+ # To avoid Akka rebinding to the same port, since it doesn't unbind
+ # immediately on shutdown
+ self.sc.jvm.System.clearProperty("spark.master.port")
+ shutil.rmtree(self.checkpointDir.name)
+
+ def test_basic_checkpointing(self):
+ parCollection = self.sc.parallelize([1, 2, 3, 4])
+ flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1))
+
+ self.assertFalse(flatMappedRDD.isCheckpointed())
+ self.assertIsNone(flatMappedRDD.getCheckpointFile())
+
+ flatMappedRDD.checkpoint()
+ result = flatMappedRDD.collect()
+ time.sleep(1) # 1 second
+ self.assertTrue(flatMappedRDD.isCheckpointed())
+ self.assertEqual(flatMappedRDD.collect(), result)
+ self.assertEqual(self.checkpointDir.name,
+ os.path.dirname(flatMappedRDD.getCheckpointFile()))
+
+ def test_checkpoint_and_restore(self):
+ parCollection = self.sc.parallelize([1, 2, 3, 4])
+ flatMappedRDD = parCollection.flatMap(lambda x: [x])
+
+ self.assertFalse(flatMappedRDD.isCheckpointed())
+ self.assertIsNone(flatMappedRDD.getCheckpointFile())
+
+ flatMappedRDD.checkpoint()
+ flatMappedRDD.count() # forces a checkpoint to be computed
+ time.sleep(1) # 1 second
+
+ self.assertIsNotNone(flatMappedRDD.getCheckpointFile())
+ recovered = self.sc._checkpointFile(flatMappedRDD.getCheckpointFile())
+ self.assertEquals([1, 2, 3, 4], recovered.collect())
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 3d792bbaa2..b2b9288089 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -5,9 +5,10 @@ import sys
from base64 import standard_b64decode
# CloudPickler needs to be imported so that depicklers are registered using the
# copy_reg module.
+from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.cloudpickle import CloudPickler
-from pyspark.serializers import write_with_length, read_with_length, \
+from pyspark.serializers import write_with_length, read_with_length, write_int, \
read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
@@ -36,6 +37,10 @@ def main():
iterator = read_from_pickle_file(sys.stdin)
for obj in func(split_index, iterator):
write_with_length(dumps(obj), old_stdout)
+ # Mark the beginning of the accumulators section of the output
+ write_int(-1, old_stdout)
+ for aid, accum in _accumulatorRegistry.items():
+ write_with_length(dump_pickle((aid, accum._value)), old_stdout)
if __name__ == '__main__':