aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala29
1 files changed, 15 insertions, 14 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index a1b45ca7eb..826862835a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -28,9 +28,11 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.{LogicalRDD, Queryable}
+import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.streaming.MemoryPlan
+import org.apache.spark.sql.types.ObjectType
abstract class QueryTest extends PlanTest {
@@ -90,7 +92,7 @@ abstract class QueryTest extends PlanTest {
s"""
|Exception collecting dataset as objects
|${ds.resolvedTEncoder}
- |${ds.resolvedTEncoder.fromRowExpression.treeString}
+ |${ds.resolvedTEncoder.deserializer.treeString}
|${ds.queryExecution}
""".stripMargin, e)
}
@@ -105,11 +107,11 @@ abstract class QueryTest extends PlanTest {
val expected = expectedAnswer.toSet.toSeq.map((a: Any) => a.toString).sorted
val actual = decoded.toSet.toSeq.map((a: Any) => a.toString).sorted
- val comparision = sideBySide("expected" +: expected, "spark" +: actual).mkString("\n")
+ val comparison = sideBySide("expected" +: expected, "spark" +: actual).mkString("\n")
fail(
s"""Decoded objects do not match expected objects:
- |$comparision
- |${ds.resolvedTEncoder.fromRowExpression.treeString}
+ |$comparison
+ |${ds.resolvedTEncoder.deserializer.treeString}
""".stripMargin)
}
}
@@ -180,9 +182,9 @@ abstract class QueryTest extends PlanTest {
}
/**
- * Asserts that a given [[Queryable]] will be executed using the given number of cached results.
+ * Asserts that a given [[Dataset]] will be executed using the given number of cached results.
*/
- def assertCached(query: Queryable, numCachedTables: Int = 1): Unit = {
+ def assertCached(query: Dataset[_], numCachedTables: Int = 1): Unit = {
val planWithCaching = query.queryExecution.withCachedData
val cachedData = planWithCaching collect {
case cached: InMemoryRelation => cached
@@ -198,13 +200,12 @@ abstract class QueryTest extends PlanTest {
val logicalPlan = df.queryExecution.analyzed
// bypass some cases that we can't handle currently.
logicalPlan.transform {
- case _: MapPartitions => return
- case _: MapGroups => return
- case _: AppendColumns => return
- case _: CoGroup => return
+ case _: ObjectOperator => return
case _: LogicalRelation => return
+ case _: MemoryPlan => return
}.transformAllExpressions {
case a: ImperativeAggregate => return
+ case Literal(_, _: ObjectType) => return
}
// bypass hive tests before we fix all corner cases in hive module.
@@ -286,9 +287,9 @@ abstract class QueryTest extends PlanTest {
}
/**
- * Asserts that a given [[Queryable]] does not have missing inputs in all the analyzed plans.
- */
- def assertEmptyMissingInput(query: Queryable): Unit = {
+ * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans.
+ */
+ def assertEmptyMissingInput(query: Dataset[_]): Unit = {
assert(query.queryExecution.analyzed.missingInput.isEmpty,
s"The analyzed logical plan has missing inputs: ${query.queryExecution.analyzed}")
assert(query.queryExecution.optimizedPlan.missingInput.isEmpty,