author     Davies Liu <davies@databricks.com>     2014-11-12 15:58:12 -0800
committer  Josh Rosen <joshrosen@databricks.com>  2014-11-12 15:58:12 -0800
commit     d7d54a44e3ada0e50febe64e9b037dc2c8f6ff61 (patch)
tree       1e9a98d73586d648b6c9ecc8a46df4cec3c36f49 /core/src
parent     bd86118c4e980f94916f892c76fb808fd4c8bd85 (diff)
[SPARK-2672] support compressed file in wholeTextFile
wholeTextFiles() cannot read compressed files; it should, just like textFile() does.

Author: Davies Liu <davies@databricks.com>

Closes #3005 from davies/whole and squashes the following commits:

a43fcfb [Davies Liu] remove semicolon
c83571a [Davies Liu] remove = if return type is Unit
83c844f [Davies Liu] Merge branch 'master' of github.com:apache/spark into whole
22e8b3e [Davies Liu] support compressed file in wholeTextFile
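For context, a minimal usage sketch of the behaviour this patch enables, assuming a local directory that mixes plain and gzip-compressed text files (the paths and Spark setup below are illustrative, not part of the commit):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical input directory, e.g. containing part-0.txt and part-1.txt.gz;
// with this patch the codec is inferred from the extension, so both files are
// returned decompressed, just like with sc.textFile().
val sc = new SparkContext(new SparkConf().setAppName("whole-text-gz").setMaster("local"))
val files = sc.wholeTextFiles("/tmp/whole-text-input", minPartitions = 3).collect()

// Keys are full file paths (including any .gz extension), values are the file contents.
files.foreach { case (path, contents) => println(s"$path -> ${contents.length} chars") }
sc.stop()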
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala       20
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala      52
-rw-r--r--  core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala 44
3 files changed, 103 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 183bce3d8d..d3601cca83 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -19,14 +19,13 @@ package org.apache.spark.input
import scala.collection.JavaConversions._
+import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
/**
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
@@ -34,17 +33,24 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
* the value is the entire content of file.
*/
-private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+private[spark] class WholeTextFileInputFormat
+ extends CombineFileInputFormat[String, String] with Configurable {
+
override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+ private var conf: Configuration = _
+ def setConf(c: Configuration) {
+ conf = c
+ }
+ def getConf: Configuration = conf
+
override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[String, String] = {
- new CombineFileRecordReader[String, String](
- split.asInstanceOf[CombineFileSplit],
- context,
- classOf[WholeTextFileRecordReader])
+ val reader = new WholeCombineFileRecordReader(split, context)
+ reader.setConf(conf)
+ reader
}
/**
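The reason the input format now mixes in Configurable is that the code instantiating it can hand over the job configuration before any record reader is created, and createRecordReader() then forwards that configuration to the reader. A minimal caller-side sketch of that pattern, under the assumption that the creating code checks for Configurable (the helper name is illustrative):

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.InputFormat

// If the format implements Configurable, give it the job configuration
// before asking it for record readers; otherwise leave it alone.
def prepareInputFormat[K, V](format: InputFormat[K, V], conf: Configuration): InputFormat[K, V] = {
  format match {
    case configurable: Configurable => configurable.setConf(conf)
    case _ => // format does not take a configuration
  }
  format
}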
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 3564ab2e2a..6d59b24eb0 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -17,11 +17,13 @@
package org.apache.spark.input
+import org.apache.hadoop.conf.{Configuration, Configurable}
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -34,7 +36,13 @@ private[spark] class WholeTextFileRecordReader(
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
- extends RecordReader[String, String] {
+ extends RecordReader[String, String] with Configurable {
+
+ private var conf: Configuration = _
+ def setConf(c: Configuration) {
+ conf = c
+ }
+ def getConf: Configuration = conf
private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(context.getConfiguration)
@@ -57,8 +65,16 @@ private[spark] class WholeTextFileRecordReader(
override def nextKeyValue(): Boolean = {
if (!processed) {
+ val conf = new Configuration
+ val factory = new CompressionCodecFactory(conf)
+ val codec = factory.getCodec(path) // infers from file ext.
val fileIn = fs.open(path)
- val innerBuffer = ByteStreams.toByteArray(fileIn)
+ val innerBuffer = if (codec != null) {
+ ByteStreams.toByteArray(codec.createInputStream(fileIn))
+ } else {
+ ByteStreams.toByteArray(fileIn)
+ }
+
value = new Text(innerBuffer).toString
Closeables.close(fileIn, false)
processed = true
@@ -68,3 +84,33 @@ private[spark] class WholeTextFileRecordReader(
}
}
}
+
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
+ */
+private[spark] class WholeCombineFileRecordReader(
+ split: InputSplit,
+ context: TaskAttemptContext)
+ extends CombineFileRecordReader[String, String](
+ split.asInstanceOf[CombineFileSplit],
+ context,
+ classOf[WholeTextFileRecordReader]
+ ) with Configurable {
+
+ private var conf: Configuration = _
+ def setConf(c: Configuration) {
+ conf = c
+ }
+ def getConf: Configuration = conf
+
+ override def initNextRecordReader(): Boolean = {
+ val r = super.initNextRecordReader()
+ if (r) {
+ this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
+ }
+ r
+ }
+}
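The change to nextKeyValue() follows the standard Hadoop codec-detection pattern: CompressionCodecFactory maps the file extension to a codec (or null for a plain file), and the raw stream is wrapped when a codec is found. A self-contained sketch of that pattern outside the record reader, with an illustrative path:

import com.google.common.io.ByteStreams
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

// Read a whole file into memory, decompressing transparently when the
// extension (.gz, .deflate, ...) maps to a registered codec.
def readWholeFile(pathStr: String, conf: Configuration): Array[Byte] = {
  val path = new Path(pathStr)                                   // e.g. "/tmp/input/part-1.txt.gz"
  val fs = path.getFileSystem(conf)
  val codec = new CompressionCodecFactory(conf).getCodec(path)   // null if no codec matches
  val fileIn = fs.open(path)
  try {
    if (codec != null) ByteStreams.toByteArray(codec.createInputStream(fileIn))
    else ByteStreams.toByteArray(fileIn)
  } finally {
    fileIn.close()
  }
}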
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index 12d1c7b2fa..98b0a16ce8 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
import org.apache.spark.SparkContext
import org.apache.spark.util.Utils
+import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodecFactory, GzipCodec}
/**
* Tests the correctness of
@@ -38,20 +39,32 @@ import org.apache.spark.util.Utils
*/
class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
private var sc: SparkContext = _
+ private var factory: CompressionCodecFactory = _
override def beforeAll() {
sc = new SparkContext("local", "test")
// Set the block size of local file system to test whether files are split right or not.
sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
+ sc.hadoopConfiguration.set("io.compression.codecs",
+ "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec")
+ factory = new CompressionCodecFactory(sc.hadoopConfiguration)
}
override def afterAll() {
sc.stop()
}
- private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
- val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
+ private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
+ compress: Boolean) = {
+ val out = if (compress) {
+ val codec = new GzipCodec
+ val path = s"${inputDir.toString}/$fileName${codec.getDefaultExtension}"
+ codec.createOutputStream(new DataOutputStream(new FileOutputStream(path)))
+ } else {
+ val path = s"${inputDir.toString}/$fileName"
+ new DataOutputStream(new FileOutputStream(path))
+ }
out.write(contents, 0, contents.length)
out.close()
}
@@ -68,7 +81,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
println(s"Local disk address is ${dir.toString}.")
WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
- createNativeFile(dir, filename, contents)
+ createNativeFile(dir, filename, contents, false)
}
val res = sc.wholeTextFiles(dir.toString, 3).collect()
@@ -86,6 +99,31 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
Utils.deleteRecursively(dir)
}
+
+ test("Correctness of WholeTextFileRecordReader with GzipCodec.") {
+ val dir = Utils.createTempDir()
+ println(s"Local disk address is ${dir.toString}.")
+
+ WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+ createNativeFile(dir, filename, contents, true)
+ }
+
+ val res = sc.wholeTextFiles(dir.toString, 3).collect()
+
+ assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+ "Number of files read out does not fit with the actual value.")
+
+ for ((filename, contents) <- res) {
+ val shortName = filename.split('/').last.split('.')(0)
+
+ assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+ s"Missing file name $filename.")
+ assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+ s"file $filename contents can not match.")
+ }
+
+ Utils.deleteRecursively(dir)
+ }
}
/**
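As a side note on the test setup: the suite registers GzipCodec and DefaultCodec via io.compression.codecs, and CompressionCodecFactory then resolves codecs purely from the file extension. A small standalone sketch of that lookup, with illustrative paths:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

val conf = new Configuration()
conf.set("io.compression.codecs",
  "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec")
val factory = new CompressionCodecFactory(conf)

// Extension-based lookup: plain files yield null, .gz resolves to GzipCodec,
// .deflate resolves to DefaultCodec.
println(factory.getCodec(new Path("/tmp/data.txt")))          // null
println(factory.getCodec(new Path("/tmp/data.txt.gz")))       // GzipCodec instance
println(factory.getCodec(new Path("/tmp/data.txt.deflate")))  // DefaultCodec instance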