diff options
Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 29 |
1 file changed, 15 insertions, 14 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index a1b45ca7eb..826862835a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -28,9 +28,11 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.{LogicalRDD, Queryable}
+import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.streaming.MemoryPlan
+import org.apache.spark.sql.types.ObjectType
 
 abstract class QueryTest extends PlanTest {
 
@@ -90,7 +92,7 @@ abstract class QueryTest extends PlanTest {
         s"""
            |Exception collecting dataset as objects
            |${ds.resolvedTEncoder}
-           |${ds.resolvedTEncoder.fromRowExpression.treeString}
+           |${ds.resolvedTEncoder.deserializer.treeString}
            |${ds.queryExecution}
          """.stripMargin, e)
     }
@@ -105,11 +107,11 @@ abstract class QueryTest extends PlanTest {
       val expected = expectedAnswer.toSet.toSeq.map((a: Any) => a.toString).sorted
       val actual = decoded.toSet.toSeq.map((a: Any) => a.toString).sorted
 
-      val comparision = sideBySide("expected" +: expected, "spark" +: actual).mkString("\n")
+      val comparison = sideBySide("expected" +: expected, "spark" +: actual).mkString("\n")
       fail(
         s"""Decoded objects do not match expected objects:
-            |$comparision
-            |${ds.resolvedTEncoder.fromRowExpression.treeString}
+            |$comparison
+            |${ds.resolvedTEncoder.deserializer.treeString}
           """.stripMargin)
     }
   }
@@ -180,9 +182,9 @@ abstract class QueryTest extends PlanTest {
   }
 
   /**
-   * Asserts that a given [[Queryable]] will be executed using the given number of cached results.
+   * Asserts that a given [[Dataset]] will be executed using the given number of cached results.
    */
-  def assertCached(query: Queryable, numCachedTables: Int = 1): Unit = {
+  def assertCached(query: Dataset[_], numCachedTables: Int = 1): Unit = {
     val planWithCaching = query.queryExecution.withCachedData
     val cachedData = planWithCaching collect {
       case cached: InMemoryRelation => cached
@@ -198,13 +200,12 @@ abstract class QueryTest extends PlanTest {
     val logicalPlan = df.queryExecution.analyzed
     // bypass some cases that we can't handle currently.
     logicalPlan.transform {
-      case _: MapPartitions => return
-      case _: MapGroups => return
-      case _: AppendColumns => return
-      case _: CoGroup => return
+      case _: ObjectOperator => return
       case _: LogicalRelation => return
+      case _: MemoryPlan => return
     }.transformAllExpressions {
       case a: ImperativeAggregate => return
+      case Literal(_, _: ObjectType) => return
     }
 
     // bypass hive tests before we fix all corner cases in hive module.
@@ -286,9 +287,9 @@ abstract class QueryTest extends PlanTest {
   }
 
   /**
-   * Asserts that a given [[Queryable]] does not have missing inputs in all the analyzed plans.
-   */
-  def assertEmptyMissingInput(query: Queryable): Unit = {
+   * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans.
+   */
+  def assertEmptyMissingInput(query: Dataset[_]): Unit = {
     assert(query.queryExecution.analyzed.missingInput.isEmpty,
       s"The analyzed logical plan has missing inputs: ${query.queryExecution.analyzed}")
     assert(query.queryExecution.optimizedPlan.missingInput.isEmpty,