author     Xusen Yin <yinxusen@gmail.com>          2014-04-13 13:18:52 -0700
committer  Matei Zaharia <matei@databricks.com>    2014-04-13 13:18:52 -0700
commit     037fe4d2ba01be5610baa3dd9c5c9d3a5e5e1064 (patch)
tree       1149963f7478c685c18696af118cf73e7562beb5 /core
parent     4bc07eebbf5e2ea0c0b6f1642049515025d88d07 (diff)
[SPARK-1415] Hadoop min split for wholeTextFiles()
JIRA issue [here](https://issues.apache.org/jira/browse/SPARK-1415).

The new Hadoop `InputFormat` API does not provide the `minSplits` parameter, which makes the API incompatible between `HadoopRDD` and `NewHadoopRDD`. This PR constructs compatible APIs. Though `minSplits` is deprecated by the new Hadoop API, we think it is better to keep the APIs compatible here.

**Note** that `minSplits` in `wholeTextFiles` can only be treated as a *suggestion*; the real number of splits may be smaller than `minSplits` because `isSplitable()` returns false for whole files.

Author: Xusen Yin <yinxusen@gmail.com>

Closes #376 from yinxusen/hadoop-min-split and squashes the following commits:

76417f6 [Xusen Yin] refine comments
c10af60 [Xusen Yin] refine comments and rewrite new class for wholeTextFile
766d05b [Xusen Yin] refine Java API and comments
4875755 [Xusen Yin] add minSplits for WholeTextFiles
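For reference, a minimal usage sketch of the new parameter from user code; the application name, directory path, and split count below are made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: read a directory of small files as (path, content) pairs,
// suggesting at least 8 splits for the input.
object WholeTextFilesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("wholeTextFiles-minSplits").setMaster("local[2]"))

    val files = sc.wholeTextFiles("hdfs://namenode/data/small-files", minSplits = 8)

    // minSplits is only a suggestion; the actual partition count may differ.
    println(s"partitions = ${files.partitions.length}")

    sc.stop()
  }
}
```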
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                         | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala            | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala       | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala                     | 60
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java                           |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala |  2
6 files changed, 90 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5a36e6f5c1..456070fa7c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -454,14 +454,21 @@ class SparkContext(config: SparkConf) extends Logging {
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
- * @note Small files are preferred, as each file will be loaded fully in memory.
+ * @note Small files are preferred; large files are also allowed, but they may cause bad performance.
+ *
+ * @param minSplits A suggested minimum number of splits for the input data.
*/
- def wholeTextFiles(path: String): RDD[(String, String)] = {
- newAPIHadoopFile(
- path,
+ def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+ val job = new NewHadoopJob(hadoopConfiguration)
+ NewFileInputFormat.addInputPath(job, new Path(path))
+ val updateConf = job.getConfiguration
+ new WholeTextFileRDD(
+ this,
classOf[WholeTextFileInputFormat],
classOf[String],
- classOf[String])
+ classOf[String],
+ updateConf,
+ minSplits)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 1e8242a2cb..7fbefe1cb0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -177,7 +177,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
- * @note Small files are preferred, as each file will be loaded fully in memory.
+ * @note Small files are preferred; large files are also allowed, but they may cause bad performance.
+ *
+ * @param minSplits A suggested minimum number of splits for the input data.
+ */
+ def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
+ new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
+
+ /**
+ * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+ * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+ * key-value pair, where the key is the path of each file, the value is the content of each file.
+ *
+ * @see `wholeTextFiles(path: String, minSplits: Int)`.
*/
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 4887fb6b84..80d055a895 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -17,6 +17,8 @@
package org.apache.spark.input
+import scala.collection.JavaConversions._
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
@@ -44,4 +46,16 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
context,
classOf[WholeTextFileRecordReader])
}
+
+ /**
+ * Allow minSplits to be set by the end user, to keep compatibility with the old Hadoop API.
+ */
+ def setMaxSplitSize(context: JobContext, minSplits: Int) {
+ val files = listStatus(context)
+ val totalLen = files.map { file =>
+ if (file.isDir) 0L else file.getLen
+ }.sum
+ val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
+ super.setMaxSplitSize(maxSplitSize)
+ }
}
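The override above caps the combined split size at roughly the total input length divided by the suggested split count. A standalone sketch of that arithmetic, with made-up file sizes:

```scala
// Hypothetical input: three files of 120 MB, 100 MB, and 80 MB, with minSplits = 3.
val fileLengths = Seq(120L, 100L, 80L).map(_ * 1024 * 1024) // bytes
val minSplits = 3
val totalLen = fileLengths.sum
// Same formula as setMaxSplitSize: each combined split gets at most totalLen / minSplits bytes.
val maxSplitSize = math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
println(s"maxSplitSize = $maxSplitSize bytes") // 104857600 bytes, i.e. 100 MB per combined split
```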
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2d8dfa5a16..8684b645bc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -24,11 +24,18 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
-import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi
-
-private[spark]
-class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
+import org.apache.spark.input.WholeTextFileInputFormat
+import org.apache.spark.InterruptibleIterator
+import org.apache.spark.Logging
+import org.apache.spark.Partition
+import org.apache.spark.SerializableWritable
+import org.apache.spark.{SparkContext, TaskContext}
+
+private[spark] class NewHadoopPartition(
+ rddId: Int,
+ val index: Int,
+ @transient rawSplit: InputSplit with Writable)
extends Partition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -65,17 +72,19 @@ class NewHadoopRDD[K, V](
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
// private val serializableConf = new SerializableWritable(conf)
- private val jobtrackerId: String = {
+ private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
formatter.format(new Date())
}
- @transient private val jobId = new JobID(jobtrackerId, id)
+ @transient protected val jobId = new JobID(jobTrackerId, id)
override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
- if (inputFormat.isInstanceOf[Configurable]) {
- inputFormat.asInstanceOf[Configurable].setConf(conf)
+ inputFormat match {
+ case configurable: Configurable =>
+ configurable.setConf(conf)
+ case _ =>
}
val jobContext = newJobContext(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -91,11 +100,13 @@ class NewHadoopRDD[K, V](
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
- val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0)
+ val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
- if (format.isInstanceOf[Configurable]) {
- format.asInstanceOf[Configurable].setConf(conf)
+ format match {
+ case configurable: Configurable =>
+ configurable.setConf(conf)
+ case _ =>
}
val reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
@@ -141,3 +152,30 @@ class NewHadoopRDD[K, V](
def getConf: Configuration = confBroadcast.value.value
}
+private[spark] class WholeTextFileRDD(
+ sc : SparkContext,
+ inputFormatClass: Class[_ <: WholeTextFileInputFormat],
+ keyClass: Class[String],
+ valueClass: Class[String],
+ @transient conf: Configuration,
+ minSplits: Int)
+ extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+ override def getPartitions: Array[Partition] = {
+ val inputFormat = inputFormatClass.newInstance
+ inputFormat match {
+ case configurable: Configurable =>
+ configurable.setConf(conf)
+ case _ =>
+ }
+ val jobContext = newJobContext(conf, jobId)
+ inputFormat.setMaxSplitSize(jobContext, minSplits)
+ val rawSplits = inputFormat.getSplits(jobContext).toArray
+ val result = new Array[Partition](rawSplits.size)
+ for (i <- 0 until rawSplits.size) {
+ result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ }
+ result
+ }
+}
+
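Because `isSplitable()` is false for these records, the number of partitions produced by `WholeTextFileRDD` can never exceed the number of input files, whatever `minSplits` suggests. A small hypothetical check, assuming an existing `SparkContext` named `sc` and a made-up directory:

```scala
// Assume /tmp/two-files-dir contains exactly two text files.
val rdd = sc.wholeTextFiles("/tmp/two-files-dir", minSplits = 5)
// Two unsplittable files yield at most two partitions, so the suggestion of 5 is not met.
println(s"partitions = ${rdd.partitions.length}")
```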
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index ab2fdac553..8d2e9f1846 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -626,7 +626,7 @@ public class JavaAPISuite implements Serializable {
container.put(tempDirName+"/part-00000", new Text(content1).toString());
container.put(tempDirName+"/part-00001", new Text(content2).toString());
- JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName);
+ JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
List<Tuple2<String, String>> result = readRDD.collect();
for (Tuple2<String, String> res : result) {
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index e89b296d41..33d6de9a76 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -73,7 +73,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
createNativeFile(dir, filename, contents)
}
- val res = sc.wholeTextFiles(dir.toString).collect()
+ val res = sc.wholeTextFiles(dir.toString, 3).collect()
assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
"Number of files read out does not fit with the actual value.")