aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala2
3 files changed, 8 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index e855f36256..8855c4e876 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -369,6 +369,12 @@ class SchemaRDD(
}
// =======================================================================
+ // Overriden RDD actions
+ // =======================================================================
+
+ override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
+
+ // =======================================================================
// Base RDD functions that do NOT change schema
// =======================================================================
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 235a9b1692..4613df1039 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -49,7 +49,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
/**
* Runs this query returning the result as an array.
*/
- def executeCollect(): Array[Row] = execute().collect()
+ def executeCollect(): Array[Row] = execute().map(_.copy()).collect()
protected def buildRow(values: Seq[Any]): Row =
new GenericRow(values.toArray)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index f9731e82e4..b973ceba5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -201,7 +201,7 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
}
test("insert (appending) to same table via Scala API") {
- sql("INSERT INTO testsource SELECT * FROM testsource").collect()
+ sql("INSERT INTO testsource SELECT * FROM testsource")
val double_rdd = sql("SELECT * FROM testsource").collect()
assert(double_rdd != null)
assert(double_rdd.size === 30)