Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py     | 16 ++++++++++++----
-rw-r--r--  python/pyspark/serializers.py | 18 +++++++++++-------
2 files changed, 23 insertions(+), 11 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 84bc0a3b7c..3ab98e262d 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -331,12 +331,16 @@ class SparkContext(object):
return RDD(self._jsc.objectFile(name, minPartitions), self,
BatchedSerializer(PickleSerializer()))
- def textFile(self, name, minPartitions=None):
+ def textFile(self, name, minPartitions=None, use_unicode=True):
"""
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings.
+ If use_unicode is False, the strings will be kept as `str` (encoded
+ as `utf-8`), which is faster and more memory-efficient than unicode.
+ (Added in Spark 1.2)
+
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
... testFile.write("Hello world!")
@@ -346,9 +350,9 @@ class SparkContext(object):
"""
minPartitions = minPartitions or min(self.defaultParallelism, 2)
return RDD(self._jsc.textFile(name, minPartitions), self,
- UTF8Deserializer())
+ UTF8Deserializer(use_unicode))
- def wholeTextFiles(self, path, minPartitions=None):
+ def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
"""
Read a directory of text files from HDFS, a local file system
(available on all nodes), or any Hadoop-supported file system
@@ -356,6 +360,10 @@ class SparkContext(object):
key-value pair, where the key is the path of each file, the
value is the content of each file.
+ If use_unicode is False, the strings will be kept as `str` (encoded
+ as `utf-8`), which is faster and more memory-efficient than unicode.
+ (Added in Spark 1.2)
+
For example, if you have the following files::
hdfs://a-hdfs-path/part-00000
@@ -386,7 +394,7 @@ class SparkContext(object):
"""
minPartitions = minPartitions or self.defaultMinPartitions
return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
- PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
+ PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))
def _dictToJavaMap(self, d):
jm = self._jvm.java.util.HashMap()
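For reference, a minimal sketch of how the new parameter is used from the
Python API. The SparkContext setup and file paths below are illustrative,
not part of this change:

    from pyspark import SparkContext

    sc = SparkContext("local", "use_unicode-demo")

    # Default behaviour: each line is decoded to a unicode object.
    lines_u = sc.textFile("/tmp/sample-text.txt")

    # use_unicode=False keeps each line as a utf-8 encoded str,
    # skipping one decode per record and using less memory.
    lines_s = sc.textFile("/tmp/sample-text.txt", use_unicode=False)

    # wholeTextFiles gains the same flag; it applies to both the
    # key (file path) and the value (file content).
    files = sc.wholeTextFiles("/tmp/text-dir", use_unicode=False)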
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 55e6cf3308..7b2710b913 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -429,18 +429,22 @@ class UTF8Deserializer(Serializer):
Deserializes streams written by String.getBytes.
"""
+ def __init__(self, use_unicode=False):
+ self.use_unicode = use_unicode
+
def loads(self, stream):
length = read_int(stream)
- return stream.read(length).decode('utf8')
+ s = stream.read(length)
+ return s.decode("utf-8") if self.use_unicode else s
def load_stream(self, stream):
- while True:
- try:
+ try:
+ while True:
yield self.loads(stream)
- except struct.error:
- return
- except EOFError:
- return
+ except struct.error:
+ return
+ except EOFError:
+ return
def read_long(stream):
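The load_stream rewrite above consumes a stream of length-prefixed utf-8
frames: a 4-byte big-endian length followed by the bytes of each string, as
produced on the JVM side by String.getBytes. A self-contained sketch under
that assumption; write_utf8 and read_utf8_stream are hypothetical stand-ins
for the JVM writer and pyspark's read_int helper:

    import struct
    from io import BytesIO

    def write_utf8(stream, s):
        # Hypothetical writer: 4-byte big-endian length, then utf-8 bytes,
        # mirroring the framing UTF8Deserializer expects.
        data = s.encode("utf-8")
        stream.write(struct.pack("!i", len(data)))
        stream.write(data)

    def read_utf8_stream(stream, use_unicode=True):
        # Mirrors UTF8Deserializer.load_stream: keep yielding strings until
        # the length prefix can no longer be parsed, then stop cleanly.
        try:
            while True:
                header = stream.read(4)
                length = struct.unpack("!i", header)[0]  # struct.error at EOF
                data = stream.read(length)
                yield data.decode("utf-8") if use_unicode else data
        except struct.error:
            return

    buf = BytesIO()
    write_utf8(buf, u"Hello")
    write_utf8(buf, u"world!")

    buf.seek(0)
    print(list(read_utf8_stream(buf)))                      # two unicode strings
    buf.seek(0)
    print(list(read_utf8_stream(buf, use_unicode=False)))   # two raw byte strings

Wrapping the whole loop in a single try block, rather than re-entering a
try/except on every record, removes per-record exception-handler setup from
the hot path; the behaviour at end-of-stream is unchanged.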