diff options
author | Cheng Lian <lian.cs.zju@gmail.com> | 2014-07-16 12:44:51 -0400 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-07-16 12:44:51 -0400 |
commit | efc452a16322e8b20b3c4fe1d6847315f928cd2d (patch) | |
tree | c12e6b96d5248d032d974874d13f8031927bc4d4 /sql/core | |
parent | 632fb3d9a9ebb3d2218385403145d5b89c41c025 (diff) | |
download | spark-efc452a16322e8b20b3c4fe1d6847315f928cd2d.tar.gz spark-efc452a16322e8b20b3c4fe1d6847315f928cd2d.tar.bz2 spark-efc452a16322e8b20b3c4fe1d6847315f928cd2d.zip |
[SPARK-2119][SQL] Improved Parquet performance when reading off S3
JIRA issue: [SPARK-2119](https://issues.apache.org/jira/browse/SPARK-2119)
Essentially this PR fixed three issues to gain much better performance when reading large Parquet file off S3.
1. When reading the schema, fetching Parquet metadata from a part-file rather than the `_metadata` file
The `_metadata` file contains metadata of all row groups, and can be very large if there are many row groups. Since schema information and row group metadata are coupled within a single Thrift object, we have to read the whole `_metadata` to fetch the schema. On the other hand, schema is replicated among footers of all part-files, which are fairly small.
1. Only add the root directory of the Parquet file rather than all the part-files to input paths
HDFS API can automatically filter out all hidden files and underscore files (`_SUCCESS` & `_metadata`), there's no need to filter out all part-files and add them individually to input paths. What make it much worse is that, `FileInputFormat.listStatus()` calls `FileSystem.globStatus()` on each individual input path sequentially, each results a blocking remote S3 HTTP request.
1. Worked around [PARQUET-16](https://issues.apache.org/jira/browse/PARQUET-16)
Essentially PARQUET-16 is similar to the above issue, and results lots of sequential `FileSystem.getFileStatus()` calls, which are further translated into a bunch of remote S3 HTTP requests.
`FilteringParquetRowInputFormat` should be cleaned up once PARQUET-16 is fixed.
Below is the micro benchmark result. The dataset used is a S3 Parquet file consists of 3,793 partitions, about 110MB per partition in average. The benchmark is done with a 9-node AWS cluster.
- Creating a Parquet `SchemaRDD` (Parquet schema is fetched)
```scala
val tweets = parquetFile(uri)
```
- Before: 17.80s
- After: 8.61s
- Fetching partition information
```scala
tweets.getPartitions
```
- Before: 700.87s
- After: 21.47s
- Counting the whole file (both steps above are executed altogether)
```scala
parquetFile(uri).count()
```
- Before: ??? (haven't test yet)
- After: 53.26s
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1370 from liancheng/faster-parquet and squashes the following commits:
94a2821 [Cheng Lian] Added comments about schema consistency
d2c4417 [Cheng Lian] Worked around PARQUET-16 to improve Parquet performance
1c0d1b9 [Cheng Lian] Accelerated Parquet schema retrieving
5bd3d29 [Cheng Lian] Fixed Parquet log level
Diffstat (limited to 'sql/core')
3 files changed, 125 insertions, 50 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index ade823b51c..ea74320d06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -17,27 +17,34 @@ package org.apache.spark.sql.parquet +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.util.Try + import java.io.IOException +import java.lang.{Long => JLong} import java.text.SimpleDateFormat -import java.util.Date +import java.util.{Date, List => JList} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter -import parquet.hadoop.{ParquetRecordReader, ParquetInputFormat, ParquetOutputFormat} -import parquet.hadoop.api.ReadSupport +import parquet.hadoop._ +import parquet.hadoop.api.{InitContext, ReadSupport} +import parquet.hadoop.metadata.GlobalMetaData import parquet.hadoop.util.ContextUtil -import parquet.io.InvalidRecordException +import parquet.io.ParquetDecodingException import parquet.schema.MessageType -import org.apache.spark.{Logging, SerializableWritable, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} +import org.apache.spark.{Logging, SerializableWritable, TaskContext} /** * Parquet table scan operator. Imports the file that backs the given @@ -55,16 +62,14 @@ case class ParquetTableScan( override def execute(): RDD[Row] = { val sc = sqlContext.sparkContext val job = new Job(sc.hadoopConfiguration) - ParquetInputFormat.setReadSupportClass( - job, - classOf[org.apache.spark.sql.parquet.RowReadSupport]) + ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport]) + val conf: Configuration = ContextUtil.getConfiguration(job) - val fileList = FileSystemHelper.listFiles(relation.path, conf) - // add all paths in the directory but skip "hidden" ones such - // as "_SUCCESS" and "_metadata" - for (path <- fileList if !path.getName.startsWith("_")) { - NewFileInputFormat.addInputPath(job, path) + val qualifiedPath = { + val path = new Path(relation.path) + path.getFileSystem(conf).makeQualified(path) } + NewFileInputFormat.addInputPath(job, qualifiedPath) // Store both requested and original schema in `Configuration` conf.set( @@ -87,7 +92,7 @@ case class ParquetTableScan( sc.newAPIHadoopRDD( conf, - classOf[org.apache.spark.sql.parquet.FilteringParquetRowInputFormat], + classOf[FilteringParquetRowInputFormat], classOf[Void], classOf[Row]) .map(_._2) @@ -122,14 +127,7 @@ case class ParquetTableScan( private def validateProjection(projection: Seq[Attribute]): Boolean = { val original: MessageType = relation.parquetSchema val candidate: MessageType = ParquetTypesConverter.convertFromAttributes(projection) - try { - original.checkContains(candidate) - true - } catch { - case e: InvalidRecordException => { - false - } - } + Try(original.checkContains(candidate)).isSuccess } } @@ -302,6 +300,11 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int) */ private[parquet] class FilteringParquetRowInputFormat extends parquet.hadoop.ParquetInputFormat[Row] with Logging { + + private var footers: JList[Footer] = _ + + private var fileStatuses= Map.empty[Path, FileStatus] + override def createRecordReader( inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = { @@ -318,6 +321,70 @@ private[parquet] class FilteringParquetRowInputFormat new ParquetRecordReader[Row](readSupport) } } + + override def getFooters(jobContext: JobContext): JList[Footer] = { + if (footers eq null) { + val statuses = listStatus(jobContext) + fileStatuses = statuses.map(file => file.getPath -> file).toMap + footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses) + } + + footers + } + + // TODO Remove this method and related code once PARQUET-16 is fixed + // This method together with the `getFooters` method and the `fileStatuses` field are just used + // to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17 + override def getSplits( + configuration: Configuration, + footers: JList[Footer]): JList[ParquetInputSplit] = { + + val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue) + val minSplitSize: JLong = + Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L)) + if (maxSplitSize < 0 || minSplitSize < 0) { + throw new ParquetDecodingException( + s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = $maxSplitSize;" + + s" minSplitSize = $minSplitSize") + } + + val getGlobalMetaData = + classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]]) + getGlobalMetaData.setAccessible(true) + val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData] + + val readContext = getReadSupport(configuration).init( + new InitContext(configuration, + globalMetaData.getKeyValueMetaData(), + globalMetaData.getSchema())) + + val generateSplits = + classOf[ParquetInputFormat[_]].getDeclaredMethods.find(_.getName == "generateSplits").get + generateSplits.setAccessible(true) + + val splits = mutable.ArrayBuffer.empty[ParquetInputSplit] + for (footer <- footers) { + val fs = footer.getFile.getFileSystem(configuration) + val file = footer.getFile + val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file)) + val parquetMetaData = footer.getParquetMetadata + val blocks = parquetMetaData.getBlocks + val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen) + splits.addAll( + generateSplits.invoke( + null, + blocks, + fileBlockLocations, + fileStatus, + parquetMetaData.getFileMetaData, + readContext.getRequestedSchema.toString, + readContext.getReadSupportMetadata, + minSplitSize, + maxSplitSize).asInstanceOf[JList[ParquetInputSplit]]) + } + + splits + } } private[parquet] object FileSystemHelper { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index f1953a008a..39294a3f4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -17,20 +17,19 @@ package org.apache.spark.sql.parquet -import org.apache.hadoop.conf.Configuration +import java.util.{HashMap => JHashMap} +import org.apache.hadoop.conf.Configuration import parquet.column.ParquetProperties import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.api.ReadSupport.ReadContext import parquet.hadoop.api.{ReadSupport, WriteSupport} import parquet.io.api._ -import parquet.schema.{MessageType, MessageTypeParser} +import parquet.schema.MessageType import org.apache.spark.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.execution.SparkSqlSerializer -import com.google.common.io.BaseEncoding /** * A `parquet.io.api.RecordMaterializer` for Rows. @@ -93,8 +92,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging { configuration: Configuration, keyValueMetaData: java.util.Map[String, String], fileSchema: MessageType): ReadContext = { - var parquetSchema: MessageType = fileSchema - var metadata: java.util.Map[String, String] = new java.util.HashMap[String, String]() + var parquetSchema = fileSchema + val metadata = new JHashMap[String, String]() val requestedAttributes = RowReadSupport.getRequestedSchema(configuration) if (requestedAttributes != null) { @@ -109,7 +108,7 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging { metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr) } - return new ReadSupport.ReadContext(parquetSchema, metadata) + new ReadSupport.ReadContext(parquetSchema, metadata) } } @@ -132,13 +131,17 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { private[parquet] var attributes: Seq[Attribute] = null override def init(configuration: Configuration): WriteSupport.WriteContext = { - attributes = if (attributes == null) RowWriteSupport.getSchema(configuration) else attributes - + val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) + val metadata = new JHashMap[String, String]() + metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr) + + if (attributes == null) { + attributes = ParquetTypesConverter.convertFromString(origAttributesStr) + } + log.debug(s"write support initialized for requested schema $attributes") ParquetRelation.enableLogForwarding() - new WriteSupport.WriteContext( - ParquetTypesConverter.convertFromAttributes(attributes), - new java.util.HashMap[java.lang.String, java.lang.String]()) + new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata) } override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index 7f6ad908f7..58370b955a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -22,6 +22,7 @@ import java.io.IOException import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter} import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData} @@ -367,20 +368,24 @@ private[parquet] object ParquetTypesConverter extends Logging { s"Expected $path for be a directory with Parquet files/metadata") } ParquetRelation.enableLogForwarding() - val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE) - // if this is a new table that was just created we will find only the metadata file - if (fs.exists(metadataPath) && fs.isFile(metadataPath)) { - ParquetFileReader.readFooter(conf, metadataPath) - } else { - // there may be one or more Parquet files in the given directory - val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path)) - // TODO: for now we assume that all footers (if there is more than one) have identical - // metadata; we may want to add a check here at some point - if (footers.size() == 0) { - throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path") - } - footers(0).getParquetMetadata + + val children = fs.listStatus(path).filterNot { + _.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME } + + // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row + // groups. Since Parquet schema is replicated among all row groups, we only need to touch a + // single row group to read schema related metadata. Notice that we are making assumptions that + // all data in a single Parquet file have the same schema, which is normally true. + children + // Try any non-"_metadata" file first... + .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE) + // ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is + // empty, thus normally the "_metadata" file is expected to be fairly small). + .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)) + .map(ParquetFileReader.readFooter(conf, _)) + .getOrElse( + throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")) } /** @@ -403,7 +408,7 @@ private[parquet] object ParquetTypesConverter extends Logging { } else { val attributes = convertToAttributes( readMetaData(origPath, conf).getFileMetaData.getSchema) - log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes") + log.info(s"Falling back to schema conversion from Parquet types; result: $attributes") attributes } } |