From b631bf73b9f288f37c98b806be430b22485880e5 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 20 May 2015 11:23:40 -0700 Subject: [SPARK-7713] [SQL] Use shared broadcast hadoop conf for partitioned table scan. https://issues.apache.org/jira/browse/SPARK-7713 I tested the performance with the following code: ```scala import sqlContext._ import sqlContext.implicits._ (1 to 5000).foreach { i => val df = (1 to 1000).map(j => (j, s"str$j")).toDF("a", "b").save(s"/tmp/partitioned/i=$i") } sqlContext.sql(""" CREATE TEMPORARY TABLE partitionedParquet USING org.apache.spark.sql.parquet OPTIONS ( path '/tmp/partitioned' )""") table("partitionedParquet").explain(true) ``` In our master `explain` takes 40s in my laptop. With this PR, `explain` takes 14s. Author: Yin Huai Closes #6252 from yhuai/broadcastHadoopConf and squashes the following commits: 6fa73df [Yin Huai] Address comments of Josh and Andrew. 807fbf9 [Yin Huai] Make the new buildScan and SqlNewHadoopRDD private sql. e393555 [Yin Huai] Cheng's comments. 2eb53bb [Yin Huai] Use a shared broadcast Hadoop Configuration for partitioned HadoopFsRelations. --- .../org/apache/spark/sql/parquet/newParquet.scala | 113 +++++---- .../spark/sql/sources/DataSourceStrategy.scala | 19 +- .../apache/spark/sql/sources/SqlNewHadoopRDD.scala | 268 +++++++++++++++++++++ .../org/apache/spark/sql/sources/interfaces.scala | 35 ++- 4 files changed, 387 insertions(+), 48 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 7ca44f7b81..c35b7eff82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConversions._ import scala.util.Try import com.google.common.base.Objects +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ @@ -32,13 +33,14 @@ import parquet.hadoop._ import parquet.hadoop.metadata.CompressionCodecName import parquet.hadoop.util.ContextUtil +import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD._ -import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD} +import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.{Row, SQLConf, SQLContext} -import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} +import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException} private[sql] class DefaultSource extends HadoopFsRelationProvider { override def createRelation( @@ -233,40 +235,20 @@ private[sql] class ParquetRelation2( override def buildScan( requiredColumns: Array[String], filters: Array[Filter], - inputFiles: Array[FileStatus]): RDD[Row] = { - - val job = new Job(SparkHadoopUtil.get.conf) - val conf = ContextUtil.getConfiguration(job) - - ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport]) - - if (inputFiles.nonEmpty) { - FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*) - } - - // Try to push down filters when filter push-down is enabled. - if (sqlContext.conf.parquetFilterPushDown) { - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(ParquetFilters.createFilter(dataSchema, _)) - .reduceOption(FilterApi.and) - .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) - } - - conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { - val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) - ParquetTypesConverter.convertToString(requestedSchema.toAttributes) - }) - - conf.set( - RowWriteSupport.SPARK_ROW_SCHEMA, - ParquetTypesConverter.convertToString(dataSchema.toAttributes)) - - // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata + inputFiles: Array[FileStatus], + broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = { val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean - conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString) + val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown + // Create the function to set variable Parquet confs at both driver and executor side. + val initLocalJobFuncOpt = + ParquetRelation2.initializeLocalJobFunc( + requiredColumns, + filters, + dataSchema, + useMetadataCache, + parquetFilterPushDown) _ + // Create the function to set input paths at the driver side. + val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _ val footers = inputFiles.map(f => metadataCache.footers(f.getPath)) @@ -274,12 +256,14 @@ private[sql] class ParquetRelation2( // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and // footers. Especially when a global arbitrative schema (either from metastore or data source // DDL) is available. - new NewHadoopRDD( - sqlContext.sparkContext, - classOf[FilteringParquetRowInputFormat], - classOf[Void], - classOf[Row], - conf) { + new SqlNewHadoopRDD( + sc = sqlContext.sparkContext, + broadcastedConf = broadcastedConf, + initDriverSideJobFuncOpt = Some(setInputPaths), + initLocalJobFuncOpt = Some(initLocalJobFuncOpt), + inputFormatClass = classOf[FilteringParquetRowInputFormat], + keyClass = classOf[Void], + valueClass = classOf[Row]) { val cacheMetadata = useMetadataCache @@ -311,11 +295,11 @@ private[sql] class ParquetRelation2( new FilteringParquetRowInputFormat } - val jobContext = newJobContext(getConf, jobId) + val jobContext = newJobContext(getConf(isDriverSide = true), jobId) val rawSplits = inputFormat.getSplits(jobContext) Array.tabulate[SparkPartition](rawSplits.size) { i => - new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) } } }.values @@ -452,6 +436,49 @@ private[sql] object ParquetRelation2 extends Logging { // internally. private[sql] val METASTORE_SCHEMA = "metastoreSchema" + /** This closure sets various Parquet configurations at both driver side and executor side. */ + private[parquet] def initializeLocalJobFunc( + requiredColumns: Array[String], + filters: Array[Filter], + dataSchema: StructType, + useMetadataCache: Boolean, + parquetFilterPushDown: Boolean)(job: Job): Unit = { + val conf = job.getConfiguration + conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[RowReadSupport].getName()) + + // Try to push down filters when filter push-down is enabled. + if (parquetFilterPushDown) { + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(ParquetFilters.createFilter(dataSchema, _)) + .reduceOption(FilterApi.and) + .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) + } + + conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { + val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) + ParquetTypesConverter.convertToString(requestedSchema.toAttributes) + }) + + conf.set( + RowWriteSupport.SPARK_ROW_SCHEMA, + ParquetTypesConverter.convertToString(dataSchema.toAttributes)) + + // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata + conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString) + } + + /** This closure sets input paths at the driver side. */ + private[parquet] def initializeDriverSideJobFunc( + inputFiles: Array[FileStatus])(job: Job): Unit = { + // We side the input paths at the driver side. + if (inputFiles.nonEmpty) { + FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*) + } + } + private[parquet] def readSchema( footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { footers.map { footer => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index 1615a6dcbd..550090d22d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.sources -import org.apache.spark.Logging +import org.apache.spark.{SerializableWritable, Logging} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ @@ -84,11 +85,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Scanning non-partitioned HadoopFsRelation case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) => + // See buildPartitionedTableScan for the reason that we need to create a shard + // broadcast HadoopConf. + val sharedHadoopConf = SparkHadoopUtil.get.conf + val confBroadcast = + t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf)) pruneFilterProject( l, projectList, filters, - (a, f) => t.buildScan(a, f, t.paths)) :: Nil + (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil case l @ LogicalRelation(t: TableScan) => createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil @@ -115,6 +121,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val output = projections.map(_.toAttribute) val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation] + // Because we are creating one RDD per partition, we need to have a shared HadoopConf. + // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high. + val sharedHadoopConf = SparkHadoopUtil.get.conf + val confBroadcast = + relation.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf)) + // Builds RDD[Row]s for each selected partition. val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => // The table scan operator (PhysicalRDD) which retrieves required columns from data files. @@ -132,7 +144,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // assuming partition columns data stored in data files are always consistent with those // partition values encoded in partition directory paths. val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains) - val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir)) + val dataRows = + relation.buildScan(nonPartitionColumns, filters, Array(dir), confBroadcast) // Merges data values with partition values. mergeWithPartitionValues( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala new file mode 100644 index 0000000000..0c7bb6e50c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.text.SimpleDateFormat +import java.util.Date + +import org.apache.hadoop.conf.{Configurable, Configuration} +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit} +import org.apache.spark.broadcast.Broadcast + +import org.apache.spark.{Partition => SparkPartition, _} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.DataReadMethod +import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil +import org.apache.spark.rdd.{RDD, HadoopRDD} +import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils + +import scala.reflect.ClassTag + +private[spark] class SqlNewHadoopPartition( + rddId: Int, + val index: Int, + @transient rawSplit: InputSplit with Writable) + extends SparkPartition { + + val serializableHadoopSplit = new SerializableWritable(rawSplit) + + override def hashCode(): Int = 41 * (41 + rddId) + index +} + +/** + * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, + * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`). + * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions. + * 1. A shared broadcast Hadoop Configuration. + * 2. An optional closure `initDriverSideJobFuncOpt` that set configurations at the driver side + * to the shared Hadoop Configuration. + * 3. An optional closure `initLocalJobFuncOpt` that set configurations at both the driver side + * and the executor side to the shared Hadoop Configuration. + * + * Note: This is RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with + * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be + * folded into core. + */ +private[sql] class SqlNewHadoopRDD[K, V]( + @transient sc : SparkContext, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], + @transient initDriverSideJobFuncOpt: Option[Job => Unit], + initLocalJobFuncOpt: Option[Job => Unit], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V]) + extends RDD[(K, V)](sc, Nil) + with SparkHadoopMapReduceUtil + with Logging { + + if (initLocalJobFuncOpt.isDefined) { + sc.clean(initLocalJobFuncOpt.get) + } + + protected def getJob(): Job = { + val conf: Configuration = broadcastedConf.value.value + // "new Job" will make a copy of the conf. Then, it is + // safe to mutate conf properties with initLocalJobFuncOpt + // and initDriverSideJobFuncOpt. + val newJob = new Job(conf) + initLocalJobFuncOpt.map(f => f(newJob)) + newJob + } + + def getConf(isDriverSide: Boolean): Configuration = { + val job = getJob() + if (isDriverSide) { + initDriverSideJobFuncOpt.map(f => f(job)) + } + job.getConfiguration + } + + private val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + formatter.format(new Date()) + } + + @transient protected val jobId = new JobID(jobTrackerId, id) + + override def getPartitions: Array[SparkPartition] = { + val conf = getConf(isDriverSide = true) + val inputFormat = inputFormatClass.newInstance + inputFormat match { + case configurable: Configurable => + configurable.setConf(conf) + case _ => + } + val jobContext = newJobContext(conf, jobId) + val rawSplits = inputFormat.getSplits(jobContext).toArray + val result = new Array[SparkPartition](rawSplits.size) + for (i <- 0 until rawSplits.size) { + result(i) = + new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + } + result + } + + override def compute( + theSplit: SparkPartition, + context: TaskContext): InterruptibleIterator[(K, V)] = { + val iter = new Iterator[(K, V)] { + val split = theSplit.asInstanceOf[SqlNewHadoopPartition] + logInfo("Input split: " + split.serializableHadoopSplit) + val conf = getConf(isDriverSide = false) + + val inputMetrics = context.taskMetrics + .getInputMetricsForReadMethod(DataReadMethod.Hadoop) + + // Find a function that will return the FileSystem bytes read by this thread. Do this before + // creating RecordReader, because RecordReader's constructor might read some bytes + val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { + split.serializableHadoopSplit.value match { + case _: FileSplit | _: CombineFileSplit => + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + case _ => None + } + } + inputMetrics.setBytesReadCallback(bytesReadCallback) + + val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) + val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) + val format = inputFormatClass.newInstance + format match { + case configurable: Configurable => + configurable.setConf(conf) + case _ => + } + val reader = format.createRecordReader( + split.serializableHadoopSplit.value, hadoopAttemptContext) + reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) + + // Register an on-task-completion callback to close the input stream. + context.addTaskCompletionListener(context => close()) + var havePair = false + var finished = false + var recordsSinceMetricsUpdate = 0 + + override def hasNext: Boolean = { + if (!finished && !havePair) { + finished = !reader.nextKeyValue + havePair = !finished + } + !finished + } + + override def next(): (K, V) = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + havePair = false + if (!finished) { + inputMetrics.incRecordsRead(1) + } + (reader.getCurrentKey, reader.getCurrentValue) + } + + private def close() { + try { + reader.close() + if (bytesReadCallback.isDefined) { + inputMetrics.updateBytesRead() + } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] || + split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) { + // If we can't get the bytes read from the FS stats, fall back to the split size, + // which may be inaccurate. + try { + inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength) + } catch { + case e: java.io.IOException => + logWarning("Unable to get input size to set InputMetrics for task", e) + } + } + } catch { + case e: Exception => { + if (!Utils.inShutdown()) { + logWarning("Exception in RecordReader.close()", e) + } + } + } + } + } + new InterruptibleIterator(context, iter) + } + + /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */ + @DeveloperApi + def mapPartitionsWithInputSplit[U: ClassTag]( + f: (InputSplit, Iterator[(K, V)]) => Iterator[U], + preservesPartitioning: Boolean = false): RDD[U] = { + new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning) + } + + override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = { + val split = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value + val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match { + case Some(c) => + try { + val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]] + Some(HadoopRDD.convertSplitLocationInfo(infos)) + } catch { + case e : Exception => + logDebug("Failed to use InputSplit#getLocationInfo.", e) + None + } + case None => None + } + locs.getOrElse(split.getLocations.filter(_ != "localhost")) + } + + override def persist(storageLevel: StorageLevel): this.type = { + if (storageLevel.deserialized) { + logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" + + " behavior because Hadoop's RecordReader reuses the same Writable object for all records." + + " Use a map transformation to make copies of the records.") + } + super.persist(storageLevel) + } +} + +private[spark] object SqlNewHadoopRDD { + /** + * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to + * the given function rather than the index of the partition. + */ + private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag]( + prev: RDD[T], + f: (InputSplit, Iterator[T]) => Iterator[U], + preservesPartitioning: Boolean = false) + extends RDD[U](prev) { + + override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None + + override def getPartitions: Array[SparkPartition] = firstParent[T].partitions + + override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = { + val partition = split.asInstanceOf[SqlNewHadoopPartition] + val inputSplit = partition.serializableHadoopSplit.value + f(inputSplit, firstParent[T].iterator(split, context)) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 9b52d1be3d..6a917bf38b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -25,7 +25,9 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD +import org.apache.spark.SerializableWritable import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection @@ -484,7 +486,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio private[sources] final def buildScan( requiredColumns: Array[String], filters: Array[Filter], - inputPaths: Array[String]): RDD[Row] = { + inputPaths: Array[String], + broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = { val inputStatuses = inputPaths.flatMap { input => val path = new Path(input) @@ -499,7 +502,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio } } - buildScan(requiredColumns, filters, inputStatuses) + buildScan(requiredColumns, filters, inputStatuses, broadcastedConf) } /** @@ -583,6 +586,34 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio buildScan(requiredColumns, inputFiles) } + /** + * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within + * this relation. For partitioned relations, this method is called for each selected partition, + * and builds an `RDD[Row]` containing all rows within that single partition. + * + * Note: This interface is subject to change in future. + * + * @param requiredColumns Required columns. + * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction + * of all `filters`. The pushed down filters are currently purely an optimization as they + * will all be evaluated again. This means it is safe to use them with methods that produce + * false positives such as filtering partitions based on a bloom filter. + * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the + * relation. For a partitioned relation, it contains paths of all data files in a single + * selected partition. + * @param broadcastedConf A shared broadcast Hadoop Configuration, which can be used to reduce the + * overhead of broadcasting the Configuration for every Hadoop RDD. + * + * @since 1.4.0 + */ + private[sql] def buildScan( + requiredColumns: Array[String], + filters: Array[Filter], + inputFiles: Array[FileStatus], + broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = { + buildScan(requiredColumns, filters, inputFiles) + } + /** * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can * be put here. For example, user defined output committer can be configured here -- cgit v1.2.3