aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/__init__.py5
-rw-r--r--python/pyspark/accumulators.py48
-rw-r--r--python/pyspark/broadcast.py9
-rw-r--r--python/pyspark/context.py139
-rw-r--r--python/pyspark/files.py38
-rw-r--r--python/pyspark/rdd.py77
-rw-r--r--python/pyspark/shell.py1
-rw-r--r--python/pyspark/tests.py121
-rw-r--r--python/pyspark/worker.py20
9 files changed, 376 insertions, 82 deletions
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 00666bc0a3..3e8bca62f0 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -11,6 +11,8 @@ Public classes:
A broadcast variable that gets reused across tasks.
- L{Accumulator<pyspark.accumulators.Accumulator>}
An "add-only" shared variable that tasks can only add values to.
+ - L{SparkFiles<pyspark.files.SparkFiles>}
+ Access files shipped with jobs.
"""
import sys
import os
@@ -19,6 +21,7 @@ sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.eg
from pyspark.context import SparkContext
from pyspark.rdd import RDD
+from pyspark.files import SparkFiles
-__all__ = ["SparkContext", "RDD"]
+__all__ = ["SparkContext", "RDD", "SparkFiles"]
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index c00c3a37af..3e9d7d36da 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -11,6 +11,12 @@
>>> a.value
7
+>>> sc.accumulator(1.0).value
+1.0
+
+>>> sc.accumulator(1j).value
+1j
+
>>> rdd = sc.parallelize([1,2,3])
>>> def f(x):
... global a
@@ -19,7 +25,8 @@
>>> a.value
13
->>> class VectorAccumulatorParam(object):
+>>> from pyspark.accumulators import AccumulatorParam
+>>> class VectorAccumulatorParam(AccumulatorParam):
... def zero(self, value):
... return [0.0] * len(value)
... def addInPlace(self, val1, val2):
@@ -84,8 +91,7 @@ class Accumulator(object):
While C{SparkContext} supports accumulators for primitive data types like C{int} and
C{float}, users can also define accumulators for custom types by providing a custom
- C{AccumulatorParam} object with a C{zero} and C{addInPlace} method. Refer to the doctest
- of this module for an example.
+ L{AccumulatorParam} object. Refer to the doctest of this module for an example.
"""
def __init__(self, aid, value, accum_param):
@@ -124,8 +130,31 @@ class Accumulator(object):
def __str__(self):
return str(self._value)
+ def __repr__(self):
+ return "Accumulator<id=%i, value=%s>" % (self.aid, self._value)
-class AddingAccumulatorParam(object):
+
+class AccumulatorParam(object):
+ """
+ Helper object that defines how to accumulate values of a given type.
+ """
+
+ def zero(self, value):
+ """
+ Provide a "zero value" for the type, compatible in dimensions with the
+ provided C{value} (e.g., a zero vector)
+ """
+ raise NotImplementedError
+
+ def addInPlace(self, value1, value2):
+ """
+ Add two values of the accumulator's data type, returning a new value;
+ for efficiency, can also update C{value1} in place and return it.
+ """
+ raise NotImplementedError
+
+
+class AddingAccumulatorParam(AccumulatorParam):
"""
An AccumulatorParam that uses the + operators to add values. Designed for simple types
such as integers, floats, and lists. Requires the zero value for the underlying type
@@ -145,7 +174,7 @@ class AddingAccumulatorParam(object):
# Singleton accumulator params for some standard types
INT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0)
-DOUBLE_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)
+FLOAT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)
COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
@@ -167,12 +196,3 @@ def _start_update_server():
thread.daemon = True
thread.start()
return server
-
-
-def _test():
- import doctest
- doctest.testmod()
-
-
-if __name__ == "__main__":
- _test()
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index 93876fa738..def810dd46 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -37,12 +37,3 @@ class Broadcast(object):
def __reduce__(self):
self._pickle_registry.add(self)
return (_from_id, (self.bid, ))
-
-
-def _test():
- import doctest
- doctest.testmod()
-
-
-if __name__ == "__main__":
- _test()
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 1e2f845f9c..657fe6f989 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1,10 +1,13 @@
import os
-import atexit
+import shutil
+import sys
+from threading import Lock
from tempfile import NamedTemporaryFile
from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
+from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import dump_pickle, write_with_length, batched
from pyspark.rdd import RDD
@@ -19,12 +22,13 @@ class SparkContext(object):
broadcast variables on that cluster.
"""
- gateway = launch_gateway()
- jvm = gateway.jvm
- _readRDDFromPickleFile = jvm.PythonRDD.readRDDFromPickleFile
- _writeIteratorToPickleFile = jvm.PythonRDD.writeIteratorToPickleFile
- _takePartition = jvm.PythonRDD.takePartition
+ _gateway = None
+ _jvm = None
+ _writeIteratorToPickleFile = None
+ _takePartition = None
_next_accum_id = 0
+ _active_spark_context = None
+ _lock = Lock()
def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024):
@@ -44,6 +48,18 @@ class SparkContext(object):
Java object. Set 1 to disable batching or -1 to use an
unlimited batch size.
"""
+ with SparkContext._lock:
+ if SparkContext._active_spark_context:
+ raise ValueError("Cannot run multiple SparkContexts at once")
+ else:
+ SparkContext._active_spark_context = self
+ if not SparkContext._gateway:
+ SparkContext._gateway = launch_gateway()
+ SparkContext._jvm = SparkContext._gateway.jvm
+ SparkContext._writeIteratorToPickleFile = \
+ SparkContext._jvm.PythonRDD.writeIteratorToPickleFile
+ SparkContext._takePartition = \
+ SparkContext._jvm.PythonRDD.takePartition
self.master = master
self.jobName = jobName
self.sparkHome = sparkHome or None # None becomes null in Py4J
@@ -51,8 +67,8 @@ class SparkContext(object):
self.batchSize = batchSize # -1 represents a unlimited batch size
# Create the Java SparkContext through Py4J
- empty_string_array = self.gateway.new_array(self.jvm.String, 0)
- self._jsc = self.jvm.JavaSparkContext(master, jobName, sparkHome,
+ empty_string_array = self._gateway.new_array(self._jvm.String, 0)
+ self._jsc = self._jvm.JavaSparkContext(master, jobName, sparkHome,
empty_string_array)
# Create a single Accumulator in Java that we'll send all our updates through;
@@ -60,8 +76,8 @@ class SparkContext(object):
self._accumulatorServer = accumulators._start_update_server()
(host, port) = self._accumulatorServer.server_address
self._javaAccumulator = self._jsc.accumulator(
- self.jvm.java.util.ArrayList(),
- self.jvm.PythonAccumulatorParam(host, port))
+ self._jvm.java.util.ArrayList(),
+ self._jvm.PythonAccumulatorParam(host, port))
self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
# Broadcast's __reduce__ method stores Broadcast instances here.
@@ -73,6 +89,13 @@ class SparkContext(object):
# Deploy any code dependencies specified in the constructor
for path in (pyFiles or []):
self.addPyFile(path)
+ SparkFiles._sc = self
+ sys.path.append(SparkFiles.getRootDirectory())
+
+ # Create a temporary directory inside spark.local.dir:
+ local_dir = self._jvm.spark.Utils.getLocalDir()
+ self._temp_dir = \
+ self._jvm.spark.Utils.createTempDir(local_dir).getAbsolutePath()
@property
def defaultParallelism(self):
@@ -83,17 +106,20 @@ class SparkContext(object):
return self._jsc.sc().defaultParallelism()
def __del__(self):
- if self._jsc:
- self._jsc.stop()
- if self._accumulatorServer:
- self._accumulatorServer.shutdown()
+ self.stop()
def stop(self):
"""
Shut down the SparkContext.
"""
- self._jsc.stop()
- self._jsc = None
+ if self._jsc:
+ self._jsc.stop()
+ self._jsc = None
+ if self._accumulatorServer:
+ self._accumulatorServer.shutdown()
+ self._accumulatorServer = None
+ with SparkContext._lock:
+ SparkContext._active_spark_context = None
def parallelize(self, c, numSlices=None):
"""
@@ -103,14 +129,14 @@ class SparkContext(object):
# Calling the Java parallelize() method with an ArrayList is too slow,
# because it sends O(n) Py4J commands. As an alternative, serialized
# objects are written to a file and loaded through textFile().
- tempFile = NamedTemporaryFile(delete=False)
- atexit.register(lambda: os.unlink(tempFile.name))
+ tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
if self.batchSize != 1:
c = batched(c, self.batchSize)
for x in c:
write_with_length(dump_pickle(x), tempFile)
tempFile.close()
- jrdd = self._readRDDFromPickleFile(self._jsc, tempFile.name, numSlices)
+ readRDDFromPickleFile = self._jvm.PythonRDD.readRDDFromPickleFile
+ jrdd = readRDDFromPickleFile(self._jsc, tempFile.name, numSlices)
return RDD(jrdd, self)
def textFile(self, name, minSplits=None):
@@ -123,6 +149,10 @@ class SparkContext(object):
jrdd = self._jsc.textFile(name, minSplits)
return RDD(jrdd, self)
+ def _checkpointFile(self, name):
+ jrdd = self._jsc.checkpointFile(name)
+ return RDD(jrdd, self)
+
def union(self, rdds):
"""
Build the union of a list of RDDs.
@@ -144,16 +174,11 @@ class SparkContext(object):
def accumulator(self, value, accum_param=None):
"""
- Create an C{Accumulator} with the given initial value, using a given
- AccumulatorParam helper object to define how to add values of the data
- type if provided. Default AccumulatorParams are used for integers and
- floating-point numbers if you do not provide one. For other types, the
- AccumulatorParam must implement two methods:
- - C{zero(value)}: provide a "zero value" for the type, compatible in
- dimensions with the provided C{value} (e.g., a zero vector).
- - C{addInPlace(val1, val2)}: add two values of the accumulator's data
- type, returning a new value; for efficiency, can also update C{val1}
- in place and return it.
+ Create an L{Accumulator} with the given initial value, using a given
+ L{AccumulatorParam} helper object to define how to add values of the
+ data type if provided. Default AccumulatorParams are used for integers
+ and floating-point numbers if you do not provide one. For other types,
+ a custom AccumulatorParam can be used.
"""
if accum_param == None:
if isinstance(value, int):
@@ -169,10 +194,26 @@ class SparkContext(object):
def addFile(self, path):
"""
- Add a file to be downloaded into the working directory of this Spark
- job on every node. The C{path} passed can be either a local file,
- a file in HDFS (or other Hadoop-supported filesystems), or an HTTP,
- HTTPS or FTP URI.
+ Add a file to be downloaded with this Spark job on every node.
+ The C{path} passed can be either a local file, a file in HDFS
+ (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
+ FTP URI.
+
+ To access the file in Spark jobs, use
+ L{SparkFiles.get(path)<pyspark.files.SparkFiles.get>} to find its
+ download location.
+
+ >>> from pyspark import SparkFiles
+ >>> path = os.path.join(tempdir, "test.txt")
+ >>> with open(path, "w") as testFile:
+ ... testFile.write("100")
+ >>> sc.addFile(path)
+ >>> def func(iterator):
+ ... with open(SparkFiles.get("test.txt")) as testFile:
+ ... fileVal = int(testFile.readline())
+ ... return [x * 100 for x in iterator]
+ >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
+ [100, 200, 300, 400]
"""
self._jsc.sc().addFile(path)
@@ -193,5 +234,33 @@ class SparkContext(object):
"""
self.addFile(path)
filename = path.split("/")[-1]
- os.environ["PYTHONPATH"] = \
- "%s:%s" % (filename, os.environ["PYTHONPATH"])
+
+ def setCheckpointDir(self, dirName, useExisting=False):
+ """
+ Set the directory under which RDDs are going to be checkpointed. The
+ directory must be a HDFS path if running on a cluster.
+
+ If the directory does not exist, it will be created. If the directory
+ exists and C{useExisting} is set to true, then the exisiting directory
+ will be used. Otherwise an exception will be thrown to prevent
+ accidental overriding of checkpoint files in the existing directory.
+ """
+ self._jsc.sc().setCheckpointDir(dirName, useExisting)
+
+
+def _test():
+ import atexit
+ import doctest
+ import tempfile
+ globs = globals().copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['tempdir'] = tempfile.mkdtemp()
+ atexit.register(lambda: shutil.rmtree(globs['tempdir']))
+ (failure_count, test_count) = doctest.testmod(globs=globs)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
diff --git a/python/pyspark/files.py b/python/pyspark/files.py
new file mode 100644
index 0000000000..001b7a28b6
--- /dev/null
+++ b/python/pyspark/files.py
@@ -0,0 +1,38 @@
+import os
+
+
+class SparkFiles(object):
+ """
+ Resolves paths to files added through
+ L{SparkContext.addFile()<pyspark.context.SparkContext.addFile>}.
+
+ SparkFiles contains only classmethods; users should not create SparkFiles
+ instances.
+ """
+
+ _root_directory = None
+ _is_running_on_worker = False
+ _sc = None
+
+ def __init__(self):
+ raise NotImplementedError("Do not construct SparkFiles objects")
+
+ @classmethod
+ def get(cls, filename):
+ """
+ Get the absolute path of a file added through C{SparkContext.addFile()}.
+ """
+ path = os.path.join(SparkFiles.getRootDirectory(), filename)
+ return os.path.abspath(path)
+
+ @classmethod
+ def getRootDirectory(cls):
+ """
+ Get the root directory that contains files added through
+ C{SparkContext.addFile()}.
+ """
+ if cls._is_running_on_worker:
+ return cls._root_directory
+ else:
+ # This will have to change if we support multiple SparkContexts:
+ return cls._sc._jvm.spark.SparkFiles.getRootDirectory()
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d705f0f9e1..4cda6cf661 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1,4 +1,3 @@
-import atexit
from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
@@ -32,7 +31,9 @@ class RDD(object):
def __init__(self, jrdd, ctx):
self._jrdd = jrdd
self.is_cached = False
+ self.is_checkpointed = False
self.ctx = ctx
+ self._partitionFunc = None
@property
def context(self):
@@ -49,6 +50,34 @@ class RDD(object):
self._jrdd.cache()
return self
+ def checkpoint(self):
+ """
+ Mark this RDD for checkpointing. It will be saved to a file inside the
+ checkpoint directory set with L{SparkContext.setCheckpointDir()} and
+ all references to its parent RDDs will be removed. This function must
+ be called before any job has been executed on this RDD. It is strongly
+ recommended that this RDD is persisted in memory, otherwise saving it
+ on a file will require recomputation.
+ """
+ self.is_checkpointed = True
+ self._jrdd.rdd().checkpoint()
+
+ def isCheckpointed(self):
+ """
+ Return whether this RDD has been checkpointed or not
+ """
+ return self._jrdd.rdd().isCheckpointed()
+
+ def getCheckpointFile(self):
+ """
+ Gets the name of the file to which this RDD was checkpointed
+ """
+ checkpointFile = self._jrdd.rdd().getCheckpointFile()
+ if checkpointFile.isDefined():
+ return checkpointFile.get()
+ else:
+ return None
+
# TODO persist(self, storageLevel)
def map(self, f, preservesPartitioning=False):
@@ -234,12 +263,8 @@ class RDD(object):
# Transferring lots of data through Py4J can be slow because
# socket.readline() is inefficient. Instead, we'll dump the data to a
# file and read it back.
- tempFile = NamedTemporaryFile(delete=False)
+ tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
tempFile.close()
- def clean_up_file():
- try: os.unlink(tempFile.name)
- except: pass
- atexit.register(clean_up_file)
self.ctx._writeIteratorToPickleFile(iterator, tempFile.name)
# Read the data into Python and deserialize it:
with open(tempFile.name, 'rb') as tempFile:
@@ -347,6 +372,10 @@ class RDD(object):
items = []
for partition in range(self._jrdd.splits().size()):
iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
+ # Each item in the iterator is a string, Python object, batch of
+ # Python objects. Regardless, it is sufficient to take `num`
+ # of these objects in order to collect `num` Python objects:
+ iterator = iterator.take(num)
items.extend(self._collect_iterator_through_file(iterator))
if len(items) >= num:
break
@@ -377,7 +406,7 @@ class RDD(object):
return (str(x).encode("utf-8") for x in iterator)
keyed = PipelinedRDD(self, func)
keyed._bypass_serializer = True
- keyed._jrdd.map(self.ctx.jvm.BytesToString()).saveAsTextFile(path)
+ keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
# Pair functions
@@ -497,7 +526,7 @@ class RDD(object):
return python_right_outer_join(self, other, numSplits)
# TODO: add option to control map-side combining
- def partitionBy(self, numSplits, hashFunc=hash):
+ def partitionBy(self, numSplits, partitionFunc=hash):
"""
Return a copy of the RDD partitioned using the specified partitioner.
@@ -514,17 +543,21 @@ class RDD(object):
def add_shuffle_key(split, iterator):
buckets = defaultdict(list)
for (k, v) in iterator:
- buckets[hashFunc(k) % numSplits].append((k, v))
+ buckets[partitionFunc(k) % numSplits].append((k, v))
for (split, items) in buckets.iteritems():
yield str(split)
yield dump_pickle(Batch(items))
keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
- pairRDD = self.ctx.jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
- partitioner = self.ctx.jvm.spark.api.python.PythonPartitioner(numSplits)
- jrdd = pairRDD.partitionBy(partitioner)
- jrdd = jrdd.map(self.ctx.jvm.ExtractValue())
- return RDD(jrdd, self.ctx)
+ pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
+ partitioner = self.ctx._jvm.PythonPartitioner(numSplits,
+ id(partitionFunc))
+ jrdd = pairRDD.partitionBy(partitioner).values()
+ rdd = RDD(jrdd, self.ctx)
+ # This is required so that id(partitionFunc) remains unique, even if
+ # partitionFunc is a lambda:
+ rdd._partitionFunc = partitionFunc
+ return rdd
# TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
@@ -662,7 +695,7 @@ class PipelinedRDD(RDD):
20
"""
def __init__(self, prev, func, preservesPartitioning=False):
- if isinstance(prev, PipelinedRDD) and not prev.is_cached:
+ if isinstance(prev, PipelinedRDD) and prev._is_pipelinable():
prev_func = prev.func
def pipeline_func(split, iterator):
return func(split, prev_func(split, iterator))
@@ -675,6 +708,7 @@ class PipelinedRDD(RDD):
self.preservesPartitioning = preservesPartitioning
self._prev_jrdd = prev._jrdd
self.is_cached = False
+ self.is_checkpointed = False
self.ctx = prev.ctx
self.prev = prev
self._jrdd_val = None
@@ -695,18 +729,21 @@ class PipelinedRDD(RDD):
pipe_command = ' '.join(b64enc(cloudpickle.dumps(f)) for f in cmds)
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
- self.ctx.gateway._gateway_client)
+ self.ctx._gateway._gateway_client)
self.ctx._pickled_broadcast_vars.clear()
class_manifest = self._prev_jrdd.classManifest()
env = copy.copy(self.ctx.environment)
env['PYTHONPATH'] = os.environ.get("PYTHONPATH", "")
- env = MapConverter().convert(env, self.ctx.gateway._gateway_client)
- python_rdd = self.ctx.jvm.PythonRDD(self._prev_jrdd.rdd(),
+ env = MapConverter().convert(env, self.ctx._gateway._gateway_client)
+ python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec,
broadcast_vars, self.ctx._javaAccumulator, class_manifest)
self._jrdd_val = python_rdd.asJavaRDD()
return self._jrdd_val
+ def _is_pipelinable(self):
+ return not (self.is_cached or self.is_checkpointed)
+
def _test():
import doctest
@@ -715,8 +752,10 @@ def _test():
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
- doctest.testmod(globs=globs)
+ (failure_count, test_count) = doctest.testmod(globs=globs)
globs['sc'].stop()
+ if failure_count:
+ exit(-1)
if __name__ == "__main__":
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index f6328c561f..54ff1bf8e7 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -4,6 +4,7 @@ An interactive shell.
This file is designed to be launched as a PYTHONSTARTUP script.
"""
import os
+import pyspark
from pyspark.context import SparkContext
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
new file mode 100644
index 0000000000..6a1962d267
--- /dev/null
+++ b/python/pyspark/tests.py
@@ -0,0 +1,121 @@
+"""
+Unit tests for PySpark; additional tests are implemented as doctests in
+individual modules.
+"""
+import os
+import shutil
+import sys
+from tempfile import NamedTemporaryFile
+import time
+import unittest
+
+from pyspark.context import SparkContext
+from pyspark.files import SparkFiles
+from pyspark.java_gateway import SPARK_HOME
+
+
+class PySparkTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self._old_sys_path = list(sys.path)
+ class_name = self.__class__.__name__
+ self.sc = SparkContext('local[4]', class_name , batchSize=2)
+
+ def tearDown(self):
+ self.sc.stop()
+ sys.path = self._old_sys_path
+ # To avoid Akka rebinding to the same port, since it doesn't unbind
+ # immediately on shutdown
+ self.sc._jvm.System.clearProperty("spark.driver.port")
+
+
+class TestCheckpoint(PySparkTestCase):
+
+ def setUp(self):
+ PySparkTestCase.setUp(self)
+ self.checkpointDir = NamedTemporaryFile(delete=False)
+ os.unlink(self.checkpointDir.name)
+ self.sc.setCheckpointDir(self.checkpointDir.name)
+
+ def tearDown(self):
+ PySparkTestCase.tearDown(self)
+ shutil.rmtree(self.checkpointDir.name)
+
+ def test_basic_checkpointing(self):
+ parCollection = self.sc.parallelize([1, 2, 3, 4])
+ flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1))
+
+ self.assertFalse(flatMappedRDD.isCheckpointed())
+ self.assertIsNone(flatMappedRDD.getCheckpointFile())
+
+ flatMappedRDD.checkpoint()
+ result = flatMappedRDD.collect()
+ time.sleep(1) # 1 second
+ self.assertTrue(flatMappedRDD.isCheckpointed())
+ self.assertEqual(flatMappedRDD.collect(), result)
+ self.assertEqual(self.checkpointDir.name,
+ os.path.dirname(flatMappedRDD.getCheckpointFile()))
+
+ def test_checkpoint_and_restore(self):
+ parCollection = self.sc.parallelize([1, 2, 3, 4])
+ flatMappedRDD = parCollection.flatMap(lambda x: [x])
+
+ self.assertFalse(flatMappedRDD.isCheckpointed())
+ self.assertIsNone(flatMappedRDD.getCheckpointFile())
+
+ flatMappedRDD.checkpoint()
+ flatMappedRDD.count() # forces a checkpoint to be computed
+ time.sleep(1) # 1 second
+
+ self.assertIsNotNone(flatMappedRDD.getCheckpointFile())
+ recovered = self.sc._checkpointFile(flatMappedRDD.getCheckpointFile())
+ self.assertEquals([1, 2, 3, 4], recovered.collect())
+
+
+class TestAddFile(PySparkTestCase):
+
+ def test_add_py_file(self):
+ # To ensure that we're actually testing addPyFile's effects, check that
+ # this job fails due to `userlibrary` not being on the Python path:
+ def func(x):
+ from userlibrary import UserClass
+ return UserClass().hello()
+ self.assertRaises(Exception,
+ self.sc.parallelize(range(2)).map(func).first)
+ # Add the file, so the job should now succeed:
+ path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
+ self.sc.addPyFile(path)
+ res = self.sc.parallelize(range(2)).map(func).first()
+ self.assertEqual("Hello World!", res)
+
+ def test_add_file_locally(self):
+ path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+ self.sc.addFile(path)
+ download_path = SparkFiles.get("hello.txt")
+ self.assertNotEqual(path, download_path)
+ with open(download_path) as test_file:
+ self.assertEquals("Hello World!\n", test_file.readline())
+
+ def test_add_py_file_locally(self):
+ # To ensure that we're actually testing addPyFile's effects, check that
+ # this fails due to `userlibrary` not being on the Python path:
+ def func():
+ from userlibrary import UserClass
+ self.assertRaises(ImportError, func)
+ path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
+ self.sc.addFile(path)
+ from userlibrary import UserClass
+ self.assertEqual("Hello World!", UserClass().hello())
+
+
+class TestIO(PySparkTestCase):
+
+ def test_stdout_redirection(self):
+ import subprocess
+ def func(x):
+ subprocess.check_call('ls', shell=True)
+ self.sc.parallelize([1]).foreach(func)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index b2b9288089..812e7a9da5 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1,20 +1,23 @@
"""
Worker that receives input from Piped RDD.
"""
+import os
import sys
+import traceback
from base64 import standard_b64decode
# CloudPickler needs to be imported so that depicklers are registered using the
# copy_reg module.
from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.cloudpickle import CloudPickler
+from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, read_with_length, write_int, \
read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
# Redirect stdout to stderr so that users must return values from functions.
-old_stdout = sys.stdout
-sys.stdout = sys.stderr
+old_stdout = os.fdopen(os.dup(1), 'w')
+os.dup2(2, 1)
def load_obj():
@@ -23,6 +26,10 @@ def load_obj():
def main():
split_index = read_int(sys.stdin)
+ spark_files_dir = load_pickle(read_with_length(sys.stdin))
+ SparkFiles._root_directory = spark_files_dir
+ SparkFiles._is_running_on_worker = True
+ sys.path.append(spark_files_dir)
num_broadcast_variables = read_int(sys.stdin)
for _ in range(num_broadcast_variables):
bid = read_long(sys.stdin)
@@ -35,8 +42,13 @@ def main():
else:
dumps = dump_pickle
iterator = read_from_pickle_file(sys.stdin)
- for obj in func(split_index, iterator):
- write_with_length(dumps(obj), old_stdout)
+ try:
+ for obj in func(split_index, iterator):
+ write_with_length(dumps(obj), old_stdout)
+ except Exception as e:
+ write_int(-2, old_stdout)
+ write_with_length(traceback.format_exc(), old_stdout)
+ sys.exit(-1)
# Mark the beginning of the accumulators section of the output
write_int(-1, old_stdout)
for aid, accum in _accumulatorRegistry.items():