From e2ec018e37cb699077b5fa2bd662f2055cb42296 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 25 Jul 2015 11:42:49 -0700 Subject: [SPARK-9285] [SQL] Fixes Row/InternalRow conversion for HadoopFsRelation This is a follow-up of #7626. It fixes `Row`/`InternalRow` conversion for data sources extending `HadoopFsRelation` with `needConversion` being `true`. Author: Cheng Lian Closes #7649 from liancheng/spark-9285-conversion-fix and squashes the following commits: 036a50c [Cheng Lian] Addresses PR comment f6d7c6a [Cheng Lian] Fixes Row/InternalRow conversion for HadoopFsRelation --- .../org/apache/spark/sql/sources/interfaces.scala | 23 +++++++++++++++++++--- .../sources/SimpleTextHadoopFsRelationSuite.scala | 5 ----- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 119bac786d..7126145ddc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.execution.RDDConversions @@ -593,6 +593,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio * * @since 1.4.0 */ + // TODO Tries to eliminate the extra Catalyst-to-Scala conversion when `needConversion` is true + // + // PR #7626 separated `Row` and `InternalRow` completely. One of the consequences is that we can + // no longer treat an `InternalRow` containing Catalyst values as a `Row`. Thus we have to + // introduce another row value conversion for data sources whose `needConversion` is true. def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = { // Yeah, to workaround serialization... val dataSchema = this.dataSchema @@ -611,14 +616,26 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio } else { rdd.asInstanceOf[RDD[InternalRow]] } + converted.mapPartitions { rows => val buildProjection = if (codegenEnabled) { GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes) } else { () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes) } - val mutableProjection = buildProjection() - rows.map(r => mutableProjection(r)) + + val projectedRows = { + val mutableProjection = buildProjection() + rows.map(r => mutableProjection(r)) + } + + if (needConversion) { + val requiredSchema = StructType(requiredColumns.map(dataSchema(_))) + val toScala = CatalystTypeConverters.createToScalaConverter(requiredSchema) + projectedRows.map(toScala(_).asInstanceOf[Row]) + } else { + projectedRows + } }.asInstanceOf[RDD[Row]] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index d761909d60..e8975e5f5c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -22,10 +22,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.types.{IntegerType, StructField, StructType} -/* -This is commented out due a bug in the data source API (SPARK-9291). - - class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName @@ -54,4 +50,3 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { } } } -*/ -- cgit v1.2.3