-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala         |  42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 187
2 files changed, 44 insertions(+), 185 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 4d659f261a..d57b789f5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.parquet
+import java.io.Serializable
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding
@@ -24,7 +25,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.compat.FilterCompat._
import org.apache.parquet.filter2.predicate.FilterApi._
-import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
+import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics}
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
import org.apache.spark.SparkEnv
@@ -42,6 +44,18 @@ private[sql] object ParquetFilters {
}.reduceOption(FilterApi.and).map(FilterCompat.get)
}
+ case class SetInFilter[T <: Comparable[T]](
+ valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
+
+ override def keep(value: T): Boolean = {
+ value != null && valueSet.contains(value)
+ }
+
+ override def canDrop(statistics: Statistics[T]): Boolean = false
+
+ override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
+ }
+
private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -154,6 +168,29 @@ private[sql] object ParquetFilters {
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}
+ private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
+ case IntegerType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
+ case LongType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
+ case FloatType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
+ case DoubleType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
+ case StringType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(binaryColumn(n),
+ SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[UTF8String].getBytes))))
+ case BinaryType =>
+ (n: String, v: Set[Any]) =>
+ FilterApi.userDefined(binaryColumn(n),
+ SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+ }
+
/**
* Converts data sources filters to Parquet filter predicates.
*/
@@ -285,6 +322,9 @@ private[sql] object ParquetFilters {
case Not(pred) =>
createFilter(pred).map(FilterApi.not)
+ case InSet(NamedExpression(name, dataType), valueSet) =>
+ makeInSet.lift(dataType).map(_(name, valueSet))
+
case _ => None
}
}
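
For reference, a minimal sketch (not part of the patch) of how the SetInFilter added above plugs into the parquet-mr filter2 API; the column name "a" and the value set are illustrative assumptions:

    import org.apache.parquet.filter2.compat.FilterCompat
    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.filter2.predicate.FilterApi.intColumn

    // Build a predicate equivalent to "a IN (1, 2, 3)" and wrap it with
    // FilterCompat.get, mirroring what createFilter does for other predicates.
    val values: Set[java.lang.Integer] = Set(1, 2, 3).map(Int.box)
    val predicate = FilterApi.userDefined(intColumn("a"), SetInFilter(values))
    val filter = FilterCompat.get(predicate)
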
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 1e694f2fea..272608d4e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -117,6 +117,9 @@ private[sql] case class ParquetTableScan(
SQLConf.PARQUET_CACHE_METADATA,
sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
+ // Use task-side metadata in Parquet
+ conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
+
val baseRDD =
new org.apache.spark.rdd.NewHadoopRDD(
sc,
@@ -453,190 +456,6 @@ private[parquet] class FilteringParquetRowInputFormat
}
}
- // This is only a temporary solution sicne we need to use fileStatuses in
- // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
- // two methods.
- override def getSplits(jobContext: JobContext): JList[InputSplit] = {
- // First set fileStatuses.
- val statuses = listStatus(jobContext)
- fileStatuses = statuses.map(file => file.getPath -> file).toMap
-
- super.getSplits(jobContext)
- }
-
- // TODO Remove this method and related code once PARQUET-16 is fixed
- // This method together with the `getFooters` method and the `fileStatuses` field are just used
- // to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17
- override def getSplits(
- configuration: Configuration,
- footers: JList[Footer]): JList[ParquetInputSplit] = {
-
- // Use task side strategy by default
- val taskSideMetaData = configuration.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
- val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
- val minSplitSize: JLong =
- Math.max(getFormatMinSplitSize, configuration.getLong("mapred.min.split.size", 0L))
- if (maxSplitSize < 0 || minSplitSize < 0) {
- throw new ParquetDecodingException(
- s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = $maxSplitSize;" +
- s" minSplitSize = $minSplitSize")
- }
-
- // Uses strict type checking by default
- val getGlobalMetaData =
- classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
- getGlobalMetaData.setAccessible(true)
- var globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
-
- if (globalMetaData == null) {
- val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
- return splits
- }
-
- val metadata = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
- val mergedMetadata = globalMetaData
- .getKeyValueMetaData
- .updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(metadata)))
-
- globalMetaData = new GlobalMetaData(globalMetaData.getSchema,
- mergedMetadata, globalMetaData.getCreatedBy)
-
- val readContext = ParquetInputFormat.getReadSupportInstance(configuration).init(
- new InitContext(configuration,
- globalMetaData.getKeyValueMetaData,
- globalMetaData.getSchema))
-
- if (taskSideMetaData){
- logInfo("Using Task Side Metadata Split Strategy")
- getTaskSideSplits(configuration,
- footers,
- maxSplitSize,
- minSplitSize,
- readContext)
- } else {
- logInfo("Using Client Side Metadata Split Strategy")
- getClientSideSplits(configuration,
- footers,
- maxSplitSize,
- minSplitSize,
- readContext)
- }
-
- }
-
- def getClientSideSplits(
- configuration: Configuration,
- footers: JList[Footer],
- maxSplitSize: JLong,
- minSplitSize: JLong,
- readContext: ReadContext): JList[ParquetInputSplit] = {
-
- import org.apache.parquet.filter2.compat.FilterCompat.Filter
- import org.apache.parquet.filter2.compat.RowGroupFilter
-
- import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.blockLocationCache
-
- val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
-
- val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
- val filter: Filter = ParquetInputFormat.getFilter(configuration)
- var rowGroupsDropped: Long = 0
- var totalRowGroups: Long = 0
-
- // Ugly hack, stuck with it until PR:
- // https://github.com/apache/incubator-parquet-mr/pull/17
- // is resolved
- val generateSplits =
- Class.forName("org.apache.parquet.hadoop.ClientSideMetadataSplitStrategy")
- .getDeclaredMethods.find(_.getName == "generateSplits").getOrElse(
- sys.error(s"Failed to reflectively invoke ClientSideMetadataSplitStrategy.generateSplits"))
- generateSplits.setAccessible(true)
-
- for (footer <- footers) {
- val fs = footer.getFile.getFileSystem(configuration)
- val file = footer.getFile
- val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
- val parquetMetaData = footer.getParquetMetadata
- val blocks = parquetMetaData.getBlocks
- totalRowGroups = totalRowGroups + blocks.size
- val filteredBlocks = RowGroupFilter.filterRowGroups(
- filter,
- blocks,
- parquetMetaData.getFileMetaData.getSchema)
- rowGroupsDropped = rowGroupsDropped + (blocks.size - filteredBlocks.size)
-
- if (!filteredBlocks.isEmpty){
- var blockLocations: Array[BlockLocation] = null
- if (!cacheMetadata) {
- blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
- } else {
- blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
- def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
- })
- }
- splits.addAll(
- generateSplits.invoke(
- null,
- filteredBlocks,
- blockLocations,
- status,
- readContext.getRequestedSchema.toString,
- readContext.getReadSupportMetadata,
- minSplitSize,
- maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
- }
- }
-
- if (rowGroupsDropped > 0 && totalRowGroups > 0){
- val percentDropped = ((rowGroupsDropped/totalRowGroups.toDouble) * 100).toInt
- logInfo(s"Dropping $rowGroupsDropped row groups that do not pass filter predicate "
- + s"($percentDropped %) !")
- }
- else {
- logInfo("There were no row groups that could be dropped due to filter predicates")
- }
- splits
-
- }
-
- def getTaskSideSplits(
- configuration: Configuration,
- footers: JList[Footer],
- maxSplitSize: JLong,
- minSplitSize: JLong,
- readContext: ReadContext): JList[ParquetInputSplit] = {
-
- val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
-
- // Ugly hack, stuck with it until PR:
- // https://github.com/apache/incubator-parquet-mr/pull/17
- // is resolved
- val generateSplits =
- Class.forName("org.apache.parquet.hadoop.TaskSideMetadataSplitStrategy")
- .getDeclaredMethods.find(_.getName == "generateTaskSideMDSplits").getOrElse(
- sys.error(
- s"Failed to reflectively invoke TaskSideMetadataSplitStrategy.generateTaskSideMDSplits"))
- generateSplits.setAccessible(true)
-
- for (footer <- footers) {
- val file = footer.getFile
- val fs = file.getFileSystem(configuration)
- val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
- val blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
- splits.addAll(
- generateSplits.invoke(
- null,
- blockLocations,
- status,
- readContext.getRequestedSchema.toString,
- readContext.getReadSupportMetadata,
- minSplitSize,
- maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
- }
-
- splits
- }
-
}
private[parquet] object FilteringParquetRowInputFormat {
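
As a hedged end-to-end illustration, the InSet conversion above is what an all-literal IN predicate (which the optimizer may rewrite to InSet) would reach when Parquet filter pushdown is enabled via spark.sql.parquet.filterPushdown; the dataset path and column name below are placeholders, not part of this change:

    // Assuming a Parquet dataset with a string column "country"; with pushdown
    // enabled, the IN predicate can be evaluated by the Parquet record filter
    // instead of only after rows are materialized.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
    val df = sqlContext.read.parquet("/tmp/events")
    df.filter("country IN ('US', 'CA', 'MX')").count()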