path: root/sql/core/src
author    Wenchen Fan <wenchen@databricks.com>    2016-04-11 22:59:42 -0700
committer Yin Huai <yhuai@databricks.com>    2016-04-11 22:59:42 -0700
commit    678b96e77bf77a64b8df14b19db5a3bb18febfe3 (patch)
tree      1052c9b1428b03620e088b0c0524df6191b94605 /sql/core/src
parent    52a801124f429ab133f9a3867c1da6ebd8fa7d4e (diff)
[SPARK-14535][SQL] Remove buildInternalScan from FileFormat
## What changes were proposed in this pull request?

Now that `HadoopFsRelation` with all kinds of file formats can be handled in `FileSourceStrategy`, we can remove the `HadoopFsRelation` branches from `DataSourceStrategy` and the `buildInternalScan` API from `FileFormat`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12300 from cloud-fan/remove.
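For orientation, the sketch below shows roughly what the `FileFormat` contract looks like once `buildInternalScan` is gone: a format describes schema inference and a per-file `buildReader`, and `FileSourceStrategy` drives the scan for every format. This is a simplified, hedged sketch with stand-in types (`PartitionedFile`, `StructType`, and `Row` here are placeholders), not the exact Spark trait.

```scala
// Simplified, hypothetical sketch of the post-patch FileFormat contract.
// PartitionedFile, StructType, and Row below are stand-ins, not Spark's types.
case class PartitionedFile(filePath: String, start: Long, length: Long)
case class StructType(fieldNames: Seq[String])
trait Row

trait FileFormatSketch {
  // Infer the data schema from a sample of input files.
  def inferSchema(files: Seq[String]): Option[StructType]

  // Return a function that reads one file split. FileSourceStrategy can call
  // this for any format, so the planner no longer needs per-format branches
  // or a whole-relation buildInternalScan entry point.
  def buildReader(
      dataSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[String],
      options: Map[String, String]): PartitionedFile => Iterator[Row]
}
```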
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala     | 390
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala     |  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala      |  31
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala      |  29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 110
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala     |  39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                              |   8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                            |  10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala |  12
9 files changed, 4 insertions(+), 635 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8c183317f6..c3885a3be5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -110,133 +110,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
filters,
(a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
- // Scanning partitioned HadoopFsRelation
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _))
- if t.partitionSchema.nonEmpty =>
- // We divide the filter expressions into 3 parts
- val partitionColumns = AttributeSet(
- t.partitionSchema.map(c => l.output.find(_.name == c.name).get))
-
- // Only pruning the partition keys
- val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
-
- // Only pushes down predicates that do not reference partition keys.
- val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
-
- // Predicates with both partition keys and attributes
- val partitionAndNormalColumnFilters =
- filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
-
- val selectedPartitions = t.location.listFiles(partitionFilters)
-
- logInfo {
- val total = t.partitionSpec.partitions.length
- val selected = selectedPartitions.length
- val percentPruned = (1 - selected.toDouble / total.toDouble) * 100
- s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
- }
-
- // need to add projections from "partitionAndNormalColumnAttrs" in if it is not empty
- val partitionAndNormalColumnAttrs = AttributeSet(partitionAndNormalColumnFilters)
- val partitionAndNormalColumnProjs = if (partitionAndNormalColumnAttrs.isEmpty) {
- projects
- } else {
- (partitionAndNormalColumnAttrs ++ projects).toSeq
- }
-
- // Prune the buckets based on the pushed filters that do not contain partitioning key
- // since the bucketing key is not allowed to use the columns in partitioning key
- val bucketSet = getBuckets(pushedFilters, t.bucketSpec)
- val scan = buildPartitionedTableScan(
- l,
- partitionAndNormalColumnProjs,
- pushedFilters,
- bucketSet,
- t.partitionSpec.partitionColumns,
- selectedPartitions,
- t.options)
-
- // Add a Projection to guarantee the original projection:
- // this is because "partitionAndNormalColumnAttrs" may be different
- // from the original "projects", in elements or their ordering
-
- partitionAndNormalColumnFilters.reduceLeftOption(expressions.And).map(cf =>
- if (projects.isEmpty || projects == partitionAndNormalColumnProjs) {
- // if the original projection is empty, no need for the additional Project either
- execution.Filter(cf, scan)
- } else {
- execution.Project(projects, execution.Filter(cf, scan))
- }
- ).getOrElse(scan) :: Nil
-
- // TODO: The code for planning bucketed/unbucketed/partitioned/unpartitioned tables contains
- // a lot of duplication and produces overly complicated RDDs.
-
- // Scanning non-partitioned HadoopFsRelation
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _)) =>
- // See buildPartitionedTableScan for the reason that we need to create a shard
- // broadcast HadoopConf.
- val sharedHadoopConf = SparkHadoopUtil.get.conf
- val confBroadcast =
- t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
-
- t.bucketSpec match {
- case Some(spec) if t.sqlContext.conf.bucketingEnabled =>
- val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = {
- (requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
- val bucketed =
- t.location
- .allFiles()
- .filterNot(_.getPath.getName startsWith "_")
- .groupBy { f =>
- BucketingUtils
- .getBucketId(f.getPath.getName)
- .getOrElse(sys.error(s"Invalid bucket file ${f.getPath}"))
- }
-
- val bucketedDataMap = bucketed.mapValues { bucketFiles =>
- t.fileFormat.buildInternalScan(
- t.sqlContext,
- t.dataSchema,
- requiredColumns.map(_.name).toArray,
- filters,
- None,
- bucketFiles,
- confBroadcast,
- t.options).coalesce(1)
- }
-
- val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext,
- (0 until spec.numBuckets).map { bucketId =>
- bucketedDataMap.getOrElse(bucketId, t.sqlContext.emptyResult: RDD[InternalRow])
- })
- bucketedRDD
- }
- }
-
- pruneFilterProject(
- l,
- projects,
- filters,
- scanBuilder) :: Nil
-
- case _ =>
- pruneFilterProject(
- l,
- projects,
- filters,
- (a, f) =>
- t.fileFormat.buildInternalScan(
- t.sqlContext,
- t.dataSchema,
- a.map(_.name).toArray,
- f,
- None,
- t.location.allFiles(),
- confBroadcast,
- t.options)) :: Nil
- }
-
case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
execution.DataSourceScan.create(
l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
@@ -248,218 +121,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
case _ => Nil
}
- private def buildPartitionedTableScan(
- logicalRelation: LogicalRelation,
- projections: Seq[NamedExpression],
- filters: Seq[Expression],
- buckets: Option[BitSet],
- partitionColumns: StructType,
- partitions: Seq[Partition],
- options: Map[String, String]): SparkPlan = {
- val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
-
- // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
- // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
- val sharedHadoopConf = SparkHadoopUtil.get.conf
- val confBroadcast =
- relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
- val partitionColumnNames = partitionColumns.fieldNames.toSet
-
- // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder
- // will union all partitions and attach partition values if needed.
- val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = {
- (requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
-
- relation.bucketSpec match {
- case Some(spec) if relation.sqlContext.conf.bucketingEnabled =>
- val requiredDataColumns =
- requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))
-
- // Builds RDD[Row]s for each selected partition.
- val perPartitionRows: Seq[(Int, RDD[InternalRow])] = partitions.flatMap {
- case Partition(partitionValues, files) =>
- val bucketed = files.groupBy { f =>
- BucketingUtils
- .getBucketId(f.getPath.getName)
- .getOrElse(sys.error(s"Invalid bucket file ${f.getPath}"))
- }
-
- bucketed.map { bucketFiles =>
- // Don't scan any partition columns to save I/O. Here we are being optimistic and
- // assuming partition columns data stored in data files are always consistent with
- // those partition values encoded in partition directory paths.
- val dataRows = relation.fileFormat.buildInternalScan(
- relation.sqlContext,
- relation.dataSchema,
- requiredDataColumns.map(_.name).toArray,
- filters,
- buckets,
- bucketFiles._2,
- confBroadcast,
- options)
-
- // Merges data values with partition values.
- bucketFiles._1 -> mergeWithPartitionValues(
- requiredColumns,
- requiredDataColumns,
- partitionColumns,
- partitionValues,
- dataRows)
- }
- }
-
- val bucketedDataMap: Map[Int, Seq[RDD[InternalRow]]] =
- perPartitionRows.groupBy(_._1).mapValues(_.map(_._2))
-
- val bucketed = new UnionRDD(relation.sqlContext.sparkContext,
- (0 until spec.numBuckets).map { bucketId =>
- bucketedDataMap.get(bucketId).map(i => i.reduce(_ ++ _).coalesce(1)).getOrElse {
- relation.sqlContext.emptyResult: RDD[InternalRow]
- }
- })
- bucketed
-
- case _ =>
- val requiredDataColumns =
- requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))
-
- // Builds RDD[Row]s for each selected partition.
- val perPartitionRows = partitions.map {
- case Partition(partitionValues, files) =>
- val dataRows = relation.fileFormat.buildInternalScan(
- relation.sqlContext,
- relation.dataSchema,
- requiredDataColumns.map(_.name).toArray,
- filters,
- buckets,
- files,
- confBroadcast,
- options)
-
- // Merges data values with partition values.
- mergeWithPartitionValues(
- requiredColumns,
- requiredDataColumns,
- partitionColumns,
- partitionValues,
- dataRows)
- }
- new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
- }
- }
- }
-
- // Create the scan operator. If needed, add Filter and/or Project on top of the scan.
- // The added Filter/Project is on top of the unioned RDD. We do not want to create
- // one Filter/Project for every partition.
- val sparkPlan = pruneFilterProject(
- logicalRelation,
- projections,
- filters,
- scanBuilder)
-
- sparkPlan
- }
-
- /**
- * Creates a ColumnarBatch that contains the values for `requiredColumns`. These columns can
- * either come from `input` (columns scanned from the data source) or from the partitioning
- * values (data from `partitionValues`). This is done *once* per physical partition. When
- * the column is from `input`, it just references the same underlying column. When using
- * partition columns, the column is populated once.
- * TODO: there's probably a cleaner way to do this.
- */
- private def projectedColumnBatch(
- input: ColumnarBatch,
- requiredColumns: Seq[Attribute],
- dataColumns: Seq[Attribute],
- partitionColumnSchema: StructType,
- partitionValues: InternalRow) : ColumnarBatch = {
- val result = ColumnarBatch.allocate(StructType.fromAttributes(requiredColumns))
- var resultIdx = 0
- var inputIdx = 0
-
- while (resultIdx < requiredColumns.length) {
- val attr = requiredColumns(resultIdx)
- if (inputIdx < dataColumns.length && requiredColumns(resultIdx) == dataColumns(inputIdx)) {
- result.setColumn(resultIdx, input.column(inputIdx))
- inputIdx += 1
- } else {
- require(partitionColumnSchema.fields.count(_.name == attr.name) == 1)
- var partitionIdx = 0
- partitionColumnSchema.fields.foreach { f => {
- if (f.name.equals(attr.name)) {
- ColumnVectorUtils.populate(result.column(resultIdx), partitionValues, partitionIdx)
- }
- partitionIdx += 1
- }}
- }
- resultIdx += 1
- }
- result
- }
-
- private def mergeWithPartitionValues(
- requiredColumns: Seq[Attribute],
- dataColumns: Seq[Attribute],
- partitionColumnSchema: StructType,
- partitionValues: InternalRow,
- dataRows: RDD[InternalRow]): RDD[InternalRow] = {
- // If output columns contain any partition column(s), we need to merge scanned data
- // columns and requested partition columns to form the final result.
- if (requiredColumns != dataColumns) {
- // Builds `AttributeReference`s for all partition columns so that we can use them to project
- // required partition columns. Note that if a partition column appears in `requiredColumns`,
- // we should use the `AttributeReference` in `requiredColumns`.
- val partitionColumns = {
- val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
- partitionColumnSchema.toAttributes.map { a =>
- requiredColumnMap.getOrElse(a.name, a)
- }
- }
-
- val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Object]) => {
- // Note that we can't use an `UnsafeRowJoiner` to replace the following `JoinedRow` and
- // `UnsafeProjection`. Because the projection may also adjust column order.
- val mutableJoinedRow = new JoinedRow()
- val unsafePartitionValues = UnsafeProjection.create(partitionColumnSchema)(partitionValues)
- val unsafeProjection =
- UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
-
- // If we are returning batches directly, we need to augment them with the partitioning
- // columns. We want to do this without a row by row operation.
- var columnBatch: ColumnarBatch = null
- var mergedBatch: ColumnarBatch = null
-
- iterator.map { input => {
- if (input.isInstanceOf[InternalRow]) {
- unsafeProjection(mutableJoinedRow(
- input.asInstanceOf[InternalRow], unsafePartitionValues))
- } else {
- require(input.isInstanceOf[ColumnarBatch])
- val inputBatch = input.asInstanceOf[ColumnarBatch]
- if (inputBatch != mergedBatch) {
- mergedBatch = inputBatch
- columnBatch = projectedColumnBatch(inputBatch, requiredColumns,
- dataColumns, partitionColumnSchema, partitionValues)
- }
- columnBatch.setNumRows(inputBatch.numRows())
- columnBatch
- }
- }}
- }
-
- // This is an internal RDD whose call site the user should not be concerned with
- // Since we create many of these (one per partition), the time spent on computing
- // the call site may add up.
- Utils.withDummyCallSite(dataRows.sparkContext) {
- new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
- }.asInstanceOf[RDD[InternalRow]]
- } else {
- dataRows
- }
- }
-
// Get the bucket ID based on the bucketing values.
// Restriction: Bucket pruning works iff the bucketing column has one and only one column.
def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = {
@@ -472,57 +133,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
bucketIdGeneration(mutableRow).getInt(0)
}
- // Get the bucket BitSet by reading the filters that only contains bucketing keys.
- // Note: When the returned BitSet is None, no pruning is possible.
- // Restriction: Bucket pruning works iff the bucketing column has one and only one column.
- private def getBuckets(
- filters: Seq[Expression],
- bucketSpec: Option[BucketSpec]): Option[BitSet] = {
-
- if (bucketSpec.isEmpty ||
- bucketSpec.get.numBuckets == 1 ||
- bucketSpec.get.bucketColumnNames.length != 1) {
- // None means all the buckets need to be scanned
- return None
- }
-
- // Just get the first because bucketing pruning only works when the column has one column
- val bucketColumnName = bucketSpec.get.bucketColumnNames.head
- val numBuckets = bucketSpec.get.numBuckets
- val matchedBuckets = new BitSet(numBuckets)
- matchedBuckets.clear()
-
- filters.foreach {
- case expressions.EqualTo(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
- matchedBuckets.set(getBucketId(a, numBuckets, v))
- case expressions.EqualTo(Literal(v, _), a: Attribute) if a.name == bucketColumnName =>
- matchedBuckets.set(getBucketId(a, numBuckets, v))
- case expressions.EqualNullSafe(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
- matchedBuckets.set(getBucketId(a, numBuckets, v))
- case expressions.EqualNullSafe(Literal(v, _), a: Attribute) if a.name == bucketColumnName =>
- matchedBuckets.set(getBucketId(a, numBuckets, v))
- // Because we only convert In to InSet in Optimizer when there are more than certain
- // items. So it is possible we still get an In expression here that needs to be pushed
- // down.
- case expressions.In(a: Attribute, list)
- if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
- val hSet = list.map(e => e.eval(EmptyRow))
- hSet.foreach(e => matchedBuckets.set(getBucketId(a, numBuckets, e)))
- case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
- matchedBuckets.set(getBucketId(a, numBuckets, null))
- case _ =>
- }
-
- logInfo {
- val selected = matchedBuckets.cardinality()
- val percentPruned = (1 - selected.toDouble / numBuckets.toDouble) * 100
- s"Selected $selected buckets out of $numBuckets, pruned $percentPruned% partitions."
- }
-
- // None means all the buckets need to be scanned
- if (matchedBuckets.cardinality() == 0) None else Some(matchedBuckets)
- }
-
// Based on Public API.
protected def pruneFilterProject(
relation: LogicalRelation,
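The deleted `getBuckets` helper above mapped equality, `IN`, and `IS NULL` predicates on the single bucketing column to a `BitSet` of bucket IDs. Below is a minimal sketch of that pruning idea using plain Scala collections and a stand-in hash; the real code works on Catalyst expressions, Spark's `BitSet`, and Spark's bucket-ID hashing.

```scala
// Hedged sketch of bucket pruning, with simplified stand-in filter types.
sealed trait SimpleFilter
case class EqualTo(column: String, value: Any) extends SimpleFilter
case class In(column: String, values: Seq[Any]) extends SimpleFilter

object BucketPruningSketch {
  // Pruning only applies when there is exactly one bucketing column.
  def prunedBuckets(
      filters: Seq[SimpleFilter],
      bucketColumn: String,
      numBuckets: Int): Option[Set[Int]] = {
    // Stand-in for Spark's bucket-ID hash of a literal value.
    val bucketId = (v: Any) => math.abs(v.hashCode % numBuckets)
    val matched = filters.collect {
      case EqualTo(c, v) if c == bucketColumn => Set(bucketId(v))
      case In(c, vs) if c == bucketColumn     => vs.map(bucketId).toSet
    }.flatten.toSet
    // None means "no pruning possible, scan all buckets".
    if (matched.isEmpty) None else Some(matched)
  }
}
```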
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index aa1f76450c..bcddf72851 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -55,15 +55,7 @@ import org.apache.spark.sql.sources._
*/
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
- if (files.fileFormat.toString == "TestFileFormat" ||
- files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
- files.fileFormat.toString == "ORC" ||
- files.fileFormat.toString == "LibSVM" ||
- files.fileFormat.isInstanceOf[csv.DefaultSource] ||
- files.fileFormat.isInstanceOf[text.DefaultSource] ||
- files.fileFormat.isInstanceOf[json.DefaultSource]) &&
- files.sqlContext.conf.useFileScan =>
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
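With the format whitelist and the `useFileScan` flag gone, this case now matches every `HadoopFsRelation`. The comment in the hunk above describes splitting filters by where they can prune work; the sketch below restates that split on simplified types (the real code operates on Catalyst `Expression`s and `AttributeSet`s).

```scala
// Hedged sketch of the partition-key / data filter split; SimpleFilter is a
// stand-in where "references" is just the set of column names a filter uses.
object FilterSplitSketch {
  case class SimpleFilter(name: String, references: Set[String])

  def split(
      filters: Seq[SimpleFilter],
      partitionColumns: Set[String]): (Seq[SimpleFilter], Seq[SimpleFilter]) = {
    // Filters touching only partition columns can prune directories to read...
    val partitionKeyFilters = filters.filter(_.references.subsetOf(partitionColumns))
    // ...everything else must be evaluated against the data that is read.
    val dataFilters = filters.filterNot(_.references.subsetOf(partitionColumns))
    (partitionKeyFilters, dataFilters)
  }
}
```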
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 34fcbdf871..06a371b88b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -133,37 +133,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
}
- /**
- * This supports to eliminate unneeded columns before producing an RDD
- * containing all of its tuples as Row objects. This reads all the tokens of each line
- * and then drop unneeded tokens without casting and type-checking by mapping
- * both the indices produced by `requiredColumns` and the ones of tokens.
- */
- override def buildInternalScan(
- sqlContext: SQLContext,
- dataSchema: StructType,
- requiredColumns: Array[String],
- filters: Array[Filter],
- bucketSet: Option[BitSet],
- inputFiles: Seq[FileStatus],
- broadcastedConf: Broadcast[SerializableConfiguration],
- options: Map[String, String]): RDD[InternalRow] = {
- // TODO: Filter before calling buildInternalScan.
- val csvFiles = inputFiles.filterNot(_.getPath.getName startsWith "_")
-
- val csvOptions = new CSVOptions(options)
- val pathsString = csvFiles.map(_.getPath.toUri.toString)
- val header = dataSchema.fields.map(_.name)
- val tokenizedRdd = tokenRdd(sqlContext, csvOptions, header, pathsString)
- val rows = CSVRelation.parseCsv(tokenizedRdd, dataSchema, requiredColumns, csvOptions)
-
- val requiredDataSchema = StructType(requiredColumns.map(c => dataSchema.find(_.name == c).get))
- rows.mapPartitions { iterator =>
- val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
- iterator.map(unsafeProjection)
- }
- }
-
private def baseRdd(
sqlContext: SQLContext,
options: CSVOptions,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 42cd25a18c..f32fea4183 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -93,35 +93,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
}
- override def buildInternalScan(
- sqlContext: SQLContext,
- dataSchema: StructType,
- requiredColumns: Array[String],
- filters: Array[Filter],
- bucketSet: Option[BitSet],
- inputFiles: Seq[FileStatus],
- broadcastedConf: Broadcast[SerializableConfiguration],
- options: Map[String, String]): RDD[InternalRow] = {
- // TODO: Filter files for all formats before calling buildInternalScan.
- val jsonFiles = inputFiles.filterNot(_.getPath.getName startsWith "_")
-
- val parsedOptions: JSONOptions = new JSONOptions(options)
- val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
- val columnNameOfCorruptRecord =
- parsedOptions.columnNameOfCorruptRecord
- .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
- val rows = JacksonParser.parse(
- createBaseRdd(sqlContext, jsonFiles),
- requiredDataSchema,
- columnNameOfCorruptRecord,
- parsedOptions)
-
- rows.mapPartitions { iterator =>
- val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
- iterator.map(unsafeProjection)
- }
- }
-
override def buildReader(
sqlContext: SQLContext,
dataSchema: StructType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index bcb2b2de13..dbda094996 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -251,12 +251,12 @@ private[sql] class DefaultSource
}
/**
- * Returns whether the reader will the rows as batch or not.
+ * Returns whether the reader will return the rows as batch or not.
*/
override def supportBatch(sqlContext: SQLContext, schema: StructType): Boolean = {
val conf = SQLContext.getActive().get.conf
- conf.useFileScan && conf.parquetVectorizedReaderEnabled &&
- conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields &&
+ conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
+ schema.length <= conf.wholeStageMaxNumFields &&
schema.forall(_.dataType.isInstanceOf[AtomicType])
}
@@ -375,110 +375,6 @@ private[sql] class DefaultSource
}
}
}
-
- override def buildInternalScan(
- sqlContext: SQLContext,
- dataSchema: StructType,
- requiredColumns: Array[String],
- filters: Array[Filter],
- bucketSet: Option[BitSet],
- allFiles: Seq[FileStatus],
- broadcastedConf: Broadcast[SerializableConfiguration],
- options: Map[String, String]): RDD[InternalRow] = {
- val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
- val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
- val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
- val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-
- // Parquet row group size. We will use this value as the value for
- // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
- // of these flags are smaller than the parquet row group size.
- val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
-
- // Create the function to set variable Parquet confs at both driver and executor side.
- val initLocalJobFuncOpt =
- ParquetRelation.initializeLocalJobFunc(
- requiredColumns,
- filters,
- dataSchema,
- parquetBlockSize,
- useMetadataCache,
- parquetFilterPushDown,
- assumeBinaryIsString,
- assumeInt96IsTimestamp) _
-
- val inputFiles = splitFiles(allFiles).data.toArray
-
- // Create the function to set input paths at the driver side.
- val setInputPaths =
- ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
-
- val allPrimitiveTypes = dataSchema.forall(_.dataType.isInstanceOf[AtomicType])
- val inputFormatCls = if (sqlContext.conf.parquetVectorizedReaderEnabled
- && allPrimitiveTypes) {
- classOf[VectorizedParquetInputFormat]
- } else {
- classOf[ParquetInputFormat[InternalRow]]
- }
-
- Utils.withDummyCallSite(sqlContext.sparkContext) {
- new SqlNewHadoopRDD(
- sqlContext = sqlContext,
- broadcastedConf = broadcastedConf,
- initDriverSideJobFuncOpt = Some(setInputPaths),
- initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
- inputFormatClass = inputFormatCls,
- valueClass = classOf[InternalRow]) {
-
- val cacheMetadata = useMetadataCache
-
- @transient val cachedStatuses = inputFiles.map { f =>
- // In order to encode the authority of a Path containing special characters such as '/'
- // (which does happen in some S3N credentials), we need to use the string returned by the
- // URI of the path to create a new Path.
- val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
- new FileStatus(
- f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime,
- f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
- }.toSeq
-
- private def escapePathUserInfo(path: Path): Path = {
- val uri = path.toUri
- new Path(new URI(
- uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
- uri.getQuery, uri.getFragment))
- }
-
- // Overridden so we can inject our own cached files statuses.
- override def getPartitions: Array[SparkPartition] = {
- val inputFormat = new ParquetInputFormat[InternalRow] {
- override def listStatus(jobContext: JobContext): JList[FileStatus] = {
- if (cacheMetadata) cachedStatuses.asJava else super.listStatus(jobContext)
- }
- }
-
- val jobContext = new JobContextImpl(getConf(isDriverSide = true), jobId)
- val rawSplits = inputFormat.getSplits(jobContext)
-
- Array.tabulate[SparkPartition](rawSplits.size) { i =>
- new SqlNewHadoopPartition(
- id, i, rawSplits.get(i).asInstanceOf[InputSplit with Writable])
- }
- }
- }
- }
- }
-}
-
-/**
- * The ParquetInputFormat that create VectorizedParquetRecordReader.
- */
-final class VectorizedParquetInputFormat extends ParquetInputFormat[InternalRow] {
- override def createRecordReader(
- inputSplit: InputSplit,
- taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = {
- new VectorizedParquetRecordReader().asInstanceOf[RecordReader[Void, InternalRow]]
- }
}
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
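The `supportBatch` hunk earlier in this file drops the `useFileScan` check, so batch reading now depends only on the vectorized reader, whole-stage codegen, the field-count limit, and all-atomic column types. A hedged restatement of that condition with a stand-in `Field` type:

```scala
// Hedged restatement of the new supportBatch condition; Field is a stand-in type.
object SupportBatchSketch {
  case class Field(name: String, isAtomic: Boolean)

  def supportBatch(
      vectorizedReaderEnabled: Boolean,
      wholeStageEnabled: Boolean,
      wholeStageMaxNumFields: Int,
      schema: Seq[Field]): Boolean = {
    // All four conditions must hold for the reader to return columnar batches.
    vectorizedReaderEnabled && wholeStageEnabled &&
      schema.length <= wholeStageMaxNumFields &&
      schema.forall(_.isAtomic)
  }
}
```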
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 99459ba1d3..28b03ee7c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -88,45 +88,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
}
- override def buildInternalScan(
- sqlContext: SQLContext,
- dataSchema: StructType,
- requiredColumns: Array[String],
- filters: Array[Filter],
- bucketSet: Option[BitSet],
- inputFiles: Seq[FileStatus],
- broadcastedConf: Broadcast[SerializableConfiguration],
- options: Map[String, String]): RDD[InternalRow] = {
- verifySchema(dataSchema)
-
- val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
- val conf = job.getConfiguration
- val paths = inputFiles
- .filterNot(_.getPath.getName startsWith "_")
- .map(_.getPath)
- .sortBy(_.toUri)
-
- if (paths.nonEmpty) {
- FileInputFormat.setInputPaths(job, paths: _*)
- }
-
- sqlContext.sparkContext.hadoopRDD(
- conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
- .mapPartitions { iter =>
- val unsafeRow = new UnsafeRow(1)
- val bufferHolder = new BufferHolder(unsafeRow)
- val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
-
- iter.map { case (_, line) =>
- // Writes to an UnsafeRow directly
- bufferHolder.reset()
- unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
- unsafeRow.setTotalSize(bufferHolder.totalSize())
- unsafeRow
- }
- }
- }
-
override def buildReader(
sqlContext: SQLContext,
dataSchema: StructType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b58f960897..e74fb00cb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -145,12 +145,6 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
- val USE_FILE_SCAN = SQLConfigBuilder("spark.sql.sources.fileScan")
- .internal()
- .doc("Use the new FileScanRDD path for reading HDSF based data sources.")
- .booleanConf
- .createWithDefault(true)
-
val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
.doc("When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
@@ -481,8 +475,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def useCompression: Boolean = getConf(COMPRESS_CACHED)
- def useFileScan: Boolean = getConf(USE_FILE_SCAN)
-
def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6acb41dd1f..65b1f61349 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -458,16 +458,6 @@ trait FileFormat {
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory
- def buildInternalScan(
- sqlContext: SQLContext,
- dataSchema: StructType,
- requiredColumns: Array[String],
- filters: Array[Filter],
- bucketSet: Option[BitSet],
- inputFiles: Seq[FileStatus],
- broadcastedConf: Broadcast[SerializableConfiguration],
- options: Map[String, String]): RDD[InternalRow]
-
/**
* Returns whether this format support returning columnar batch or not.
*
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 41f536fc37..90d7f53884 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -365,18 +365,6 @@ class TestFileFormat extends FileFormat {
throw new NotImplementedError("JUST FOR TESTING")
}
- override def buildInternalScan(
- sqlContext: SQLContext,
- dataSchema: StructType,
- requiredColumns: Array[String],
- filters: Array[Filter],
- bucketSet: Option[BitSet],
- inputFiles: Seq[FileStatus],
- broadcastedConf: Broadcast[SerializableConfiguration],
- options: Map[String, String]): RDD[InternalRow] = {
- throw new NotImplementedError("JUST FOR TESTING")
- }
-
override def buildReader(
sqlContext: SQLContext,
dataSchema: StructType,