diff options
author | Matei Zaharia <matei@databricks.com> | 2014-04-04 17:29:29 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-04-04 17:29:29 -0700 |
commit | 60e18ce7dd1016647b63586520b713efc45494a8 (patch) | |
tree | 3f7a5bbb130855a849cbdf1af9a2259ba3699006 /python | |
parent | d956cc251676d67d87bd6dbfa82be864933d8136 (diff) | |
download | spark-60e18ce7dd1016647b63586520b713efc45494a8.tar.gz spark-60e18ce7dd1016647b63586520b713efc45494a8.tar.bz2 spark-60e18ce7dd1016647b63586520b713efc45494a8.zip |
SPARK-1414. Python API for SparkContext.wholeTextFiles
Also clarified comment on each file having to fit in memory
Author: Matei Zaharia <matei@databricks.com>
Closes #327 from mateiz/py-whole-files and squashes the following commits:
9ad64a5 [Matei Zaharia] SPARK-1414. Python API for SparkContext.wholeTextFiles
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/context.py | 44 | ||||
-rw-r--r-- | python/pyspark/serializers.py | 2 |
2 files changed, 43 insertions, 3 deletions
```diff
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index bf2454fd7e..ff1023bbfa 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -28,7 +28,8 @@ from pyspark.broadcast import Broadcast
 from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
-from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
+        PairDeserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
 from pyspark.rdd import RDD
@@ -257,6 +258,45 @@ class SparkContext(object):
         return RDD(self._jsc.textFile(name, minSplits), self,
                    UTF8Deserializer())
 
+    def wholeTextFiles(self, path):
+        """
+        Read a directory of text files from HDFS, a local file system
+        (available on all nodes), or any Hadoop-supported file system
+        URI. Each file is read as a single record and returned in a
+        key-value pair, where the key is the path of each file, the
+        value is the content of each file.
+
+        For example, if you have the following files::
+
+          hdfs://a-hdfs-path/part-00000
+          hdfs://a-hdfs-path/part-00001
+          ...
+          hdfs://a-hdfs-path/part-nnnnn
+
+        Do C{rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")},
+        then C{rdd} contains::
+
+          (a-hdfs-path/part-00000, its content)
+          (a-hdfs-path/part-00001, its content)
+          ...
+          (a-hdfs-path/part-nnnnn, its content)
+
+        NOTE: Small files are preferred, as each file will be loaded
+        fully in memory.
+
+        >>> dirPath = os.path.join(tempdir, "files")
+        >>> os.mkdir(dirPath)
+        >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
+        ...    file1.write("1")
+        >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
+        ...    file2.write("2")
+        >>> textFiles = sc.wholeTextFiles(dirPath)
+        >>> sorted(textFiles.collect())
+        [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
+        """
+        return RDD(self._jsc.wholeTextFiles(path), self,
+                   PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
+
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
         return RDD(jrdd, self, input_deserializer)
@@ -425,7 +465,7 @@ def _test():
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     globs['tempdir'] = tempfile.mkdtemp()
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
-    (failure_count, test_count) = doctest.testmod(globs=globs)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 12c63f186a..4d802924df 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -290,7 +290,7 @@ class MarshalSerializer(FramedSerializer):
 
 class UTF8Deserializer(Serializer):
     """
-    Deserializes streams written by getBytes.
+    Deserializes streams written by String.getBytes.
     """
 
     def loads(self, stream):
```