diff options
author:    Cheng Lian <lian.cs.zju@gmail.com>  2014-06-02 12:09:43 -0700
committer: Reynold Xin <rxin@apache.org>       2014-06-02 12:09:43 -0700
commit:    d000ca98a80986ff5b13477547f1dcab7856ae63 (patch)
tree:      991d2a007f327dc472ea1af5a0d72b337f96daad /sql
parent:    9a5d482e090eaaea8491d3864667e0f513e7195c (diff)
download:  spark-d000ca98a80986ff5b13477547f1dcab7856ae63.tar.gz
           spark-d000ca98a80986ff5b13477547f1dcab7856ae63.tar.bz2
           spark-d000ca98a80986ff5b13477547f1dcab7856ae63.zip
[SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.
In cases like `Limit` and `TakeOrdered`, `executeCollect()` makes optimizations that `execute().collect()` will not.
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #939 from liancheng/spark-1958 and squashes the following commits:
bdc4a14 [Cheng Lian] Copy rows to present immutable data to users
8250976 [Cheng Lian] Added return type explicitly for public API
192a25c [Cheng Lian] [SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.
Diffstat (limited to 'sql')
3 files changed, 8 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index e855f36256..8855c4e876 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -369,6 +369,12 @@ class SchemaRDD(
   }

   // =======================================================================
+  // Overriden RDD actions
+  // =======================================================================
+
+  override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
+
+  // =======================================================================
   // Base RDD functions that do NOT change schema
   // =======================================================================

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 235a9b1692..4613df1039 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -49,7 +49,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
   /**
    * Runs this query returning the result as an array.
    */
-  def executeCollect(): Array[Row] = execute().collect()
+  def executeCollect(): Array[Row] = execute().map(_.copy()).collect()

   protected def buildRow(values: Seq[Any]): Row =
     new GenericRow(values.toArray)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index f9731e82e4..b973ceba5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -201,7 +201,7 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
   }

   test("insert (appending) to same table via Scala API") {
-    sql("INSERT INTO testsource SELECT * FROM testsource").collect()
+    sql("INSERT INTO testsource SELECT * FROM testsource")
     val double_rdd = sql("SELECT * FROM testsource").collect()
     assert(double_rdd != null)
     assert(double_rdd.size === 30)