about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2014-12-16 11:19:36 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-12-16 11:19:36 -0800
commit ed362008f0a317729f8404e86e57d8a6ceb60f21 (patch)
tree 48bb4d325c520b1190f078f5c3519ac5915dacb8 /core
parent c246b95dd2f565043db429c38c6cc029a0b870c1 (diff)
download spark-ed362008f0a317729f8404e86e57d8a6ceb60f21.tar.gz
spark-ed362008f0a317729f8404e86e57d8a6ceb60f21.tar.bz2
spark-ed362008f0a317729f8404e86e57d8a6ceb60f21.zip
[SPARK-4437] update doc for WholeCombineFileRecordReader
update doc for WholeCombineFileRecordReader Author: Davies Liu <davies@databricks.com> Author: Josh Rosen <joshrosen@databricks.com> Closes #3301 from davies/fix_doc and squashes the following commits: 1d7422f [Davies Liu] Merge pull request #2 from JoshRosen/whole-text-file-cleanup dc3d21a [Josh Rosen] More genericization in ConfigurableCombineFileRecordReader. 95d13eb [Davies Liu] address comment bf800b9 [Davies Liu] update doc for WholeCombineFileRecordReader
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala | 12
-rw-r--r-- core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala | 43
2 files changed, 25 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index d3601cca83..aaef7c74ee 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -19,7 +19,6 @@ package org.apache.spark.input
import scala.collection.JavaConversions._
-import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
@@ -38,18 +37,13 @@ private[spark] class WholeTextFileInputFormat
override protected def isSplitable(context: JobContext, file: Path): Boolean = false
- private var conf: Configuration = _
- def setConf(c: Configuration) {
- conf = c
- }
- def getConf: Configuration = conf
-
override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[String, String] = {
- val reader = new WholeCombineFileRecordReader(split, context)
- reader.setConf(conf)
+ val reader =
+ new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader])
+ reader.setConf(getConf)
reader
}
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 6d59b24eb0..1b1131b9b8 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -17,7 +17,7 @@
package org.apache.spark.input
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable}
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.io.Text
@@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecor
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface.
+ */
+private[spark] trait Configurable extends HConfigurable {
+ private var conf: Configuration = _
+ def setConf(c: Configuration) {
+ conf = c
+ }
+ def getConf: Configuration = conf
+}
+
/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
* out in a key-value pair, where the key is the file path and the value is the entire content of
@@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader(
index: Integer)
extends RecordReader[String, String] with Configurable {
- private var conf: Configuration = _
- def setConf(c: Configuration) {
- conf = c
- }
- def getConf: Configuration = conf
-
private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(context.getConfiguration)
@@ -87,29 +93,24 @@ private[spark] class WholeTextFileRecordReader(
/**
- * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
- * out in a key-value pair, where the key is the file path and the value is the entire content of
- * the file.
+ * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]]
+ * that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]]
+ * RecordReaders.
*/
-private[spark] class WholeCombineFileRecordReader(
+private[spark] class ConfigurableCombineFileRecordReader[K, V](
split: InputSplit,
- context: TaskAttemptContext)
- extends CombineFileRecordReader[String, String](
+ context: TaskAttemptContext,
+ recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable])
+ extends CombineFileRecordReader[K, V](
split.asInstanceOf[CombineFileSplit],
context,
- classOf[WholeTextFileRecordReader]
+ recordReaderClass
) with Configurable {
- private var conf: Configuration = _
- def setConf(c: Configuration) {
- conf = c
- }
- def getConf: Configuration = conf
-
override def initNextRecordReader(): Boolean = {
val r = super.initNextRecordReader()
if (r) {
- this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
+ this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
}
r
}