diff options
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala | 42 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 187 |
2 files changed, 44 insertions, 185 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index 4d659f261a..d57b789f5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.parquet +import java.io.Serializable import java.nio.ByteBuffer import com.google.common.io.BaseEncoding @@ -24,7 +25,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.compat.FilterCompat._ import org.apache.parquet.filter2.predicate.FilterApi._ -import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} +import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics} +import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary import org.apache.spark.SparkEnv @@ -42,6 +44,18 @@ private[sql] object ParquetFilters { }.reduceOption(FilterApi.and).map(FilterCompat.get) } + case class SetInFilter[T <: Comparable[T]]( + valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable { + + override def keep(value: T): Boolean = { + value != null && valueSet.contains(value) + } + + override def canDrop(statistics: Statistics[T]): Boolean = false + + override def inverseCanDrop(statistics: Statistics[T]): Boolean = false + } + private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { case BooleanType => (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) @@ -154,6 +168,29 @@ private[sql] object ParquetFilters { FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]])) } + private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = { + case IntegerType => + (n: String, v: Set[Any]) => + FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]])) + case LongType => + (n: String, v: Set[Any]) => + FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]])) + case FloatType => + (n: String, v: Set[Any]) => + FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]])) + case DoubleType => + (n: String, v: Set[Any]) => + FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]])) + case StringType => + (n: String, v: Set[Any]) => + FilterApi.userDefined(binaryColumn(n), + SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[UTF8String].getBytes)))) + case BinaryType => + (n: String, v: Set[Any]) => + FilterApi.userDefined(binaryColumn(n), + SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]])))) + } + /** * Converts data sources filters to Parquet filter predicates. */ @@ -285,6 +322,9 @@ private[sql] object ParquetFilters { case Not(pred) => createFilter(pred).map(FilterApi.not) + case InSet(NamedExpression(name, dataType), valueSet) => + makeInSet.lift(dataType).map(_(name, valueSet)) + case _ => None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 1e694f2fea..272608d4e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -117,6 +117,9 @@ private[sql] case class ParquetTableScan( SQLConf.PARQUET_CACHE_METADATA, sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true")) + // Use task side metadata in parquet + conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true); + val baseRDD = new org.apache.spark.rdd.NewHadoopRDD( sc, @@ -453,190 +456,6 @@ private[parquet] class FilteringParquetRowInputFormat } } - // This is only a temporary solution sicne we need to use fileStatuses in - // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these - // two methods. - override def getSplits(jobContext: JobContext): JList[InputSplit] = { - // First set fileStatuses. - val statuses = listStatus(jobContext) - fileStatuses = statuses.map(file => file.getPath -> file).toMap - - super.getSplits(jobContext) - } - - // TODO Remove this method and related code once PARQUET-16 is fixed - // This method together with the `getFooters` method and the `fileStatuses` field are just used - // to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17 - override def getSplits( - configuration: Configuration, - footers: JList[Footer]): JList[ParquetInputSplit] = { - - // Use task side strategy by default - val taskSideMetaData = configuration.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true) - val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue) - val minSplitSize: JLong = - Math.max(getFormatMinSplitSize, configuration.getLong("mapred.min.split.size", 0L)) - if (maxSplitSize < 0 || minSplitSize < 0) { - throw new ParquetDecodingException( - s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = $maxSplitSize;" + - s" minSplitSize = $minSplitSize") - } - - // Uses strict type checking by default - val getGlobalMetaData = - classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]]) - getGlobalMetaData.setAccessible(true) - var globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData] - - if (globalMetaData == null) { - val splits = mutable.ArrayBuffer.empty[ParquetInputSplit] - return splits - } - - val metadata = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) - val mergedMetadata = globalMetaData - .getKeyValueMetaData - .updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(metadata))) - - globalMetaData = new GlobalMetaData(globalMetaData.getSchema, - mergedMetadata, globalMetaData.getCreatedBy) - - val readContext = ParquetInputFormat.getReadSupportInstance(configuration).init( - new InitContext(configuration, - globalMetaData.getKeyValueMetaData, - globalMetaData.getSchema)) - - if (taskSideMetaData){ - logInfo("Using Task Side Metadata Split Strategy") - getTaskSideSplits(configuration, - footers, - maxSplitSize, - minSplitSize, - readContext) - } else { - logInfo("Using Client Side Metadata Split Strategy") - getClientSideSplits(configuration, - footers, - maxSplitSize, - minSplitSize, - readContext) - } - - } - - def getClientSideSplits( - configuration: Configuration, - footers: JList[Footer], - maxSplitSize: JLong, - minSplitSize: JLong, - readContext: ReadContext): JList[ParquetInputSplit] = { - - import org.apache.parquet.filter2.compat.FilterCompat.Filter - import org.apache.parquet.filter2.compat.RowGroupFilter - - import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.blockLocationCache - - val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true) - - val splits = mutable.ArrayBuffer.empty[ParquetInputSplit] - val filter: Filter = ParquetInputFormat.getFilter(configuration) - var rowGroupsDropped: Long = 0 - var totalRowGroups: Long = 0 - - // Ugly hack, stuck with it until PR: - // https://github.com/apache/incubator-parquet-mr/pull/17 - // is resolved - val generateSplits = - Class.forName("org.apache.parquet.hadoop.ClientSideMetadataSplitStrategy") - .getDeclaredMethods.find(_.getName == "generateSplits").getOrElse( - sys.error(s"Failed to reflectively invoke ClientSideMetadataSplitStrategy.generateSplits")) - generateSplits.setAccessible(true) - - for (footer <- footers) { - val fs = footer.getFile.getFileSystem(configuration) - val file = footer.getFile - val status = fileStatuses.getOrElse(file, fs.getFileStatus(file)) - val parquetMetaData = footer.getParquetMetadata - val blocks = parquetMetaData.getBlocks - totalRowGroups = totalRowGroups + blocks.size - val filteredBlocks = RowGroupFilter.filterRowGroups( - filter, - blocks, - parquetMetaData.getFileMetaData.getSchema) - rowGroupsDropped = rowGroupsDropped + (blocks.size - filteredBlocks.size) - - if (!filteredBlocks.isEmpty){ - var blockLocations: Array[BlockLocation] = null - if (!cacheMetadata) { - blockLocations = fs.getFileBlockLocations(status, 0, status.getLen) - } else { - blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] { - def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen) - }) - } - splits.addAll( - generateSplits.invoke( - null, - filteredBlocks, - blockLocations, - status, - readContext.getRequestedSchema.toString, - readContext.getReadSupportMetadata, - minSplitSize, - maxSplitSize).asInstanceOf[JList[ParquetInputSplit]]) - } - } - - if (rowGroupsDropped > 0 && totalRowGroups > 0){ - val percentDropped = ((rowGroupsDropped/totalRowGroups.toDouble) * 100).toInt - logInfo(s"Dropping $rowGroupsDropped row groups that do not pass filter predicate " - + s"($percentDropped %) !") - } - else { - logInfo("There were no row groups that could be dropped due to filter predicates") - } - splits - - } - - def getTaskSideSplits( - configuration: Configuration, - footers: JList[Footer], - maxSplitSize: JLong, - minSplitSize: JLong, - readContext: ReadContext): JList[ParquetInputSplit] = { - - val splits = mutable.ArrayBuffer.empty[ParquetInputSplit] - - // Ugly hack, stuck with it until PR: - // https://github.com/apache/incubator-parquet-mr/pull/17 - // is resolved - val generateSplits = - Class.forName("org.apache.parquet.hadoop.TaskSideMetadataSplitStrategy") - .getDeclaredMethods.find(_.getName == "generateTaskSideMDSplits").getOrElse( - sys.error( - s"Failed to reflectively invoke TaskSideMetadataSplitStrategy.generateTaskSideMDSplits")) - generateSplits.setAccessible(true) - - for (footer <- footers) { - val file = footer.getFile - val fs = file.getFileSystem(configuration) - val status = fileStatuses.getOrElse(file, fs.getFileStatus(file)) - val blockLocations = fs.getFileBlockLocations(status, 0, status.getLen) - splits.addAll( - generateSplits.invoke( - null, - blockLocations, - status, - readContext.getRequestedSchema.toString, - readContext.getReadSupportMetadata, - minSplitSize, - maxSplitSize).asInstanceOf[JList[ParquetInputSplit]]) - } - - splits - } - } private[parquet] object FilteringParquetRowInputFormat { |