| author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-21 16:42:24 -0800 |
|---|---|---|
| committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-21 17:34:17 -0800 |
| commit | ef711902c1f42db14c8ddd524195f0a9efb56e65 | |
| tree | e770a7439d3983c13346cbd81aa1eeeef23e2571 /python/pyspark/context.py | |
| parent | 506077c9938cd411842fe42404aa6b74b45b23a1 | |
Don't download files to master's working directory.

This should avoid exceptions caused by existing files with different contents. I also removed some unused code.
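The behavioral shift the message describes is that files are no longer dropped into the job's working directory, so tasks must resolve them through `SparkFiles.get()`. The sketch below illustrates that access pattern; it is not code from the commit, and it assumes a running `SparkContext` named `sc` with a made-up file name and helper function.

```python
# Hypothetical usage sketch (not from the commit): ship a file with the
# job and resolve its downloaded location on the workers.
from pyspark import SparkFiles

sc.addFile("/tmp/lookup.txt")  # local path; HDFS/HTTP(S)/FTP URIs also work

def use_file(iterator):
    # Workers find the downloaded copy via SparkFiles.get() instead of
    # opening "lookup.txt" relative to the task's working directory.
    with open(SparkFiles.get("lookup.txt")) as f:
        prefix = f.readline().strip()
    return [prefix + str(x) for x in iterator]

print(sc.parallelize(range(4)).mapPartitions(use_file).collect())
```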
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r-- | python/pyspark/context.py | 40 |
1 file changed, 36 insertions(+), 4 deletions(-)
```diff
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index dcbed37270..ec0cc7c2f9 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1,5 +1,7 @@
 import os
 import atexit
+import shutil
+import tempfile
 from tempfile import NamedTemporaryFile
 
 from pyspark import accumulators
@@ -173,10 +175,26 @@ class SparkContext(object):
 
     def addFile(self, path):
         """
-        Add a file to be downloaded into the working directory of this Spark
-        job on every node. The C{path} passed can be either a local file,
-        a file in HDFS (or other Hadoop-supported filesystems), or an HTTP,
-        HTTPS or FTP URI.
+        Add a file to be downloaded with this Spark job on every node.
+        The C{path} passed can be either a local file, a file in HDFS
+        (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
+        FTP URI.
+
+        To access the file in Spark jobs, use
+        L{SparkFiles.get(path)<pyspark.files.SparkFiles.get>} to find its
+        download location.
+
+        >>> from pyspark import SparkFiles
+        >>> path = os.path.join(tempdir, "test.txt")
+        >>> with open(path, "w") as testFile:
+        ...    testFile.write("100")
+        >>> sc.addFile(path)
+        >>> def func(iterator):
+        ...    with open(SparkFiles.get("test.txt")) as testFile:
+        ...        fileVal = int(testFile.readline())
+        ...        return [x * fileVal for x in iterator]
+        >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
+        [100, 200, 300, 400]
         """
         self._jsc.sc().addFile(path)
 
@@ -211,3 +229,17 @@ class SparkContext(object):
         accidental overriding of checkpoint files in the existing directory.
         """
         self._jsc.sc().setCheckpointDir(dirName, useExisting)
+
+
+def _test():
+    import doctest
+    globs = globals().copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    globs['tempdir'] = tempfile.mkdtemp()
+    atexit.register(lambda: shutil.rmtree(globs['tempdir']))
+    doctest.testmod(globs=globs)
+    globs['sc'].stop()
+
+
+if __name__ == "__main__":
+    _test()
```
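The `_test()` harness added at the end of the diff follows a standard doctest pattern: copy the module globals, inject fixtures (`sc` and `tempdir`) for the examples to use, run `doctest.testmod`, and register cleanup. Below is a generic, PySpark-free sketch of the same pattern; the `run_doctests` helper is hypothetical, not part of the commit.

```python
# Generic sketch of the doctest-harness pattern used in _test():
# inject fixtures into the doctest namespace, then clean up on exit.
import atexit
import doctest
import shutil
import sys
import tempfile

def run_doctests(module):
    # Copy, don't mutate: fixtures stay out of the module's real namespace.
    globs = module.__dict__.copy()
    globs["tempdir"] = tempfile.mkdtemp()                # fixture for examples
    atexit.register(lambda: shutil.rmtree(globs["tempdir"]))
    doctest.testmod(module, globs=globs)                 # run the >>> examples

if __name__ == "__main__":
    run_doctests(sys.modules[__name__])
```

Copying the globals is what lets the doctests reference `sc` and `tempdir` without those names ever existing in the module itself.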