author     Cheng Lian <lian@databricks.com>  2015-10-19 16:57:20 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-10-19 16:57:20 -0700
commit     8b877cc4ee46cad9d1f7cac451801c1410f6c1fe (patch)
tree       6b0323f7d9e0bc2d4a463695ac057e2d8d55b174 /sql
parent     16906ef23a7aa2854c8cdcaa3bb3808ab39e0eec (diff)
[SPARK-11088][SQL] Merges partition values using UnsafeProjection
`DataSourceStrategy.mergeWithPartitionValues` is essentially a projection implemented in a quite inefficient way. This PR optimizes this method with `UnsafeProjection` to avoid unnecessary boxing costs.

Author: Cheng Lian <lian@databricks.com>

Closes #9104 from liancheng/spark-11088.faster-partition-values-merging.
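For illustration, here is a minimal self-contained sketch (not part of the commit) of the merging scheme the patch adopts: each scanned data row is concatenated with the per-partition values row through a reusable `JoinedRow`, and a generated `UnsafeProjection` reorders the combined columns into the required output order, writing primitives directly instead of going through boxed accessors. The class names are the Spark 1.5-era catalyst internals used in this file; `mergeRows` itself is a hypothetical helper.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow, UnsafeProjection}

// Hypothetical helper mirroring the new code path in this diff.
def mergeRows(
    requiredColumns: Seq[Attribute],   // output columns, in the order the query expects
    dataColumns: Seq[Attribute],       // columns actually read from the data files
    partitionColumns: Seq[Attribute],  // columns encoded in the partition directory path
    partitionValues: InternalRow,      // constant within a single partition
    dataRows: Iterator[InternalRow]): Iterator[InternalRow] = {
  // The projection's input schema matches the layout produced by JoinedRow below:
  // data columns first, partition columns appended after them.
  val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
  val joinedRow = new JoinedRow()  // mutable and reused for every input row
  dataRows.map(dataRow => projection(joinedRow(dataRow, partitionValues)))
}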
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 73
1 file changed, 24 insertions(+), 49 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 33181fa6c0..ffb4645b89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -140,29 +140,30 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
+ val partitionColumnNames = partitionColumns.fieldNames.toSet
// Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder
// will union all partitions and attach partition values if needed.
val scanBuilder = {
- (columns: Seq[Attribute], filters: Array[Filter]) => {
+ (requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
+ val requiredDataColumns =
+ requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))
+
// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
- val partitionColNames = partitionColumns.fieldNames
-
// Don't scan any partition columns to save I/O. Here we are being optimistic and
// assuming partition columns data stored in data files are always consistent with those
// partition values encoded in partition directory paths.
- val needed = columns.filterNot(a => partitionColNames.contains(a.name))
- val dataRows =
- relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
+ val dataRows = relation.buildScan(
+ requiredDataColumns.map(_.name).toArray, filters, Array(dir), confBroadcast)
// Merges data values with partition values.
mergeWithPartitionValues(
- relation.schema,
- columns.map(_.name).toArray,
- partitionColNames,
+ requiredColumns,
+ requiredDataColumns,
+ partitionColumns,
partitionValues,
- toCatalystRDD(logicalRelation, needed, dataRows))
+ toCatalystRDD(logicalRelation, requiredDataColumns, dataRows))
}
val unionedRows =
@@ -188,52 +189,27 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
sparkPlan
}
- // TODO: refactor this thing. It is very complicated because it does projection internally.
- // We should just put a project on top of this.
private def mergeWithPartitionValues(
- schema: StructType,
- requiredColumns: Array[String],
- partitionColumns: Array[String],
+ requiredColumns: Seq[Attribute],
+ dataColumns: Seq[Attribute],
+ partitionColumnSchema: StructType,
partitionValues: InternalRow,
dataRows: RDD[InternalRow]): RDD[InternalRow] = {
- val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
-
// If output columns contain any partition column(s), we need to merge scanned data
// columns and requested partition columns to form the final result.
- if (!requiredColumns.sameElements(nonPartitionColumns)) {
- val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
- // To see whether the `index`-th column is a partition column...
- val i = partitionColumns.indexOf(name)
- if (i != -1) {
- val dt = schema(partitionColumns(i)).dataType
- // If yes, gets column value from partition values.
- (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
- mutableRow(ordinal) = partitionValues.get(i, dt)
- }
- } else {
- // Otherwise, inherits the value from scanned data.
- val i = nonPartitionColumns.indexOf(name)
- val dt = schema(nonPartitionColumns(i)).dataType
- (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
- mutableRow(ordinal) = dataRow.get(i, dt)
- }
- }
+ if (requiredColumns != dataColumns) {
+ // Builds `AttributeReference`s for all partition columns so that we can use them to project
+ // required partition columns. Note that if a partition column appears in `requiredColumns`,
+ // we should use the `AttributeReference` in `requiredColumns`.
+ val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
+ val partitionColumns = partitionColumnSchema.toAttributes.map { a =>
+ requiredColumnMap.getOrElse(a.name, a)
}
- // Since we know for sure that this closure is serializable, we can avoid the overhead
- // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
- // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
- val dataTypes = requiredColumns.map(schema(_).dataType)
- val mutableRow = new SpecificMutableRow(dataTypes)
- iterator.map { dataRow =>
- var i = 0
- while (i < mutableRow.numFields) {
- mergers(i)(mutableRow, dataRow, i)
- i += 1
- }
- mutableRow.asInstanceOf[InternalRow]
- }
+ val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
+ val mutableJoinedRow = new JoinedRow()
+ iterator.map(dataRow => projection(mutableJoinedRow(dataRow, partitionValues)))
}
// This is an internal RDD whose call site the user should not be concerned with
@@ -242,7 +218,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
Utils.withDummyCallSite(dataRows.sparkContext) {
new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
}
-
} else {
dataRows
}