diff options
author | Kan Zhang <kzhang@apache.org> | 2014-05-21 13:26:53 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-05-21 13:26:53 -0700 |
commit | f18fd05b513b136363c94adb3e5b841f8bf48134 (patch) | |
tree | 62d204b2b94e911c474c945204eba0fb9eaf0ec8 /python | |
parent | ba5d4a99425a2083fea2a9759050c5e770197e23 (diff) | |
download | spark-f18fd05b513b136363c94adb3e5b841f8bf48134.tar.gz spark-f18fd05b513b136363c94adb3e5b841f8bf48134.tar.bz2 spark-f18fd05b513b136363c94adb3e5b841f8bf48134.zip |
[SPARK-1519] Support minPartitions param of wholeTextFiles() in PySpark
Author: Kan Zhang <kzhang@apache.org>
Closes #697 from kanzhang/SPARK-1519 and squashes the following commits:
4f8d1ed [Kan Zhang] [SPARK-1519] Support minPartitions param of wholeTextFiles() in PySpark
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/context.py | 12 |
1 file changed, 10 insertions, 2 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index cac133d0fc..c9ff82d23b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -211,6 +211,13 @@ class SparkContext(object):
         """
         return self._jsc.sc().defaultParallelism()

+    @property
+    def defaultMinPartitions(self):
+        """
+        Default min number of partitions for Hadoop RDDs when not given by user
+        """
+        return self._jsc.sc().defaultMinPartitions()
+
     def __del__(self):
         self.stop()

@@ -264,7 +271,7 @@ class SparkContext(object):
         return RDD(self._jsc.textFile(name, minPartitions), self,
                    UTF8Deserializer())

-    def wholeTextFiles(self, path):
+    def wholeTextFiles(self, path, minPartitions=None):
         """
         Read a directory of text files from HDFS, a local file system
         (available on all nodes), or any Hadoop-supported file system

@@ -300,7 +307,8 @@ class SparkContext(object):
         >>> sorted(textFiles.collect())
         [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
         """
-        return RDD(self._jsc.wholeTextFiles(path), self,
+        minPartitions = minPartitions or self.defaultMinPartitions
+        return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                    PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

     def _checkpointFile(self, name, input_deserializer):