path: root/sql
author      Cheng Lian <lian.cs.zju@gmail.com>          2014-07-16 12:44:51 -0400
committer   Michael Armbrust <michael@databricks.com>   2014-07-16 12:44:51 -0400
commit      efc452a16322e8b20b3c4fe1d6847315f928cd2d (patch)
tree        c12e6b96d5248d032d974874d13f8031927bc4d4 /sql
parent      632fb3d9a9ebb3d2218385403145d5b89c41c025 (diff)
[SPARK-2119][SQL] Improved Parquet performance when reading off S3
JIRA issue: [SPARK-2119](https://issues.apache.org/jira/browse/SPARK-2119)

Essentially this PR fixed three issues to gain much better performance when reading large Parquet files off S3.

1. When reading the schema, fetch Parquet metadata from a part-file rather than the `_metadata` file.

   The `_metadata` file contains the metadata of all row groups and can be very large if there are many row groups. Since schema information and row group metadata are coupled within a single Thrift object, we have to read the whole `_metadata` file just to fetch the schema. On the other hand, the schema is replicated among the footers of all part-files, which are fairly small.

2. Only add the root directory of the Parquet file, rather than all the part-files, to the input paths.

   The HDFS API automatically filters out hidden and underscore files (`_SUCCESS` & `_metadata`), so there's no need to filter out the part-files and add them individually to the input paths. What makes it much worse is that `FileInputFormat.listStatus()` calls `FileSystem.globStatus()` on each individual input path sequentially, each call resulting in a blocking remote S3 HTTP request.

3. Work around [PARQUET-16](https://issues.apache.org/jira/browse/PARQUET-16).

   PARQUET-16 is essentially similar to the issue above: it results in lots of sequential `FileSystem.getFileStatus()` calls, which are further translated into a bunch of remote S3 HTTP requests. `FilteringParquetRowInputFormat` should be cleaned up once PARQUET-16 is fixed.

Below are the micro benchmark results. The dataset used is an S3 Parquet file consisting of 3,793 partitions, about 110MB per partition on average. The benchmark was run on a 9-node AWS cluster.

- Creating a Parquet `SchemaRDD` (the Parquet schema is fetched)

  ```scala
  val tweets = parquetFile(uri)
  ```

  - Before: 17.80s
  - After: 8.61s

- Fetching partition information

  ```scala
  tweets.getPartitions
  ```

  - Before: 700.87s
  - After: 21.47s

- Counting the whole file (both steps above are executed together)

  ```scala
  parquetFile(uri).count()
  ```

  - Before: ??? (not tested yet)
  - After: 53.26s

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1370 from liancheng/faster-parquet and squashes the following commits:

94a2821 [Cheng Lian] Added comments about schema consistency
d2c4417 [Cheng Lian] Worked around PARQUET-16 to improve Parquet performance
1c0d1b9 [Cheng Lian] Accelerated Parquet schema retrieving
5bd3d29 [Cheng Lian] Fixed Parquet log level
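For illustration, the schema-fetching change described in item 1 (implemented in `ParquetTypes.scala` in the diff below) boils down to roughly the following. This is a minimal standalone sketch rather than the committed code; the names `ParquetFooterSketch` and `readFooterFromPartFile` are made up for the example.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import parquet.hadoop.{ParquetFileReader, ParquetFileWriter}
import parquet.hadoop.metadata.ParquetMetadata

object ParquetFooterSketch {
  // Hypothetical helper: read the footer (and thus the schema) of a Parquet table directory by
  // touching a single part-file instead of the potentially huge "_metadata" summary file.
  def readFooterFromPartFile(rootPath: String, conf: Configuration): ParquetMetadata = {
    val path = new Path(rootPath)
    val fs = path.getFileSystem(conf)

    // List the directory once and ignore the "_SUCCESS" marker left by the output committer.
    val children = fs.listStatus(path).filterNot {
      _.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME
    }

    children
      // Prefer any part-file: its footer carries a full copy of the schema and is small.
      .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
      // Fall back to "_metadata" only when no part-file exists (i.e. the table is empty).
      .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
      .map(ParquetFileReader.readFooter(conf, _))
      .getOrElse(
        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $rootPath"))
  }
}
```

Item 2 is the complementary half: `ParquetTableScan` now registers only the qualified root directory with `FileInputFormat`, so a single `listStatus()`/`globStatus()` round trip replaces one blocking S3 request per part-file.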
Diffstat (limited to 'sql')
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala   115
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala        27
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala               33
3 files changed, 125 insertions(+), 50 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index ade823b51c..ea74320d06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -17,27 +17,34 @@
package org.apache.spark.sql.parquet
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.Try
+
import java.io.IOException
+import java.lang.{Long => JLong}
import java.text.SimpleDateFormat
-import java.util.Date
+import java.util.{Date, List => JList}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import parquet.hadoop.{ParquetRecordReader, ParquetInputFormat, ParquetOutputFormat}
-import parquet.hadoop.api.ReadSupport
+import parquet.hadoop._
+import parquet.hadoop.api.{InitContext, ReadSupport}
+import parquet.hadoop.metadata.GlobalMetaData
import parquet.hadoop.util.ContextUtil
-import parquet.io.InvalidRecordException
+import parquet.io.ParquetDecodingException
import parquet.schema.MessageType
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
+import org.apache.spark.{Logging, SerializableWritable, TaskContext}
/**
* Parquet table scan operator. Imports the file that backs the given
@@ -55,16 +62,14 @@ case class ParquetTableScan(
override def execute(): RDD[Row] = {
val sc = sqlContext.sparkContext
val job = new Job(sc.hadoopConfiguration)
- ParquetInputFormat.setReadSupportClass(
- job,
- classOf[org.apache.spark.sql.parquet.RowReadSupport])
+ ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
+
val conf: Configuration = ContextUtil.getConfiguration(job)
- val fileList = FileSystemHelper.listFiles(relation.path, conf)
- // add all paths in the directory but skip "hidden" ones such
- // as "_SUCCESS" and "_metadata"
- for (path <- fileList if !path.getName.startsWith("_")) {
- NewFileInputFormat.addInputPath(job, path)
+ val qualifiedPath = {
+ val path = new Path(relation.path)
+ path.getFileSystem(conf).makeQualified(path)
}
+ NewFileInputFormat.addInputPath(job, qualifiedPath)
// Store both requested and original schema in `Configuration`
conf.set(
@@ -87,7 +92,7 @@ case class ParquetTableScan(
sc.newAPIHadoopRDD(
conf,
- classOf[org.apache.spark.sql.parquet.FilteringParquetRowInputFormat],
+ classOf[FilteringParquetRowInputFormat],
classOf[Void],
classOf[Row])
.map(_._2)
@@ -122,14 +127,7 @@ case class ParquetTableScan(
private def validateProjection(projection: Seq[Attribute]): Boolean = {
val original: MessageType = relation.parquetSchema
val candidate: MessageType = ParquetTypesConverter.convertFromAttributes(projection)
- try {
- original.checkContains(candidate)
- true
- } catch {
- case e: InvalidRecordException => {
- false
- }
- }
+ Try(original.checkContains(candidate)).isSuccess
}
}
@@ -302,6 +300,11 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
*/
private[parquet] class FilteringParquetRowInputFormat
extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
+
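+ // Footers and file statuses gathered in `getFooters` are cached here so that `getSplits`
+ // below can reuse them instead of issuing one remote `getFileStatus()` call per part-file.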
+ private var footers: JList[Footer] = _
+
+ private var fileStatuses = Map.empty[Path, FileStatus]
+
override def createRecordReader(
inputSplit: InputSplit,
taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = {
@@ -318,6 +321,70 @@ private[parquet] class FilteringParquetRowInputFormat
new ParquetRecordReader[Row](readSupport)
}
}
+
+ override def getFooters(jobContext: JobContext): JList[Footer] = {
+ if (footers eq null) {
+ val statuses = listStatus(jobContext)
+ fileStatuses = statuses.map(file => file.getPath -> file).toMap
+ footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
+ }
+
+ footers
+ }
+
+ // TODO Remove this method and related code once PARQUET-16 is fixed
+ // This method together with the `getFooters` method and the `fileStatuses` field are just used
+ // to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17
+ override def getSplits(
+ configuration: Configuration,
+ footers: JList[Footer]): JList[ParquetInputSplit] = {
+
+ val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
+ val minSplitSize: JLong =
+ Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
+ if (maxSplitSize < 0 || minSplitSize < 0) {
+ throw new ParquetDecodingException(
+ s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = $maxSplitSize;" +
+ s" minSplitSize = $minSplitSize")
+ }
+
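+ // `ParquetFileWriter.getGlobalMetaData` and `ParquetInputFormat.generateSplits` are not
+ // publicly accessible in parquet-mr, so they are invoked via reflection here; per the TODO
+ // above, this whole workaround goes away once PARQUET-16 is fixed.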
+ val getGlobalMetaData =
+ classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
+ getGlobalMetaData.setAccessible(true)
+ val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
+
+ val readContext = getReadSupport(configuration).init(
+ new InitContext(configuration,
+ globalMetaData.getKeyValueMetaData(),
+ globalMetaData.getSchema()))
+
+ val generateSplits =
+ classOf[ParquetInputFormat[_]].getDeclaredMethods.find(_.getName == "generateSplits").get
+ generateSplits.setAccessible(true)
+
+ val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
+ for (footer <- footers) {
+ val fs = footer.getFile.getFileSystem(configuration)
+ val file = footer.getFile
+ val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
+ val parquetMetaData = footer.getParquetMetadata
+ val blocks = parquetMetaData.getBlocks
+ val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
+ splits.addAll(
+ generateSplits.invoke(
+ null,
+ blocks,
+ fileBlockLocations,
+ fileStatus,
+ parquetMetaData.getFileMetaData,
+ readContext.getRequestedSchema.toString,
+ readContext.getReadSupportMetadata,
+ minSplitSize,
+ maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
+ }
+
+ splits
+ }
}
private[parquet] object FileSystemHelper {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index f1953a008a..39294a3f4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -17,20 +17,19 @@
package org.apache.spark.sql.parquet
-import org.apache.hadoop.conf.Configuration
+import java.util.{HashMap => JHashMap}
+import org.apache.hadoop.conf.Configuration
import parquet.column.ParquetProperties
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.api.ReadSupport.ReadContext
import parquet.hadoop.api.{ReadSupport, WriteSupport}
import parquet.io.api._
-import parquet.schema.{MessageType, MessageTypeParser}
+import parquet.schema.MessageType
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.execution.SparkSqlSerializer
-import com.google.common.io.BaseEncoding
/**
* A `parquet.io.api.RecordMaterializer` for Rows.
@@ -93,8 +92,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
configuration: Configuration,
keyValueMetaData: java.util.Map[String, String],
fileSchema: MessageType): ReadContext = {
- var parquetSchema: MessageType = fileSchema
- var metadata: java.util.Map[String, String] = new java.util.HashMap[String, String]()
+ var parquetSchema = fileSchema
+ val metadata = new JHashMap[String, String]()
val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
if (requestedAttributes != null) {
@@ -109,7 +108,7 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)
}
- return new ReadSupport.ReadContext(parquetSchema, metadata)
+ new ReadSupport.ReadContext(parquetSchema, metadata)
}
}
@@ -132,13 +131,17 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
private[parquet] var attributes: Seq[Attribute] = null
override def init(configuration: Configuration): WriteSupport.WriteContext = {
- attributes = if (attributes == null) RowWriteSupport.getSchema(configuration) else attributes
-
+ val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
+ val metadata = new JHashMap[String, String]()
+ metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)
+
+ if (attributes == null) {
+ attributes = ParquetTypesConverter.convertFromString(origAttributesStr)
+ }
+
log.debug(s"write support initialized for requested schema $attributes")
ParquetRelation.enableLogForwarding()
- new WriteSupport.WriteContext(
- ParquetTypesConverter.convertFromAttributes(attributes),
- new java.util.HashMap[java.lang.String, java.lang.String]())
+ new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
}
override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 7f6ad908f7..58370b955a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -22,6 +22,7 @@ import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
@@ -367,20 +368,24 @@ private[parquet] object ParquetTypesConverter extends Logging {
s"Expected $path for be a directory with Parquet files/metadata")
}
ParquetRelation.enableLogForwarding()
- val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
- // if this is a new table that was just created we will find only the metadata file
- if (fs.exists(metadataPath) && fs.isFile(metadataPath)) {
- ParquetFileReader.readFooter(conf, metadataPath)
- } else {
- // there may be one or more Parquet files in the given directory
- val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path))
- // TODO: for now we assume that all footers (if there is more than one) have identical
- // metadata; we may want to add a check here at some point
- if (footers.size() == 0) {
- throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")
- }
- footers(0).getParquetMetadata
+
+ val children = fs.listStatus(path).filterNot {
+ _.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME
}
+
+ // NOTE (lian): Reading the Parquet "_metadata" file can be very slow, since it contains the
+ // metadata of every row group. The schema is replicated in the footer of each part-file, so
+ // touching a single part-file is enough to read all schema related metadata. Note that we
+ // assume all part-files of a single Parquet table share the same schema, which is normally true.
+ children
+ // Try any non-"_metadata" file first...
+ .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
+ // ... and fall back to "_metadata" if no such file exists (which implies the Parquet file is
+ // empty, and thus the "_metadata" file is normally expected to be fairly small).
+ .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
+ .map(ParquetFileReader.readFooter(conf, _))
+ .getOrElse(
+ throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
}
/**
@@ -403,7 +408,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
} else {
val attributes = convertToAttributes(
readMetaData(origPath, conf).getFileMetaData.getSchema)
- log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes")
+ log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
attributes
}
}