Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala  | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala | 52
2 files changed, 62 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 183bce3d8d..d3601cca83 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -19,14 +19,13 @@ package org.apache.spark.input
import scala.collection.JavaConversions._
+import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
/**
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
@@ -34,17 +33,24 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
* the value is the entire content of file.
*/
-private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+private[spark] class WholeTextFileInputFormat
+ extends CombineFileInputFormat[String, String] with Configurable {
+
override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+ private var conf: Configuration = _
+ def setConf(c: Configuration) {
+ conf = c
+ }
+ def getConf: Configuration = conf
+
override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[String, String] = {
- new CombineFileRecordReader[String, String](
- split.asInstanceOf[CombineFileSplit],
- context,
- classOf[WholeTextFileRecordReader])
+ val reader = new WholeCombineFileRecordReader(split, context)
+ reader.setConf(conf)
+ reader
}
/**
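
For context on the Configurable plumbing above: the mixin exists so that whoever instantiates the input format can hand it the job configuration before createRecordReader runs. A minimal sketch of that handshake, assuming it compiles inside the org.apache.spark package tree (WholeTextFileInputFormat is private[spark]) and using Hadoop's ReflectionUtils, which calls setConf automatically on Configurable instances:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.input.WholeTextFileInputFormat

object ConfPropagationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // newInstance sees the Configurable mixin and invokes setConf(conf).
    val format =
      ReflectionUtils.newInstance(classOf[WholeTextFileInputFormat], conf)
    // The stored configuration is what createRecordReader later forwards
    // to each reader via reader.setConf(conf).
    assert(format.getConf eq conf)
  }
}

The same relay continues in the second file below: WholeCombineFileRecordReader.initNextRecordReader pushes the configuration into each per-file WholeTextFileRecordReader as it is created.
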
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 3564ab2e2a..6d59b24eb0 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -17,11 +17,13 @@
package org.apache.spark.input
+import org.apache.hadoop.conf.{Configuration, Configurable}
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -34,7 +36,13 @@ private[spark] class WholeTextFileRecordReader(
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
- extends RecordReader[String, String] {
+ extends RecordReader[String, String] with Configurable {
+
+ private var conf: Configuration = _
+ def setConf(c: Configuration) {
+ conf = c
+ }
+ def getConf: Configuration = conf
private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(context.getConfiguration)
@@ -57,8 +65,16 @@ private[spark] class WholeTextFileRecordReader(
override def nextKeyValue(): Boolean = {
if (!processed) {
+ val conf = new Configuration
+ val factory = new CompressionCodecFactory(conf)
+ val codec = factory.getCodec(path) // infers from file ext.
val fileIn = fs.open(path)
- val innerBuffer = ByteStreams.toByteArray(fileIn)
+ val innerBuffer = if (codec != null) {
+ ByteStreams.toByteArray(codec.createInputStream(fileIn))
+ } else {
+ ByteStreams.toByteArray(fileIn)
+ }
+
value = new Text(innerBuffer).toString
Closeables.close(fileIn, false)
processed = true
@@ -68,3 +84,33 @@ private[spark] class WholeTextFileRecordReader(
}
}
}
+
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
+ */
+private[spark] class WholeCombineFileRecordReader(
+ split: InputSplit,
+ context: TaskAttemptContext)
+ extends CombineFileRecordReader[String, String](
+ split.asInstanceOf[CombineFileSplit],
+ context,
+ classOf[WholeTextFileRecordReader]
+ ) with Configurable {
+
+ private var conf: Configuration = _
+ def setConf(c: Configuration) {
+ conf = c
+ }
+ def getConf: Configuration = conf
+
+ override def initNextRecordReader(): Boolean = {
+ val r = super.initNextRecordReader()
+ if (r) {
+ this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
+ }
+ r
+ }
+}
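
End-to-end, the change means files whose extension maps to a registered Hadoop codec are transparently decompressed before being returned as a single string. A hedged usage sketch (the path and app name are illustrative, not from this commit):

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesGzipSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("wholeTextFiles-gzip").setMaster("local[2]"))

    // Each record is (file path, entire file content). For "*.gz" inputs,
    // CompressionCodecFactory.getCodec picks GzipCodec from the extension,
    // so the content arrives decompressed.
    val files = sc.wholeTextFiles("/tmp/logs/*.gz")
    files.collect().foreach { case (path, content) =>
      println(s"$path -> ${content.length} chars")
    }

    sc.stop()
  }
}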