author      Yash Datta <Yash.Datta@guavus.com>    2015-06-12 13:44:09 +0800
committer   Cheng Lian <lian@databricks.com>      2015-06-12 13:44:09 +0800
commit      e428b3a951377d47aa80d5f26d6bab979e72e8ab (patch)
tree        2f6078891debd70f9d3c529f6b82b6b0fd3a3714 /sql
parent      2dd7f93080ee882afcc2aac1a419802a19a668ce (diff)
[SPARK-6566] [SQL] Related changes for newer parquet version
This brings in a major improvement: footers are no longer read on the driver. It also cleans up the code in ParquetTableOperations, where we previously had to override getSplits to eliminate multiple listStatus calls.

cc liancheng are there any other changes we need for this?

Author: Yash Datta <Yash.Datta@guavus.com>

Closes #5889 from saucam/parquet_1.6 and squashes the following commits:

d1bf41e [Yash Datta] SPARK-7340: Fix scalastyle and incorporate review comments
c9aa042 [Yash Datta] SPARK-7340: Use the new user defined filter predicate for pushing down inset into parquet
56bc750 [Yash Datta] SPARK-7340: Change parquet version to latest release
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala         |  42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 187
2 files changed, 44 insertions(+), 185 deletions(-)
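
The heart of the ParquetFilters change below is a Parquet UserDefinedPredicate that evaluates set membership inside the reader, which is what lets an InSet expression be pushed down to Parquet. The following is a minimal, self-contained sketch of how such a predicate is defined and registered on a Hadoop Configuration; the column name "id", the example value set, and the ExampleSetInFilter / InSetPushdownSketch names are illustrative only and are not part of this patch.

import java.io.Serializable

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.{FilterApi, Statistics, UserDefinedPredicate}
import org.apache.parquet.hadoop.ParquetInputFormat

// Same shape as the SetInFilter introduced in ParquetFilters.scala below.
case class ExampleSetInFilter[T <: Comparable[T]](valueSet: Set[T])
  extends UserDefinedPredicate[T] with Serializable {

  // A record is kept only when its value is a member of the pushed-down set.
  override def keep(value: T): Boolean = value != null && valueSet.contains(value)

  // This sketch does no row-group pruning from min/max statistics;
  // only record-level filtering applies.
  override def canDrop(statistics: Statistics[T]): Boolean = false
  override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
}

object InSetPushdownSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Register the predicate so the Parquet record reader evaluates it while scanning.
    val predicate = FilterApi.userDefined(
      FilterApi.intColumn("id"),
      ExampleSetInFilter(Set(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))))
    ParquetInputFormat.setFilterPredicate(conf, predicate)
  }
}
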
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 4d659f261a..d57b789f5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.parquet
+import java.io.Serializable
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding
@@ -24,7 +25,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.compat.FilterCompat._
import org.apache.parquet.filter2.predicate.FilterApi._
-import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
+import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics}
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
import org.apache.spark.SparkEnv
@@ -42,6 +44,18 @@ private[sql] object ParquetFilters {
}.reduceOption(FilterApi.and).map(FilterCompat.get)
}
+ case class SetInFilter[T <: Comparable[T]](
+ valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
+
+ override def keep(value: T): Boolean = {
+ value != null && valueSet.contains(value)
+ }
+
+ override def canDrop(statistics: Statistics[T]): Boolean = false
+
+ override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
+ }
+
private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -154,6 +168,29 @@ private[sql] object ParquetFilters {
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}
+ private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
+ case IntegerType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
+ case LongType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
+ case FloatType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
+ case DoubleType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
+ case StringType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(binaryColumn(n),
+ SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[UTF8String].getBytes))))
+ case BinaryType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(binaryColumn(n),
+ SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+ }
+
/**
* Converts data sources filters to Parquet filter predicates.
*/
@@ -285,6 +322,9 @@ private[sql] object ParquetFilters {
case Not(pred) =>
createFilter(pred).map(FilterApi.not)
+ case InSet(NamedExpression(name, dataType), valueSet) =>
+ makeInSet.lift(dataType).map(_(name, valueSet))
+
case _ => None
}
}
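
The ParquetTableOperations change below switches the scan to Parquet's task-side metadata split strategy, which is why the getSplits override and its client-side helpers can be deleted wholesale: split planning no longer requires reading every file footer on the driver, and each task reads the footer of its own split instead. A minimal sketch of the configuration flag involved, assuming only the standard ParquetInputFormat constant (the object name and standalone Configuration are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetInputFormat

object TaskSideMetadataSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // With parquet.task.side.metadata enabled, split generation does not read footers
    // up front on the driver; row-group filtering happens inside each task.
    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
  }
}
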
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 1e694f2fea..272608d4e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -117,6 +117,9 @@ private[sql] case class ParquetTableScan(
SQLConf.PARQUET_CACHE_METADATA,
sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
+ // Use task side metadata in parquet
+ conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
+
val baseRDD =
new org.apache.spark.rdd.NewHadoopRDD(
sc,
@@ -453,190 +456,6 @@ private[parquet] class FilteringParquetRowInputFormat
}
}
- // This is only a temporary solution sicne we need to use fileStatuses in
- // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
- // two methods.
- override def getSplits(jobContext: JobContext): JList[InputSplit] = {
- // First set fileStatuses.
- val statuses = listStatus(jobContext)
- fileStatuses = statuses.map(file => file.getPath -> file).toMap
-
- super.getSplits(jobContext)
- }
-
- // TODO Remove this method and related code once PARQUET-16 is fixed
- // This method together with the `getFooters` method and the `fileStatuses` field are just used
- // to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17
- override def getSplits(
- configuration: Configuration,
- footers: JList[Footer]): JList[ParquetInputSplit] = {
-
- // Use task side strategy by default
- val taskSideMetaData = configuration.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
- val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
- val minSplitSize: JLong =
- Math.max(getFormatMinSplitSize, configuration.getLong("mapred.min.split.size", 0L))
- if (maxSplitSize < 0 || minSplitSize < 0) {
- throw new ParquetDecodingException(
- s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = $maxSplitSize;" +
- s" minSplitSize = $minSplitSize")
- }
-
- // Uses strict type checking by default
- val getGlobalMetaData =
- classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
- getGlobalMetaData.setAccessible(true)
- var globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
-
- if (globalMetaData == null) {
- val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
- return splits
- }
-
- val metadata = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
- val mergedMetadata = globalMetaData
- .getKeyValueMetaData
- .updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(metadata)))
-
- globalMetaData = new GlobalMetaData(globalMetaData.getSchema,
- mergedMetadata, globalMetaData.getCreatedBy)
-
- val readContext = ParquetInputFormat.getReadSupportInstance(configuration).init(
- new InitContext(configuration,
- globalMetaData.getKeyValueMetaData,
- globalMetaData.getSchema))
-
- if (taskSideMetaData){
- logInfo("Using Task Side Metadata Split Strategy")
- getTaskSideSplits(configuration,
- footers,
- maxSplitSize,
- minSplitSize,
- readContext)
- } else {
- logInfo("Using Client Side Metadata Split Strategy")
- getClientSideSplits(configuration,
- footers,
- maxSplitSize,
- minSplitSize,
- readContext)
- }
-
- }
-
- def getClientSideSplits(
- configuration: Configuration,
- footers: JList[Footer],
- maxSplitSize: JLong,
- minSplitSize: JLong,
- readContext: ReadContext): JList[ParquetInputSplit] = {
-
- import org.apache.parquet.filter2.compat.FilterCompat.Filter
- import org.apache.parquet.filter2.compat.RowGroupFilter
-
- import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.blockLocationCache
-
- val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
-
- val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
- val filter: Filter = ParquetInputFormat.getFilter(configuration)
- var rowGroupsDropped: Long = 0
- var totalRowGroups: Long = 0
-
- // Ugly hack, stuck with it until PR:
- // https://github.com/apache/incubator-parquet-mr/pull/17
- // is resolved
- val generateSplits =
- Class.forName("org.apache.parquet.hadoop.ClientSideMetadataSplitStrategy")
- .getDeclaredMethods.find(_.getName == "generateSplits").getOrElse(
- sys.error(s"Failed to reflectively invoke ClientSideMetadataSplitStrategy.generateSplits"))
- generateSplits.setAccessible(true)
-
- for (footer <- footers) {
- val fs = footer.getFile.getFileSystem(configuration)
- val file = footer.getFile
- val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
- val parquetMetaData = footer.getParquetMetadata
- val blocks = parquetMetaData.getBlocks
- totalRowGroups = totalRowGroups + blocks.size
- val filteredBlocks = RowGroupFilter.filterRowGroups(
- filter,
- blocks,
- parquetMetaData.getFileMetaData.getSchema)
- rowGroupsDropped = rowGroupsDropped + (blocks.size - filteredBlocks.size)
-
- if (!filteredBlocks.isEmpty){
- var blockLocations: Array[BlockLocation] = null
- if (!cacheMetadata) {
- blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
- } else {
- blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
- def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
- })
- }
- splits.addAll(
- generateSplits.invoke(
- null,
- filteredBlocks,
- blockLocations,
- status,
- readContext.getRequestedSchema.toString,
- readContext.getReadSupportMetadata,
- minSplitSize,
- maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
- }
- }
-
- if (rowGroupsDropped > 0 && totalRowGroups > 0){
- val percentDropped = ((rowGroupsDropped/totalRowGroups.toDouble) * 100).toInt
- logInfo(s"Dropping $rowGroupsDropped row groups that do not pass filter predicate "
- + s"($percentDropped %) !")
- }
- else {
- logInfo("There were no row groups that could be dropped due to filter predicates")
- }
- splits
-
- }
-
- def getTaskSideSplits(
- configuration: Configuration,
- footers: JList[Footer],
- maxSplitSize: JLong,
- minSplitSize: JLong,
- readContext: ReadContext): JList[ParquetInputSplit] = {
-
- val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
-
- // Ugly hack, stuck with it until PR:
- // https://github.com/apache/incubator-parquet-mr/pull/17
- // is resolved
- val generateSplits =
- Class.forName("org.apache.parquet.hadoop.TaskSideMetadataSplitStrategy")
- .getDeclaredMethods.find(_.getName == "generateTaskSideMDSplits").getOrElse(
- sys.error(
- s"Failed to reflectively invoke TaskSideMetadataSplitStrategy.generateTaskSideMDSplits"))
- generateSplits.setAccessible(true)
-
- for (footer <- footers) {
- val file = footer.getFile
- val fs = file.getFileSystem(configuration)
- val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
- val blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
- splits.addAll(
- generateSplits.invoke(
- null,
- blockLocations,
- status,
- readContext.getRequestedSchema.toString,
- readContext.getReadSupportMetadata,
- minSplitSize,
- maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
- }
-
- splits
- }
-
}
private[parquet] object FilteringParquetRowInputFormat {