author     Reynold Xin <rxin@databricks.com>    2015-09-30 14:36:54 -0400
committer  Reynold Xin <rxin@databricks.com>    2015-09-30 14:36:54 -0400
commit     03cca5dce2cd7618b5c0e33163efb8502415b06e (patch)
tree       855037fd16286bfe0f7de42c894b10d1e124acd3 /sql
parent     c7b29ae6418368a1266b960ba8776317fd867313 (diff)
[SPARK-10770] [SQL] SparkPlan.executeCollect/executeTake should return InternalRow rather than external Row.
Author: Reynold Xin <rxin@databricks.com>

Closes #8900 from rxin/SPARK-10770-1.
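For orientation, a minimal usage sketch of the contract after this change (illustrative only, not part of the patch; plan stands for any physical plan, e.g. one obtained from a query execution): executeCollect/executeTake now stay in the Catalyst InternalRow format, and the new executeCollectPublic performs the single conversion to external Row at the boundary.

    // Illustrative sketch only; runs the plan twice just to show both entry points.
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.SparkPlan

    def collectBoth(plan: SparkPlan): (Array[InternalRow], Array[Row]) = {
      val internal = plan.executeCollect()         // Catalyst rows, no per-row conversion
      val external = plan.executeCollectPublic()   // external Rows, converted once at the boundary
      (internal, external)
    }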
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                          |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala           | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala                | 22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala           |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala                 | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala                   | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala            |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                   |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala |  6
9 files changed, 39 insertions, 39 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 9c67ad18c3..01f60aba87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1415,7 +1415,7 @@ class DataFrame private[sql](
* @since 1.3.0
*/
def collect(): Array[Row] = withNewExecutionId {
- queryExecution.executedPlan.executeCollect()
+ queryExecution.executedPlan.executeCollectPublic()
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 34e926e458..adb6bbc4ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -34,13 +34,11 @@ private[sql] case class LocalTableScan(
protected override def doExecute(): RDD[InternalRow] = rdd
- override def executeCollect(): Array[Row] = {
- val converter = CatalystTypeConverters.createToScalaConverter(schema)
- rows.map(converter(_).asInstanceOf[Row]).toArray
+ override def executeCollect(): Array[InternalRow] = {
+ rows.toArray
}
- override def executeTake(limit: Int): Array[Row] = {
- val converter = CatalystTypeConverters.createToScalaConverter(schema)
- rows.map(converter(_).asInstanceOf[Row]).take(limit).toArray
+ override def executeTake(limit: Int): Array[InternalRow] = {
+ rows.take(limit).toArray
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 72f5450510..fcb42047ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -170,11 +170,16 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/**
* Runs this query returning the result as an array.
*/
- def executeCollect(): Array[Row] = {
- execute().mapPartitions { iter =>
- val converter = CatalystTypeConverters.createToScalaConverter(schema)
- iter.map(converter(_).asInstanceOf[Row])
- }.collect()
+ def executeCollect(): Array[InternalRow] = {
+ execute().map(_.copy()).collect()
+ }
+
+ /**
+ * Runs this query returning the result as an array, using external Row format.
+ */
+ def executeCollectPublic(): Array[Row] = {
+ val converter = CatalystTypeConverters.createToScalaConverter(schema)
+ executeCollect().map(converter(_).asInstanceOf[Row])
}
/**
@@ -182,9 +187,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*
* This is modeled after RDD.take but never runs any job locally on the driver.
*/
- def executeTake(n: Int): Array[Row] = {
+ def executeTake(n: Int): Array[InternalRow] = {
if (n == 0) {
- return new Array[Row](0)
+ return new Array[InternalRow](0)
}
val childRDD = execute().map(_.copy())
@@ -218,8 +223,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
partsScanned += numPartsToTry
}
- val converter = CatalystTypeConverters.createToScalaConverter(schema)
- buf.toArray.map(converter(_).asInstanceOf[Row])
+ buf.toArray
}
private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
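The new executeCollect() copies each row before collecting because many operators reuse a single mutable row buffer per partition; the old code obtained fresh objects as a side effect of the per-row Scala conversion. A minimal sketch of the aliasing that copy() guards against (illustrative only, not part of the patch; GenericMutableRow is just a convenient mutable InternalRow):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

    val reused = new GenericMutableRow(1)   // a single buffer an operator might reuse
    val aliased: Seq[InternalRow] =
      (1 to 3).map { i => reused.update(0, i); reused }          // all three elements now read 3
    val copied: Seq[InternalRow] =
      (1 to 3).map { i => reused.update(0, i); reused.copy() }   // reads 1, 2, 3 as intended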
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index bf6d44c098..3e49e0a357 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -204,7 +204,7 @@ case class Limit(limit: Int, child: SparkPlan)
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition
- override def executeCollect(): Array[Row] = child.executeTake(limit)
+ override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
protected override def doExecute(): RDD[InternalRow] = {
val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
@@ -258,9 +258,8 @@ case class TakeOrderedAndProject(
projection.map(data.map(_)).getOrElse(data)
}
- override def executeCollect(): Array[Row] = {
- val converter = CatalystTypeConverters.createToScalaConverter(schema)
- collectData().map(converter(_).asInstanceOf[Row])
+ override def executeCollect(): Array[InternalRow] = {
+ collectData()
}
// TODO: Terminal split should be implemented differently from non-terminal split.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index af28e2dfa4..05ccc53830 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -54,20 +54,21 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
- protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)
+ protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+ val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+ cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow])
+ }
override def output: Seq[Attribute] = cmd.output
override def children: Seq[SparkPlan] = Nil
- override def executeCollect(): Array[Row] = sideEffectResult.toArray
+ override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
- override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
+ override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
protected override def doExecute(): RDD[InternalRow] = {
- val convert = CatalystTypeConverters.createToCatalystConverter(schema)
- val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow])
- sqlContext.sparkContext.parallelize(converted, 1)
+ sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
override def argString: String = cmd.toString
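RunnableCommand.run still produces external Rows, so ExecutedCommand now converts them to the internal format once, eagerly, in sideEffectResult rather than on every doExecute(). A small round-trip sketch of the two converters involved (illustrative only, with a made-up schema):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
    val toScala    = CatalystTypeConverters.createToScalaConverter(schema)

    val internal = toCatalyst(Row(1, "a")).asInstanceOf[InternalRow]   // external -> internal
    val external = toScala(internal).asInstanceOf[Row]                 // back to Row(1, "a")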
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index d6aaf424a8..5dbe0fc5f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -121,12 +121,10 @@ object EvaluatePython {
def takeAndServe(df: DataFrame, n: Int): Int = {
registerPicklers()
- // This is an annoying hack - we should refactor the code so executeCollect and executeTake
- // returns InternalRow rather than Row.
- val converter = CatalystTypeConverters.createToCatalystConverter(df.schema)
- val iter = new SerDeUtil.AutoBatchedPickler(df.take(n).iterator.map { row =>
- EvaluatePython.toJava(converter(row).asInstanceOf[InternalRow], df.schema)
- })
+ val iter = new SerDeUtil.AutoBatchedPickler(
+ df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
+ EvaluatePython.toJava(row, df.schema)
+ })
PythonRDD.serveIterator(iter, s"serve-DataFrame")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index 3d218f01c9..8549a6a0f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -245,7 +245,7 @@ object SparkPlanTest {
}
}
)
- resolvedPlan.executeCollect().toSeq
+ resolvedPlan.executeCollectPublic().toSeq
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c75025e79a..17de8ef56f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -563,7 +563,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
/** Extends QueryExecution with hive specific features. */
protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
- extends super.QueryExecution(logicalPlan) {
+ extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {
/**
* Returns the result as a hive compatible sequence of strings. For native commands, the
@@ -581,10 +581,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
.mkString("\t")
}
case command: ExecutedCommand =>
- command.executeCollect().map(_(0).toString)
+ command.executeCollect().map(_.getString(0))
case other =>
- val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq
+ val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
// We need the types so we can output struct field names
val types = analyzed.output.map(_.dataType)
// Reformat to match hive tab delimited output.
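The switch from _(0).toString to getString(0) reflects that InternalRow stores strings as UTF8String; getString(ordinal) converts back to a java.lang.String at the access point. A tiny sketch (illustrative only, not part of the patch):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.unsafe.types.UTF8String

    val row: InternalRow = InternalRow(UTF8String.fromString("OK"))
    val s: String = row.getString(0)   // "OK"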
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 0c700bdb37..f936cf565b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -124,7 +124,7 @@ case class InsertIntoHiveTable(
*
* Note: this is run once and then kept to avoid double insertions.
*/
- protected[sql] lazy val sideEffectResult: Seq[Row] = {
+ protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
@@ -267,10 +267,10 @@ case class InsertIntoHiveTable(
// however for now we return an empty list to simplify compatibility checks with hive, which
// does not return anything for insert operations.
// TODO: implement hive compatibility as rules.
- Seq.empty[Row]
+ Seq.empty[InternalRow]
}
- override def executeCollect(): Array[Row] = sideEffectResult.toArray
+ override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
protected override def doExecute(): RDD[InternalRow] = {
sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)