From d7d54a44e3ada0e50febe64e9b037dc2c8f6ff61 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 12 Nov 2014 15:58:12 -0800
Subject: [SPARK-2672] support compressed file in wholeTextFile

wholeTextFile() cannot read compressed files; it should be able to,
just like textFile().

Author: Davies Liu

Closes #3005 from davies/whole and squashes the following commits:

a43fcfb [Davies Liu] remove semicolon
c83571a [Davies Liu] remove = if return type is Unit
83c844f [Davies Liu] Merge branch 'master' of github.com:apache/spark into whole
22e8b3e [Davies Liu] support compressed file in wholeTextFile
---
 .../spark/input/WholeTextFileInputFormat.scala     | 20 ++++++---
 .../spark/input/WholeTextFileRecordReader.scala    | 52 ++++++++++++++++++++--
 .../input/WholeTextFileRecordReaderSuite.scala     | 44 ++++++++++++++++--
 3 files changed, 103 insertions(+), 13 deletions(-)
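For context, this is what the change enables from user code: wholeTextFiles()
now yields (path, content) pairs for compressed inputs too, decompressing
transparently. A minimal sketch, assuming gzipped files already sit under a
hypothetical /tmp/whole-gz directory (the object name and paths are
illustrative, not part of the patch):

    import org.apache.spark.SparkContext

    object WholeTextFilesGzipExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "whole-gz-example")
        // Each record is (full file path, entire file content); .gz files are
        // decompressed transparently because the codec is inferred from the
        // file extension.
        sc.wholeTextFiles("/tmp/whole-gz").collect().foreach { case (path, content) =>
          println(s"$path -> ${content.length} chars")
        }
        sc.stop()
      }
    }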
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 183bce3d8d..d3601cca83 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -19,14 +19,13 @@ package org.apache.spark.input
 
 import scala.collection.JavaConversions._
 
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 
 /**
  * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
@@ -34,17 +33,24 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
  * the value is the entire content of file.
  */
 
-private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+private[spark] class WholeTextFileInputFormat
+  extends CombineFileInputFormat[String, String] with Configurable {
+
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
 
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
   override def createRecordReader(
       split: InputSplit,
       context: TaskAttemptContext): RecordReader[String, String] = {
 
-    new CombineFileRecordReader[String, String](
-      split.asInstanceOf[CombineFileSplit],
-      context,
-      classOf[WholeTextFileRecordReader])
+    val reader = new WholeCombineFileRecordReader(split, context)
+    reader.setConf(conf)
+    reader
   }
 
   /**
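Background on the getConf/setConf plumbing above: Hadoop creates input
formats through ReflectionUtils.newInstance, which invokes setConf on any
object implementing Configurable; that is how the job configuration reaches
WholeTextFileInputFormat, which then forwards it by hand to the reader. A toy
sketch of that contract (the class, object, and key names are made up for
illustration):

    import org.apache.hadoop.conf.{Configurable, Configuration}
    import org.apache.hadoop.util.ReflectionUtils

    // Minimal Configurable; newInstance() below is expected to call setConf on it.
    class ConfAware extends Configurable {
      private var conf: Configuration = _
      def setConf(c: Configuration) { conf = c }
      def getConf: Configuration = conf
    }

    object ConfigurableDemo {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        conf.set("demo.key", "demo.value")
        val obj = ReflectionUtils.newInstance(classOf[ConfAware], conf)
        println(obj.getConf.get("demo.key"))  // prints "demo.value"
      }
    }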
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 3564ab2e2a..6d59b24eb0 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.input
 
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import com.google.common.io.{ByteStreams, Closeables}
 
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
@@ -34,7 +36,13 @@ private[spark] class WholeTextFileRecordReader(
     split: CombineFileSplit,
     context: TaskAttemptContext,
     index: Integer)
-  extends RecordReader[String, String] {
+  extends RecordReader[String, String] with Configurable {
+
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
 
   private[this] val path = split.getPath(index)
   private[this] val fs = path.getFileSystem(context.getConfiguration)
@@ -57,8 +65,16 @@ private[spark] class WholeTextFileRecordReader(
 
   override def nextKeyValue(): Boolean = {
     if (!processed) {
+      val conf = new Configuration
+      val factory = new CompressionCodecFactory(conf)
+      val codec = factory.getCodec(path)  // infers from file ext.
       val fileIn = fs.open(path)
-      val innerBuffer = ByteStreams.toByteArray(fileIn)
+      val innerBuffer = if (codec != null) {
+        ByteStreams.toByteArray(codec.createInputStream(fileIn))
+      } else {
+        ByteStreams.toByteArray(fileIn)
+      }
+
       value = new Text(innerBuffer).toString
       Closeables.close(fileIn, false)
       processed = true
@@ -68,3 +84,33 @@
     }
   }
 }
+
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
+ */
+private[spark] class WholeCombineFileRecordReader(
+    split: InputSplit,
+    context: TaskAttemptContext)
+  extends CombineFileRecordReader[String, String](
+    split.asInstanceOf[CombineFileSplit],
+    context,
+    classOf[WholeTextFileRecordReader]
+  ) with Configurable {
+
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
+  override def initNextRecordReader(): Boolean = {
+    val r = super.initNextRecordReader()
+    if (r) {
+      this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
+    }
+    r
+  }
+}
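The decompression path above hinges on CompressionCodecFactory.getCodec,
which maps a file-name extension to a codec and returns null when nothing
matches, hence the null check in nextKeyValue(). A standalone probe sketch
(the file names are invented):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.compress.CompressionCodecFactory

    object CodecProbe {
      def main(args: Array[String]): Unit = {
        val factory = new CompressionCodecFactory(new Configuration())
        for (name <- Seq("events.txt", "events.gz", "events.bz2")) {
          // getCodec returns null for extensions no configured codec claims.
          val codec = Option(factory.getCodec(new Path(name)))
          println(s"$name -> ${codec.map(_.getClass.getSimpleName).getOrElse("no codec (read as-is)")}")
        }
      }
    }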
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index 12d1c7b2fa..98b0a16ce8 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
 
 import org.apache.spark.SparkContext
 import org.apache.spark.util.Utils
+import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodecFactory, GzipCodec}
 
 /**
  * Tests the correctness of
@@ -38,20 +39,32 @@ import org.apache.spark.util.Utils
  */
 class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
   private var sc: SparkContext = _
+  private var factory: CompressionCodecFactory = _
 
   override def beforeAll() {
     sc = new SparkContext("local", "test")
 
     // Set the block size of local file system to test whether files are split right or not.
     sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
+    sc.hadoopConfiguration.set("io.compression.codecs",
+      "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec")
+    factory = new CompressionCodecFactory(sc.hadoopConfiguration)
   }
 
   override def afterAll() {
     sc.stop()
   }
 
-  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
-    val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
+  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
+                               compress: Boolean) = {
+    val out = if (compress) {
+      val codec = new GzipCodec
+      val path = s"${inputDir.toString}/$fileName${codec.getDefaultExtension}"
+      codec.createOutputStream(new DataOutputStream(new FileOutputStream(path)))
+    } else {
+      val path = s"${inputDir.toString}/$fileName"
+      new DataOutputStream(new FileOutputStream(path))
+    }
     out.write(contents, 0, contents.length)
     out.close()
   }
@@ -68,7 +81,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
     println(s"Local disk address is ${dir.toString}.")
 
     WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
-      createNativeFile(dir, filename, contents)
+      createNativeFile(dir, filename, contents, false)
     }
 
     val res = sc.wholeTextFiles(dir.toString, 3).collect()
@@ -86,6 +99,31 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
 
     Utils.deleteRecursively(dir)
   }
+
+  test("Correctness of WholeTextFileRecordReader with GzipCodec.") {
+    val dir = Utils.createTempDir()
+    println(s"Local disk address is ${dir.toString}.")
+
+    WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+      createNativeFile(dir, filename, contents, true)
+    }
+
+    val res = sc.wholeTextFiles(dir.toString, 3).collect()
+
+    assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+      "Number of files read out does not fit with the actual value.")
+
+    for ((filename, contents) <- res) {
+      val shortName = filename.split('/').last.split('.')(0)
+
+      assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+        s"Missing file name $filename.")
+      assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+        s"file $filename contents can not match.")
+    }
+
+    Utils.deleteRecursively(dir)
+  }
 }
 
 /**
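A round-trip sketch mirroring the new test, outside the suite: write one
gzipped file with GzipCodec, then read it back through wholeTextFiles(). The
/tmp/whole directory, object name, and literals are illustrative assumptions:

    import java.io.{DataOutputStream, File, FileOutputStream}

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.compress.GzipCodec
    import org.apache.hadoop.util.ReflectionUtils
    import org.apache.spark.SparkContext

    object GzipRoundTrip {
      def main(args: Array[String]): Unit = {
        new File("/tmp/whole").mkdirs()
        // Instantiate the codec through ReflectionUtils so its Configuration is set.
        val codec = ReflectionUtils.newInstance(classOf[GzipCodec], new Configuration())
        val path = s"/tmp/whole/part1${codec.getDefaultExtension}"  // "/tmp/whole/part1.gz"
        val out = codec.createOutputStream(new DataOutputStream(new FileOutputStream(path)))
        out.write("hello, compressed world".getBytes("UTF-8"))
        out.close()

        val sc = new SparkContext("local", "gzip-round-trip")
        // The key keeps the on-disk name (part1.gz); the value is the decompressed text.
        sc.wholeTextFiles("/tmp/whole").collect().foreach { case (name, content) =>
          println(s"$name -> $content")
        }
        sc.stop()
      }
    }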