author     CodingCat <zhunansjtu@gmail.com>  2014-04-18 10:01:16 -0700
committer  Reynold Xin <rxin@apache.org>     2014-04-18 10:01:16 -0700
commit     e31c8ffca65e0e5cd5f1a6229f3d654a24b7b18c (patch)
tree       b0923d192066b8f44bad5047f0ca03719af5c789 /core
parent     7863ecca35be9af1eca0dfe5fd8806c5dd710fd6 (diff)
SPARK-1483: Rename minSplits to minPartitions in public APIs
https://issues.apache.org/jira/browse/SPARK-1483

From the original JIRA: "The parameter name is part of the public API in Scala and Python, since you can pass named parameters to a method, so we should name it to this more descriptive term. Everywhere else we refer to "splits" as partitions." - @mateiz

Author: CodingCat <zhunansjtu@gmail.com>

Closes #430 from CodingCat/SPARK-1483 and squashes the following commits:

4b60541 [CodingCat] deprecate defaultMinSplits
ba2c663 [CodingCat] Rename minSplits to minPartitions in public APIs
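To make the caller-facing effect concrete, here is a minimal Scala sketch, not part of the patch, assuming a build with this change applied; the master, app name, path, and the value 8 are illustrative only:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(
      new SparkConf().setAppName("rename-demo").setMaster("local[2]"))

    // Before this patch a call site using a named argument read:
    //   val lines = sc.textFile("hdfs:///data/input.txt", minSplits = 8)
    // After this patch the same call site reads:
    val lines = sc.textFile("hdfs:///data/input.txt", minPartitions = 8)

    // The old default helper still compiles, but emits a deprecation warning;
    // both return math.min(defaultParallelism, 2).
    val n = sc.defaultMinPartitions

Because Scala and Python callers can pass the argument by name, the parameter name itself is part of the public API, which is why the old spelling is kept only as a deprecated alias rather than removed outright.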
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                    | 47
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala       | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala  |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                   | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala                |  4
5 files changed, 60 insertions, 45 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3ddc0d5eee..ee5637371f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -427,9 +427,9 @@ class SparkContext(config: SparkConf) extends Logging {
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
- def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
+ def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
- minSplits).map(pair => pair._2.toString)
+ minPartitions).map(pair => pair._2.toString)
}
/**
@@ -457,9 +457,10 @@ class SparkContext(config: SparkConf) extends Logging {
*
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
*
- * @param minSplits A suggestion value of the minimal splitting number for input data.
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
- def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+ def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
+ RDD[(String, String)] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
@@ -469,7 +470,7 @@ class SparkContext(config: SparkConf) extends Logging {
classOf[String],
classOf[String],
updateConf,
- minSplits)
+ minPartitions)
}
/**
@@ -481,7 +482,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
- * @param minSplits Minimum number of Hadoop Splits to generate.
+ * @param minPartitions Minimum number of Hadoop Splits to generate.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
@@ -493,11 +494,11 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
+ minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -512,7 +513,7 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
+ minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -524,7 +525,7 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass,
keyClass,
valueClass,
- minSplits)
+ minPartitions)
}
/**
@@ -532,7 +533,7 @@ class SparkContext(config: SparkConf) extends Logging {
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
* can just write, for example,
* {{{
- * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
+ * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minPartitions)
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
@@ -541,13 +542,13 @@ class SparkContext(config: SparkConf) extends Logging {
* a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
- (path: String, minSplits: Int)
+ (path: String, minPartitions: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
- minSplits)
+ minPartitions)
}
/**
@@ -565,7 +566,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
- hadoopFile[K, V, F](path, defaultMinSplits)
+ hadoopFile[K, V, F](path, defaultMinPartitions)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
@@ -626,10 +627,10 @@ class SparkContext(config: SparkConf) extends Logging {
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minPartitions: Int
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
- hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+ hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
@@ -641,7 +642,7 @@ class SparkContext(config: SparkConf) extends Logging {
* */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
): RDD[(K, V)] =
- sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+ sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -665,7 +666,7 @@ class SparkContext(config: SparkConf) extends Logging {
* a `map` function.
*/
def sequenceFile[K, V]
- (path: String, minSplits: Int = defaultMinSplits)
+ (path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
@@ -674,7 +675,7 @@ class SparkContext(config: SparkConf) extends Logging {
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
- vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+ vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
@@ -688,9 +689,9 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def objectFile[T: ClassTag](
path: String,
- minSplits: Int = defaultMinSplits
+ minPartitions: Int = defaultMinPartitions
): RDD[T] = {
- sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
+ sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
}
@@ -1183,8 +1184,12 @@ class SparkContext(config: SparkConf) extends Logging {
def defaultParallelism: Int = taskScheduler.defaultParallelism
/** Default min number of partitions for Hadoop RDDs when not given by user */
+ @deprecated("use defaultMinPartitions", "1.0.0")
def defaultMinSplits: Int = math.min(defaultParallelism, 2)
+ /** Default min number of partitions for Hadoop RDDs when not given by user */
+ def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
+
private val nextShuffleId = new AtomicInteger(0)
private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e6a3f06b0e..cf30523ab5 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -109,9 +109,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: java.lang.Integer = sc.defaultParallelism
- /** Default min number of partitions for Hadoop RDDs when not given by user */
+ /**
+ * Default min number of partitions for Hadoop RDDs when not given by user.
+ * @deprecated As of Spark 1.0.0, defaultMinSplits is deprecated, use
+ * {@link #defaultMinPartitions()} instead
+ */
+ @Deprecated
def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits
+ /** Default min number of partitions for Hadoop RDDs when not given by user */
+ def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
+
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
@@ -153,7 +161,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
- def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
+ def textFile(path: String, minPartitions: Int): JavaRDD[String] =
+ sc.textFile(path, minPartitions)
/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
@@ -180,17 +189,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
*
- * @param minSplits A suggestion value of the minimal splitting number for input data.
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
- def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
- new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
+ def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] =
+ new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))
/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
*
- * @see `wholeTextFiles(path: String, minSplits: Int)`.
+ * @see `wholeTextFiles(path: String, minPartitions: Int)`.
*/
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))
@@ -205,11 +214,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minPartitions: Int
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
+ new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions))
}
/** Get an RDD for a Hadoop SequenceFile.
@@ -233,9 +242,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* slow if you use the default serializer (Java serialization), though the nice thing about it is
* that there's very little effort required to save arbitrary objects.
*/
- def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
+ def objectFile[T](path: String, minPartitions: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
- sc.objectFile(path, minSplits)(ctag)
+ sc.objectFile(path, minPartitions)(ctag)
}
/**
@@ -265,11 +274,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
inputFormatClass: Class[F],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minPartitions: Int
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
+ new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
}
/**
@@ -304,11 +313,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
inputFormatClass: Class[F],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minPartitions: Int
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
+ new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 80d055a895..4cb4505777 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -48,14 +48,15 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
}
/**
- * Allow minSplits set by end-user in order to keep compatibility with old Hadoop API.
+ * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
*/
- def setMaxSplitSize(context: JobContext, minSplits: Int) {
+ def setMaxSplitSize(context: JobContext, minPartitions: Int) {
val files = listStatus(context)
val totalLen = files.map { file =>
if (file.isDir) 0L else file.getLen
}.sum
- val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
+ val maxSplitSize = Math.ceil(totalLen * 1.0 /
+ (if (minPartitions == 0) 1 else minPartitions)).toLong
super.setMaxSplitSize(maxSplitSize)
}
}
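As a quick sanity check on the setMaxSplitSize hunk above, a standalone Scala sketch of the same arithmetic; the file lengths and the minPartitions value are hypothetical, not taken from the patch:

    // Re-derivation of the maxSplitSize computation in
    // WholeTextFileInputFormat.setMaxSplitSize, with made-up inputs.
    val fileLengths   = Seq(10L << 20, 4L << 20, 2L << 20)  // ~16 MB of input total
    val minPartitions = 4
    val totalLen      = fileLengths.sum
    val maxSplitSize  =
      math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong
    // maxSplitSize == 4194304 (4 MB): each combined split is capped at roughly
    // totalLen / minPartitions bytes, so at least ~minPartitions splits result.

Dividing the total input length by minPartitions is how the format turns a partition hint into the size cap that CombineFileInputFormat understands, which is the "compatibility with old Hadoop API" the doc comment refers to.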
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6811e1abb8..6547755764 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -87,7 +87,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
- * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
+ * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
*/
@DeveloperApi
class HadoopRDD[K, V](
@@ -97,7 +97,7 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int)
+ minPartitions: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
def this(
@@ -106,7 +106,7 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int) = {
+ minPartitions: Int) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
@@ -115,7 +115,7 @@ class HadoopRDD[K, V](
inputFormatClass,
keyClass,
valueClass,
- minSplits)
+ minPartitions)
}
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -169,7 +169,7 @@ class HadoopRDD[K, V](
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
- val inputSplits = inputFormat.getSplits(jobConf, minSplits)
+ val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 8684b645bc..ac1ccc06f2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -158,7 +158,7 @@ private[spark] class WholeTextFileRDD(
keyClass: Class[String],
valueClass: Class[String],
@transient conf: Configuration,
- minSplits: Int)
+ minPartitions: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
override def getPartitions: Array[Partition] = {
@@ -169,7 +169,7 @@ private[spark] class WholeTextFileRDD(
case _ =>
}
val jobContext = newJobContext(conf, jobId)
- inputFormat.setMaxSplitSize(jobContext, minSplits)
+ inputFormat.setMaxSplitSize(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {