author     Matei Zaharia <matei@databricks.com>  2014-04-04 17:29:29 -0700
committer  Matei Zaharia <matei@databricks.com>  2014-04-04 17:29:29 -0700
commit     60e18ce7dd1016647b63586520b713efc45494a8 (patch)
tree       3f7a5bbb130855a849cbdf1af9a2259ba3699006 /python
parent     d956cc251676d67d87bd6dbfa82be864933d8136 (diff)
SPARK-1414. Python API for SparkContext.wholeTextFiles
Also clarified comment on each file having to fit in memory.

Author: Matei Zaharia <matei@databricks.com>

Closes #327 from mateiz/py-whole-files and squashes the following commits:

9ad64a5 [Matei Zaharia] SPARK-1414. Python API for SparkContext.wholeTextFiles
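The patch mirrors Scala's SparkContext.wholeTextFiles in Python: instead of an RDD of lines, it returns an RDD of (file path, file content) pairs, one per file. A minimal usage sketch, assuming a local master and a hypothetical "data" directory of small text files (neither appears in this commit):

    from pyspark import SparkContext

    sc = SparkContext("local", "wholeTextFilesExample")

    # Each element is a (path, full contents) pair, so whole-file work
    # such as counting lines per file becomes a simple mapValues().
    files = sc.wholeTextFiles("data")   # hypothetical input directory
    counts = files.mapValues(lambda content: len(content.splitlines()))

    for (path, n) in counts.collect():
        print("%s: %d" % (path, n))

    sc.stop()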
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py     | 44
-rw-r--r--  python/pyspark/serializers.py |  2
2 files changed, 43 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index bf2454fd7e..ff1023bbfa 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -28,7 +28,8 @@ from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
-from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
+ PairDeserializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
@@ -257,6 +258,45 @@ class SparkContext(object):
return RDD(self._jsc.textFile(name, minSplits), self,
UTF8Deserializer())
+ def wholeTextFiles(self, path):
+ """
+ Read a directory of text files from HDFS, a local file system
+ (available on all nodes), or any Hadoop-supported file system
+ URI. Each file is read as a single record and returned in a
+ key-value pair, where the key is the path of each file and
+ the value is the content of each file.
+
+ For example, if you have the following files::
+
+ hdfs://a-hdfs-path/part-00000
+ hdfs://a-hdfs-path/part-00001
+ ...
+ hdfs://a-hdfs-path/part-nnnnn
+
+ Do C{rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")},
+ then C{rdd} contains::
+
+ (a-hdfs-path/part-00000, its content)
+ (a-hdfs-path/part-00001, its content)
+ ...
+ (a-hdfs-path/part-nnnnn, its content)
+
+ NOTE: Small files are preferred, as each file will be loaded
+ fully into memory.
+
+ >>> dirPath = os.path.join(tempdir, "files")
+ >>> os.mkdir(dirPath)
+ >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
+ ... file1.write("1")
+ >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
+ ... file2.write("2")
+ >>> textFiles = sc.wholeTextFiles(dirPath)
+ >>> sorted(textFiles.collect())
+ [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
+ """
+ return RDD(self._jsc.wholeTextFiles(path), self,
+ PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
+
def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
return RDD(jrdd, self, input_deserializer)
@@ -425,7 +465,7 @@ def _test():
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
globs['tempdir'] = tempfile.mkdtemp()
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
- (failure_count, test_count) = doctest.testmod(globs=globs)
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
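The _test() change above enables doctest.ELLIPSIS globally because the expected output of the new doctest contains "..." placeholders standing in for the machine-specific temp directory. A standalone sketch of the same mechanism (the tmp_path helper and its path are hypothetical, not part of this commit):

    import doctest

    def tmp_path(name):
        # Hypothetical stand-in for a tempfile.mkdtemp() result: the
        # prefix differs on every run, like 'tempdir' in the real test.
        return "/tmp/spark-test-1234/files/" + name

    def demo():
        """
        >>> tmp_path("1.txt")
        '.../1.txt'
        """

    # Without ELLIPSIS the literal '.../1.txt' would not match the real
    # path; with it, '...' matches the varying prefix and the test passes.
    (failures, _) = doctest.testmod(optionflags=doctest.ELLIPSIS)
    assert failures == 0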
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 12c63f186a..4d802924df 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -290,7 +290,7 @@ class MarshalSerializer(FramedSerializer):
class UTF8Deserializer(Serializer):
"""
- Deserializes streams written by getBytes.
+ Deserializes streams written by String.getBytes.
"""
def loads(self, stream):
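The body of loads is cut off in this diff. For context, a minimal standalone sketch of the framing such a deserializer is assumed to consume (an assumption, not shown here): each record as a 4-byte big-endian length prefix followed by that many UTF-8 bytes, matching what the JVM side writes for String.getBytes-style output:

    import io
    import struct

    def read_utf8_frames(stream):
        # Assumed wire format (not confirmed by this diff): a 4-byte
        # big-endian length prefix, then that many UTF-8-encoded bytes.
        while True:
            header = stream.read(4)
            if len(header) < 4:
                return  # end of stream
            (length,) = struct.unpack(">i", header)
            yield stream.read(length).decode("utf-8")

    # Round-trip two framed strings through an in-memory stream.
    buf = io.BytesIO()
    for s in [u"part-00000", u"its content"]:
        data = s.encode("utf-8")
        buf.write(struct.pack(">i", len(data)))
        buf.write(data)
    buf.seek(0)
    print(list(read_utf8_frames(buf)))  # prints both strings, in order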