diff options
author | Cheng Lian <lian@databricks.com> | 2015-10-19 16:57:20 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-10-19 16:57:20 -0700 |
commit | 8b877cc4ee46cad9d1f7cac451801c1410f6c1fe (patch) | |
tree | 6b0323f7d9e0bc2d4a463695ac057e2d8d55b174 /sql/core | |
parent | 16906ef23a7aa2854c8cdcaa3bb3808ab39e0eec (diff) | |
download | spark-8b877cc4ee46cad9d1f7cac451801c1410f6c1fe.tar.gz spark-8b877cc4ee46cad9d1f7cac451801c1410f6c1fe.tar.bz2 spark-8b877cc4ee46cad9d1f7cac451801c1410f6c1fe.zip |
[SPARK-11088][SQL] Merges partition values using UnsafeProjection
`DataSourceStrategy.mergeWithPartitionValues` is essentially a projection implemented in a quite inefficient way. This PR optimizes this method with `UnsafeProjection` to avoid unnecessary boxing costs.
Author: Cheng Lian <lian@databricks.com>
Closes #9104 from liancheng/spark-11088.faster-partition-values-merging.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 73 |
1 file changed, 24 insertions(+), 49 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 33181fa6c0..ffb4645b89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -140,29 +140,30 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val sharedHadoopConf = SparkHadoopUtil.get.conf val confBroadcast = relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf)) + val partitionColumnNames = partitionColumns.fieldNames.toSet // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder // will union all partitions and attach partition values if needed. val scanBuilder = { - (columns: Seq[Attribute], filters: Array[Filter]) => { + (requiredColumns: Seq[Attribute], filters: Array[Filter]) => { + val requiredDataColumns = + requiredColumns.filterNot(c => partitionColumnNames.contains(c.name)) + // Builds RDD[Row]s for each selected partition. val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => - val partitionColNames = partitionColumns.fieldNames - // Don't scan any partition columns to save I/O. Here we are being optimistic and // assuming partition columns data stored in data files are always consistent with those // partition values encoded in partition directory paths. - val needed = columns.filterNot(a => partitionColNames.contains(a.name)) - val dataRows = - relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast) + val dataRows = relation.buildScan( + requiredDataColumns.map(_.name).toArray, filters, Array(dir), confBroadcast) // Merges data values with partition values. 
mergeWithPartitionValues( - relation.schema, - columns.map(_.name).toArray, - partitionColNames, + requiredColumns, + requiredDataColumns, + partitionColumns, partitionValues, - toCatalystRDD(logicalRelation, needed, dataRows)) + toCatalystRDD(logicalRelation, requiredDataColumns, dataRows)) } val unionedRows = @@ -188,52 +189,27 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { sparkPlan } - // TODO: refactor this thing. It is very complicated because it does projection internally. - // We should just put a project on top of this. private def mergeWithPartitionValues( - schema: StructType, - requiredColumns: Array[String], - partitionColumns: Array[String], + requiredColumns: Seq[Attribute], + dataColumns: Seq[Attribute], + partitionColumnSchema: StructType, partitionValues: InternalRow, dataRows: RDD[InternalRow]): RDD[InternalRow] = { - val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains) - // If output columns contain any partition column(s), we need to merge scanned data // columns and requested partition columns to form the final result. - if (!requiredColumns.sameElements(nonPartitionColumns)) { - val mergers = requiredColumns.zipWithIndex.map { case (name, index) => - // To see whether the `index`-th column is a partition column... - val i = partitionColumns.indexOf(name) - if (i != -1) { - val dt = schema(partitionColumns(i)).dataType - // If yes, gets column value from partition values. - (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => { - mutableRow(ordinal) = partitionValues.get(i, dt) - } - } else { - // Otherwise, inherits the value from scanned data. 
- val i = nonPartitionColumns.indexOf(name) - val dt = schema(nonPartitionColumns(i)).dataType - (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => { - mutableRow(ordinal) = dataRow.get(i, dt) - } - } + if (requiredColumns != dataColumns) { + // Builds `AttributeReference`s for all partition columns so that we can use them to project + // required partition columns. Note that if a partition column appears in `requiredColumns`, + // we should use the `AttributeReference` in `requiredColumns`. + val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap + val partitionColumns = partitionColumnSchema.toAttributes.map { a => + requiredColumnMap.getOrElse(a.name, a) } - // Since we know for sure that this closure is serializable, we can avoid the overhead - // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally - // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718). val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => { - val dataTypes = requiredColumns.map(schema(_).dataType) - val mutableRow = new SpecificMutableRow(dataTypes) - iterator.map { dataRow => - var i = 0 - while (i < mutableRow.numFields) { - mergers(i)(mutableRow, dataRow, i) - i += 1 - } - mutableRow.asInstanceOf[InternalRow] - } + val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns) + val mutableJoinedRow = new JoinedRow() + iterator.map(dataRow => projection(mutableJoinedRow(dataRow, partitionValues))) } // This is an internal RDD whose call site the user should not be concerned with @@ -242,7 +218,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { Utils.withDummyCallSite(dataRows.sparkContext) { new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false) } - } else { dataRows } |