author    Cheng Lian <lian@databricks.com>    2015-07-25 11:42:49 -0700
committer    Reynold Xin <rxin@databricks.com>    2015-07-25 11:42:49 -0700
commit    e2ec018e37cb699077b5fa2bd662f2055cb42296 (patch)
tree    afe34287bd1b1d9304394eb381b33569dc5fb4ca
parent    c980e20cf17f2980c564beab9b241022872e29ea (diff)
[SPARK-9285] [SQL] Fixes Row/InternalRow conversion for HadoopFsRelation
This is a follow-up of #7626. It fixes `Row`/`InternalRow` conversion for data sources extending `HadoopFsRelation` with `needConversion` being `true`.

Author: Cheng Lian <lian@databricks.com>

Closes #7649 from liancheng/spark-9285-conversion-fix and squashes the following commits:

036a50c [Cheng Lian] Addresses PR comment
f6d7c6a [Cheng Lian] Fixes Row/InternalRow conversion for HadoopFsRelation
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala  | 23
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala  | 5
2 files changed, 20 insertions, 8 deletions
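For readers less familiar with the data source API: `needConversion` is a `BaseRelation` flag (defaulting to `true`) that tells Spark SQL the source works with plain Scala values, so the framework must translate between the external `Row` representation and Catalyst's `InternalRow`. A minimal sketch of that translation, using the `CatalystTypeConverters` API this patch relies on and an illustrative two-column schema that is not taken from the patch:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.CatalystTypeConverters
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Illustrative schema; the column names are not from the patch.
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    // External Row (Scala values) -> Catalyst representation (e.g. String becomes UTF8String)...
    val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
    val internal = toCatalyst(Row(1, "spark"))

    // ...and back. The Catalyst-to-Scala direction is the one buildScan now applies
    // when needConversion is true.
    val toScala = CatalystTypeConverters.createToScalaConverter(schema)
    val external = toScala(internal).asInstanceOf[Row]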
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 119bac786d..7126145ddc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.execution.RDDConversions
@@ -593,6 +593,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
*
* @since 1.4.0
*/
+ // TODO Tries to eliminate the extra Catalyst-to-Scala conversion when `needConversion` is true
+ //
+ // PR #7626 separated `Row` and `InternalRow` completely. One of the consequences is that we can
+ // no longer treat an `InternalRow` containing Catalyst values as a `Row`. Thus we have to
+ // introduce another row value conversion for data sources whose `needConversion` is true.
def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
// Yeah, to workaround serialization...
val dataSchema = this.dataSchema
@@ -611,14 +616,26 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
} else {
rdd.asInstanceOf[RDD[InternalRow]]
}
+
converted.mapPartitions { rows =>
val buildProjection = if (codegenEnabled) {
GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
} else {
() => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)
}
- val mutableProjection = buildProjection()
- rows.map(r => mutableProjection(r))
+
+ val projectedRows = {
+ val mutableProjection = buildProjection()
+ rows.map(r => mutableProjection(r))
+ }
+
+ if (needConversion) {
+ val requiredSchema = StructType(requiredColumns.map(dataSchema(_)))
+ val toScala = CatalystTypeConverters.createToScalaConverter(requiredSchema)
+ projectedRows.map(toScala(_).asInstanceOf[Row])
+ } else {
+ projectedRows
+ }
}.asInstanceOf[RDD[Row]]
}
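To see the difference from the user side, here is a hypothetical read against a `needConversion = true` source such as the test-only `SimpleTextSource` referenced below; the path, the column access, and the assumption that a `SQLContext` named `sqlContext` and the test class are available are illustrative, not part of the patch:

    // Hypothetical usage sketch; SimpleTextSource ships only with the Hive test sources.
    val df = sqlContext.read
      .format("org.apache.spark.sql.sources.SimpleTextSource")
      .load("/tmp/simple-text")

    df.collect().foreach { row =>
      // Before this fix, a scan through HadoopFsRelation could hand back Catalyst values
      // (e.g. UTF8String) inside what the API typed as Row; after it, row.get(i) yields
      // plain Scala values again.
      println(s"${row.get(0)} -> ${row.get(0).getClass}")
    }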
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index d761909d60..e8975e5f5c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-/*
-This is commented out due a bug in the data source API (SPARK-9291).
-
-
class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
@@ -54,4 +50,3 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
}
}
}
-*/