aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-08-18 11:00:10 -0700
committerMichael Armbrust <michael@databricks.com>2014-08-18 11:00:10 -0700
commit9eb74c7d2cbe127dd4c32bf1a8318497b2fb55b6 (patch)
tree85a180feecc5b4770933a470a0444ae16127ad8e /sql
parent6bca8898a1aa4ca7161492229bac1748b3da2ad7 (diff)
downloadspark-9eb74c7d2cbe127dd4c32bf1a8318497b2fb55b6.tar.gz
spark-9eb74c7d2cbe127dd4c32bf1a8318497b2fb55b6.tar.bz2
spark-9eb74c7d2cbe127dd4c32bf1a8318497b2fb55b6.zip
[SPARK-3091] [SQL] Add support for caching metadata on Parquet files
For larger Parquet files, reading the file footers (which is done in parallel on up to 5 threads) and HDFS block locations (which is serial) can take multiple seconds. We can add an option to cache this data within FilteringParquetInputFormat. Unfortunately ParquetInputFormat only caches footers within each instance of ParquetInputFormat, not across them. Note: this PR leaves this turned off by default for 1.1, but I believe it's safe to turn it on after. The keys in the hash maps are FileStatus objects that include a modification time, so this will work fine if files are modified. The location cache could become invalid if files have moved within HDFS, but that's rare so I just made it invalidate entries every 15 minutes. Author: Matei Zaharia <matei@databricks.com> Closes #2005 from mateiz/parquet-cache and squashes the following commits: dae8efe [Matei Zaharia] Bug fix c71e9ed [Matei Zaharia] Handle empty statuses directly 22072b0 [Matei Zaharia] Use Guava caches and add a config option for caching metadata 8fb56ce [Matei Zaharia] Cache file block locations too 453bd21 [Matei Zaharia] Bug fix 4094df6 [Matei Zaharia] First attempt at caching Parquet footers
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala84
2 files changed, 72 insertions, 13 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 56face2992..4f2adb006f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -32,6 +32,7 @@ private[spark] object SQLConf {
val CODEGEN_ENABLED = "spark.sql.codegen"
val DIALECT = "spark.sql.dialect"
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
+ val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 759a2a586b..c6dca10f6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -17,22 +17,23 @@
package org.apache.spark.sql.parquet
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.Try
-
import java.io.IOException
import java.lang.{Long => JLong}
import java.text.SimpleDateFormat
-import java.util.{Date, List => JList}
+import java.util.concurrent.{Callable, TimeUnit}
+import java.util.{ArrayList, Collections, Date, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.Try
+
+import com.google.common.cache.CacheBuilder
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
import parquet.hadoop._
import parquet.hadoop.api.{InitContext, ReadSupport}
import parquet.hadoop.metadata.GlobalMetaData
@@ -41,7 +42,7 @@ import parquet.io.ParquetDecodingException
import parquet.schema.MessageType
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}
@@ -96,6 +97,11 @@ case class ParquetTableScan(
ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
}
+ // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+ conf.set(
+ SQLConf.PARQUET_CACHE_METADATA,
+ sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
+
sc.newAPIHadoopRDD(
conf,
classOf[FilteringParquetRowInputFormat],
@@ -323,10 +329,40 @@ private[parquet] class FilteringParquetRowInputFormat
}
override def getFooters(jobContext: JobContext): JList[Footer] = {
+ import FilteringParquetRowInputFormat.footerCache
+
if (footers eq null) {
+ val conf = ContextUtil.getConfiguration(jobContext)
+ val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
val statuses = listStatus(jobContext)
fileStatuses = statuses.map(file => file.getPath -> file).toMap
- footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
+ if (statuses.isEmpty) {
+ footers = Collections.emptyList[Footer]
+ } else if (!cacheMetadata) {
+ // Read the footers from HDFS
+ footers = getFooters(conf, statuses)
+ } else {
+ // Read only the footers that are not in the footerCache
+ val foundFooters = footerCache.getAllPresent(statuses)
+ val toFetch = new ArrayList[FileStatus]
+ for (s <- statuses) {
+ if (!foundFooters.containsKey(s)) {
+ toFetch.add(s)
+ }
+ }
+ val newFooters = new mutable.HashMap[FileStatus, Footer]
+ if (toFetch.size > 0) {
+ val fetched = getFooters(conf, toFetch)
+ for ((status, i) <- toFetch.zipWithIndex) {
+ newFooters(status) = fetched.get(i)
+ }
+ footerCache.putAll(newFooters)
+ }
+ footers = new ArrayList[Footer](statuses.size)
+ for (status <- statuses) {
+ footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
+ }
+ }
}
footers
@@ -339,6 +375,10 @@ private[parquet] class FilteringParquetRowInputFormat
configuration: Configuration,
footers: JList[Footer]): JList[ParquetInputSplit] = {
+ import FilteringParquetRowInputFormat.blockLocationCache
+
+ val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
+
val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
val minSplitSize: JLong =
Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
@@ -366,16 +406,23 @@ private[parquet] class FilteringParquetRowInputFormat
for (footer <- footers) {
val fs = footer.getFile.getFileSystem(configuration)
val file = footer.getFile
- val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
+ val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
val parquetMetaData = footer.getParquetMetadata
val blocks = parquetMetaData.getBlocks
- val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
+ var blockLocations: Array[BlockLocation] = null
+ if (!cacheMetadata) {
+ blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
+ } else {
+ blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
+ def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
+ })
+ }
splits.addAll(
generateSplits.invoke(
null,
blocks,
- fileBlockLocations,
- fileStatus,
+ blockLocations,
+ status,
parquetMetaData.getFileMetaData,
readContext.getRequestedSchema.toString,
readContext.getReadSupportMetadata,
@@ -387,6 +434,17 @@ private[parquet] class FilteringParquetRowInputFormat
}
}
+private[parquet] object FilteringParquetRowInputFormat {
+ private val footerCache = CacheBuilder.newBuilder()
+ .maximumSize(20000)
+ .build[FileStatus, Footer]()
+
+ private val blockLocationCache = CacheBuilder.newBuilder()
+ .maximumSize(20000)
+ .expireAfterWrite(15, TimeUnit.MINUTES) // Expire locations since HDFS files might move
+ .build[FileStatus, Array[BlockLocation]]()
+}
+
private[parquet] object FileSystemHelper {
def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)