author    Yin Huai <yhuai@databricks.com>  2015-05-20 11:23:40 -0700
committer Yin Huai <yhuai@databricks.com>  2015-05-20 11:23:40 -0700
commit    b631bf73b9f288f37c98b806be430b22485880e5 (patch)
tree      2275f4db8f6c4f32585b3249746d2f4d79d8c872 /sql
parent    98a46f9dffec294386f6c39acafa7f11adb87a8f (diff)
[SPARK-7713] [SQL] Use shared broadcast hadoop conf for partitioned table scan.
https://issues.apache.org/jira/browse/SPARK-7713

I tested the performance with the following code:

```scala
import sqlContext._
import sqlContext.implicits._

(1 to 5000).foreach { i =>
  val df = (1 to 1000).map(j => (j, s"str$j")).toDF("a", "b").save(s"/tmp/partitioned/i=$i")
}

sqlContext.sql("""
  CREATE TEMPORARY TABLE partitionedParquet
  USING org.apache.spark.sql.parquet
  OPTIONS (
    path '/tmp/partitioned'
  )""")

table("partitionedParquet").explain(true)
```

In our master, `explain` takes 40s on my laptop. With this PR, `explain` takes 14s.

Author: Yin Huai <yhuai@databricks.com>

Closes #6252 from yhuai/broadcastHadoopConf and squashes the following commits:

6fa73df [Yin Huai] Address comments of Josh and Andrew.
807fbf9 [Yin Huai] Make the new buildScan and SqlNewHadoopRDD private sql.
e393555 [Yin Huai] Cheng's comments.
2eb53bb [Yin Huai] Use a shared broadcast Hadoop Configuration for partitioned HadoopFsRelations.
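The core idea of the change is small: instead of every per-partition Hadoop RDD serializing its own copy of the Hadoop Configuration, the Configuration is broadcast once (wrapped in `SerializableWritable`, since `Configuration` is a Hadoop `Writable` but not Java-serializable) and the broadcast handle is passed into each `buildScan` call. The sketch below illustrates that pattern against the Spark 1.x API; `buildScanForDir` is a hypothetical stand-in for the relation's `buildScan`, not an API in this patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast

object SharedConfSketch {
  def buildPerPartitionScans(
      sc: SparkContext,
      hadoopConf: Configuration,
      partitionDirs: Seq[String]): Unit = {
    // Broadcast the Configuration exactly once; SerializableWritable makes the
    // Writable (but non-Serializable) Configuration safe to broadcast.
    val confBroadcast: Broadcast[SerializableWritable[Configuration]] =
      sc.broadcast(new SerializableWritable(hadoopConf))

    partitionDirs.foreach { dir =>
      // Each per-partition RDD reads the shared conf on executors via
      // confBroadcast.value.value instead of closing over its own copy.
      // In the patch this is relation.buildScan(cols, filters, Array(dir), confBroadcast);
      // buildScanForDir would be an illustrative placeholder for that call.
      println(s"would build a scan RDD for $dir using broadcast #${confBroadcast.id}")
    }
  }
}
```

With thousands of selected partitions, as in the benchmark above, this turns one Configuration copy per RDD into a single broadcast, which is the overhead the comment in DataSourceStrategy below calls out.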
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala          113
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala   19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala     268
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala           35
4 files changed, 387 insertions(+), 48 deletions(-)
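For orientation before the diff: the patch splits Job setup into a driver-only closure (input paths) and a "local" closure applied on both driver and executors (requested schema, filter push-down, metadata caching), each mutating a Job copied from the broadcast Configuration. A rough sketch of that shape follows; the function names and the conf key are illustrative placeholders, not the real Spark/Parquet keys.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

object JobInitSketch {
  // Driver-only: input paths are resolved on the driver and never shipped to executors.
  def initDriverSideJobFunc(paths: Seq[Path])(job: Job): Unit =
    if (paths.nonEmpty) FileInputFormat.setInputPaths(job, paths: _*)

  // Driver and executors: per-scan settings go into the Job's copied Configuration.
  // "example.requested.schema" is a placeholder key, not a real configuration key.
  def initLocalJobFunc(requestedSchemaJson: String)(job: Job): Unit =
    job.getConfiguration.set("example.requested.schema", requestedSchemaJson)

  // SqlNewHadoopRDD (added in this patch) accepts both as Option[Job => Unit] and
  // applies them to a Job built from the shared broadcast Configuration.
  val driverInit: Job => Unit = initDriverSideJobFunc(Seq(new Path("/tmp/partitioned/i=1"))) _
  val localInit: Job => Unit = initLocalJobFunc("""{"type":"struct","fields":[]}""") _
}
```

The real closures, `initializeLocalJobFunc` and `initializeDriverSideJobFunc` in `ParquetRelation2`, appear in the newParquet.scala hunks below.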
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 7ca44f7b81..c35b7eff82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConversions._
import scala.util.Try
import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -32,13 +33,14 @@ import parquet.hadoop._
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD._
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -233,40 +235,20 @@ private[sql] class ParquetRelation2(
override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
- inputFiles: Array[FileStatus]): RDD[Row] = {
-
- val job = new Job(SparkHadoopUtil.get.conf)
- val conf = ContextUtil.getConfiguration(job)
-
- ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-
- if (inputFiles.nonEmpty) {
- FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
- }
-
- // Try to push down filters when filter push-down is enabled.
- if (sqlContext.conf.parquetFilterPushDown) {
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(ParquetFilters.createFilter(dataSchema, _))
- .reduceOption(FilterApi.and)
- .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
- }
-
- conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
- val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
- ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
- })
-
- conf.set(
- RowWriteSupport.SPARK_ROW_SCHEMA,
- ParquetTypesConverter.convertToString(dataSchema.toAttributes))
-
- // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+ inputFiles: Array[FileStatus],
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
- conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+ val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
+ // Create the function to set various Parquet confs at both driver and executor side.
+ val initLocalJobFuncOpt =
+ ParquetRelation2.initializeLocalJobFunc(
+ requiredColumns,
+ filters,
+ dataSchema,
+ useMetadataCache,
+ parquetFilterPushDown) _
+ // Create the function to set input paths at the driver side.
+ val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
@@ -274,12 +256,14 @@ private[sql] class ParquetRelation2(
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
// footers. Especially when a global arbitrative schema (either from metastore or data source
// DDL) is available.
- new NewHadoopRDD(
- sqlContext.sparkContext,
- classOf[FilteringParquetRowInputFormat],
- classOf[Void],
- classOf[Row],
- conf) {
+ new SqlNewHadoopRDD(
+ sc = sqlContext.sparkContext,
+ broadcastedConf = broadcastedConf,
+ initDriverSideJobFuncOpt = Some(setInputPaths),
+ initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+ inputFormatClass = classOf[FilteringParquetRowInputFormat],
+ keyClass = classOf[Void],
+ valueClass = classOf[Row]) {
val cacheMetadata = useMetadataCache
@@ -311,11 +295,11 @@ private[sql] class ParquetRelation2(
new FilteringParquetRowInputFormat
}
- val jobContext = newJobContext(getConf, jobId)
+ val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
val rawSplits = inputFormat.getSplits(jobContext)
Array.tabulate[SparkPartition](rawSplits.size) { i =>
- new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
}
}.values
@@ -452,6 +436,49 @@ private[sql] object ParquetRelation2 extends Logging {
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+ /** This closure sets various Parquet configurations at both driver side and executor side. */
+ private[parquet] def initializeLocalJobFunc(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ dataSchema: StructType,
+ useMetadataCache: Boolean,
+ parquetFilterPushDown: Boolean)(job: Job): Unit = {
+ val conf = job.getConfiguration
+ conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[RowReadSupport].getName())
+
+ // Try to push down filters when filter push-down is enabled.
+ if (parquetFilterPushDown) {
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(ParquetFilters.createFilter(dataSchema, _))
+ .reduceOption(FilterApi.and)
+ .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+ }
+
+ conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+ val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+ ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
+ })
+
+ conf.set(
+ RowWriteSupport.SPARK_ROW_SCHEMA,
+ ParquetTypesConverter.convertToString(dataSchema.toAttributes))
+
+ // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+ conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+ }
+
+ /** This closure sets input paths at the driver side. */
+ private[parquet] def initializeDriverSideJobFunc(
+ inputFiles: Array[FileStatus])(job: Job): Unit = {
+ // We set the input paths at the driver side.
+ if (inputFiles.nonEmpty) {
+ FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
+ }
+ }
+
private[parquet] def readSchema(
footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
footers.map { footer =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 1615a6dcbd..550090d22d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.sources
-import org.apache.spark.Logging
+import org.apache.spark.{SerializableWritable, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
@@ -84,11 +85,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Scanning non-partitioned HadoopFsRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+ // See buildPartitionedTableScan for the reason that we need to create a shared
+ // broadcast HadoopConf.
+ val sharedHadoopConf = SparkHadoopUtil.get.conf
+ val confBroadcast =
+ t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
pruneFilterProject(
l,
projectList,
filters,
- (a, f) => t.buildScan(a, f, t.paths)) :: Nil
+ (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil
case l @ LogicalRelation(t: TableScan) =>
createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
@@ -115,6 +121,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val output = projections.map(_.toAttribute)
val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
+ // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
+ // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
+ val sharedHadoopConf = SparkHadoopUtil.get.conf
+ val confBroadcast =
+ relation.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+
// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
// The table scan operator (PhysicalRDD) which retrieves required columns from data files.
@@ -132,7 +144,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// assuming partition columns data stored in data files are always consistent with those
// partition values encoded in partition directory paths.
val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
- val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir))
+ val dataRows =
+ relation.buildScan(nonPartitionColumns, filters, Array(dir), confBroadcast)
// Merges data values with partition values.
mergeWithPartitionValues(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
new file mode 100644
index 0000000000..0c7bb6e50c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.spark.broadcast.Broadcast
+
+import org.apache.spark.{Partition => SparkPartition, _}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.rdd.{RDD, HadoopRDD}
+import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
+
+import scala.reflect.ClassTag
+
+private[spark] class SqlNewHadoopPartition(
+ rddId: Int,
+ val index: Int,
+ @transient rawSplit: InputSplit with Writable)
+ extends SparkPartition {
+
+ val serializableHadoopSplit = new SerializableWritable(rawSplit)
+
+ override def hashCode(): Int = 41 * (41 + rddId) + index
+}
+
+/**
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
+ * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions.
+ * 1. A shared broadcast Hadoop Configuration.
+ * 2. An optional closure `initDriverSideJobFuncOpt` that sets configurations at the driver side
+ * to the shared Hadoop Configuration.
+ * 3. An optional closure `initLocalJobFuncOpt` that sets configurations at both the driver side
+ * and the executor side to the shared Hadoop Configuration.
+ *
+ * Note: This RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
+ * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In the future, this functionality will be
+ * folded into core.
+ */
+private[sql] class SqlNewHadoopRDD[K, V](
+ @transient sc : SparkContext,
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ @transient initDriverSideJobFuncOpt: Option[Job => Unit],
+ initLocalJobFuncOpt: Option[Job => Unit],
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V])
+ extends RDD[(K, V)](sc, Nil)
+ with SparkHadoopMapReduceUtil
+ with Logging {
+
+ if (initLocalJobFuncOpt.isDefined) {
+ sc.clean(initLocalJobFuncOpt.get)
+ }
+
+ protected def getJob(): Job = {
+ val conf: Configuration = broadcastedConf.value.value
+ // "new Job" will make a copy of the conf. Then, it is
+ // safe to mutate conf properties with initLocalJobFuncOpt
+ // and initDriverSideJobFuncOpt.
+ val newJob = new Job(conf)
+ initLocalJobFuncOpt.map(f => f(newJob))
+ newJob
+ }
+
+ def getConf(isDriverSide: Boolean): Configuration = {
+ val job = getJob()
+ if (isDriverSide) {
+ initDriverSideJobFuncOpt.map(f => f(job))
+ }
+ job.getConfiguration
+ }
+
+ private val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ formatter.format(new Date())
+ }
+
+ @transient protected val jobId = new JobID(jobTrackerId, id)
+
+ override def getPartitions: Array[SparkPartition] = {
+ val conf = getConf(isDriverSide = true)
+ val inputFormat = inputFormatClass.newInstance
+ inputFormat match {
+ case configurable: Configurable =>
+ configurable.setConf(conf)
+ case _ =>
+ }
+ val jobContext = newJobContext(conf, jobId)
+ val rawSplits = inputFormat.getSplits(jobContext).toArray
+ val result = new Array[SparkPartition](rawSplits.size)
+ for (i <- 0 until rawSplits.size) {
+ result(i) =
+ new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ }
+ result
+ }
+
+ override def compute(
+ theSplit: SparkPartition,
+ context: TaskContext): InterruptibleIterator[(K, V)] = {
+ val iter = new Iterator[(K, V)] {
+ val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
+ logInfo("Input split: " + split.serializableHadoopSplit)
+ val conf = getConf(isDriverSide = false)
+
+ val inputMetrics = context.taskMetrics
+ .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
+ // Find a function that will return the FileSystem bytes read by this thread. Do this before
+ // creating RecordReader, because RecordReader's constructor might read some bytes
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
+ split.serializableHadoopSplit.value match {
+ case _: FileSplit | _: CombineFileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+ case _ => None
+ }
+ }
+ inputMetrics.setBytesReadCallback(bytesReadCallback)
+
+ val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+ val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+ val format = inputFormatClass.newInstance
+ format match {
+ case configurable: Configurable =>
+ configurable.setConf(conf)
+ case _ =>
+ }
+ val reader = format.createRecordReader(
+ split.serializableHadoopSplit.value, hadoopAttemptContext)
+ reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+
+ // Register an on-task-completion callback to close the input stream.
+ context.addTaskCompletionListener(context => close())
+ var havePair = false
+ var finished = false
+ var recordsSinceMetricsUpdate = 0
+
+ override def hasNext: Boolean = {
+ if (!finished && !havePair) {
+ finished = !reader.nextKeyValue
+ havePair = !finished
+ }
+ !finished
+ }
+
+ override def next(): (K, V) = {
+ if (!hasNext) {
+ throw new java.util.NoSuchElementException("End of stream")
+ }
+ havePair = false
+ if (!finished) {
+ inputMetrics.incRecordsRead(1)
+ }
+ (reader.getCurrentKey, reader.getCurrentValue)
+ }
+
+ private def close() {
+ try {
+ reader.close()
+ if (bytesReadCallback.isDefined) {
+ inputMetrics.updateBytesRead()
+ } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+ split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+ // If we can't get the bytes read from the FS stats, fall back to the split size,
+ // which may be inaccurate.
+ try {
+ inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+ } catch {
+ case e: java.io.IOException =>
+ logWarning("Unable to get input size to set InputMetrics for task", e)
+ }
+ }
+ } catch {
+ case e: Exception => {
+ if (!Utils.inShutdown()) {
+ logWarning("Exception in RecordReader.close()", e)
+ }
+ }
+ }
+ }
+ }
+ new InterruptibleIterator(context, iter)
+ }
+
+ /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+ @DeveloperApi
+ def mapPartitionsWithInputSplit[U: ClassTag](
+ f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+ preservesPartitioning: Boolean = false): RDD[U] = {
+ new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+ }
+
+ override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
+ val split = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
+ val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
+ case Some(c) =>
+ try {
+ val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
+ Some(HadoopRDD.convertSplitLocationInfo(infos))
+ } catch {
+ case e : Exception =>
+ logDebug("Failed to use InputSplit#getLocationInfo.", e)
+ None
+ }
+ case None => None
+ }
+ locs.getOrElse(split.getLocations.filter(_ != "localhost"))
+ }
+
+ override def persist(storageLevel: StorageLevel): this.type = {
+ if (storageLevel.deserialized) {
+ logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
+ " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
+ " Use a map transformation to make copies of the records.")
+ }
+ super.persist(storageLevel)
+ }
+}
+
+private[spark] object SqlNewHadoopRDD {
+ /**
+ * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+ * the given function rather than the index of the partition.
+ */
+ private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+ prev: RDD[T],
+ f: (InputSplit, Iterator[T]) => Iterator[U],
+ preservesPartitioning: Boolean = false)
+ extends RDD[U](prev) {
+
+ override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+
+ override def getPartitions: Array[SparkPartition] = firstParent[T].partitions
+
+ override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = {
+ val partition = split.asInstanceOf[SqlNewHadoopPartition]
+ val inputSplit = partition.serializableHadoopSplit.value
+ f(inputSplit, firstParent[T].iterator(split, context))
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 9b52d1be3d..6a917bf38b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -25,7 +25,9 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
+import org.apache.spark.SerializableWritable
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
@@ -484,7 +486,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
private[sources] final def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
- inputPaths: Array[String]): RDD[Row] = {
+ inputPaths: Array[String],
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
val inputStatuses = inputPaths.flatMap { input =>
val path = new Path(input)
@@ -499,7 +502,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
}
- buildScan(requiredColumns, filters, inputStatuses)
+ buildScan(requiredColumns, filters, inputStatuses, broadcastedConf)
}
/**
@@ -584,6 +587,34 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
/**
+ * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
+ * this relation. For partitioned relations, this method is called for each selected partition,
+ * and builds an `RDD[Row]` containing all rows within that single partition.
+ *
+ * Note: This interface is subject to change in the future.
+ *
+ * @param requiredColumns Required columns.
+ * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
+ * of all `filters`. The pushed down filters are currently purely an optimization as they
+ * will all be evaluated again. This means it is safe to use them with methods that produce
+ * false positives such as filtering partitions based on a bloom filter.
+ * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
+ * relation. For a partitioned relation, it contains paths of all data files in a single
+ * selected partition.
+ * @param broadcastedConf A shared broadcast Hadoop Configuration, which can be used to reduce the
+ * overhead of broadcasting the Configuration for every Hadoop RDD.
+ *
+ * @since 1.4.0
+ */
+ private[sql] def buildScan(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ inputFiles: Array[FileStatus],
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+ buildScan(requiredColumns, filters, inputFiles)
+ }
+
+ /**
* Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
* be put here. For example, user defined output committer can be configured here
* by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.