Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 136
1 file changed, 0 insertions(+), 136 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index d361f61764..34fa626e00 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -120,7 +120,6 @@ abstract class QueryTest extends PlanTest {
throw ae
}
}
- checkJsonFormat(analyzedDS)
assertEmptyMissingInput(analyzedDS)
try ds.collect() catch {
@@ -168,8 +167,6 @@ abstract class QueryTest extends PlanTest {
}
}
- checkJsonFormat(analyzedDF)
-
assertEmptyMissingInput(analyzedDF)
QueryTest.checkAnswer(analyzedDF, expectedAnswer) match {
@@ -228,139 +225,6 @@ abstract class QueryTest extends PlanTest {
planWithCaching)
}
- private def checkJsonFormat(ds: Dataset[_]): Unit = {
- // Get the analyzed plan and rewrite the PredicateSubqueries in order to make sure that
- // RDD and Data resolution does not break.
- val logicalPlan = ds.queryExecution.analyzed
-
- // bypass some cases that we can't handle currently.
- logicalPlan.transform {
- case _: ObjectConsumer => return
- case _: ObjectProducer => return
- case _: AppendColumns => return
- case _: TypedFilter => return
- case _: LogicalRelation => return
- case p if p.getClass.getSimpleName == "MetastoreRelation" => return
- case _: MemoryPlan => return
- case p: InMemoryRelation =>
- p.child.transform {
- case _: ObjectConsumerExec => return
- case _: ObjectProducerExec => return
- }
- p
- }.transformAllExpressions {
- case _: ImperativeAggregate => return
- case _: TypedAggregateExpression => return
- case Literal(_, _: ObjectType) => return
- case _: UserDefinedGenerator => return
- }
-
- // bypass hive tests before we fix all corner cases in hive module.
- if (this.getClass.getName.startsWith("org.apache.spark.sql.hive")) return
-
- val jsonString = try {
- logicalPlan.toJSON
- } catch {
- case NonFatal(e) =>
- fail(
- s"""
- |Failed to parse logical plan to JSON:
- |${logicalPlan.treeString}
- """.stripMargin, e)
- }
-
- // Scala functions are not serializable to JSON, so replace them with null here so that we
- // can compare the plans later.
- val normalized1 = logicalPlan.transformAllExpressions {
- case udf: ScalaUDF => udf.copy(function = null)
- case gen: UserDefinedGenerator => gen.copy(function = null)
- // After SPARK-17356: the JSON representation no longer has the Metadata. We need to remove
- // the Metadata from the normalized plan so that we can compare this plan with the
- // JSON-deserialized plan.
- case a @ Alias(child, name) if a.explicitMetadata.isDefined =>
- Alias(child, name)(a.exprId, a.qualifier, Some(Metadata.empty), a.isGenerated)
- case a: AttributeReference if a.metadata != Metadata.empty =>
- AttributeReference(a.name, a.dataType, a.nullable, Metadata.empty)(a.exprId, a.qualifier,
- a.isGenerated)
- }
-
- // RDDs/data are not serializable to JSON, so we need to collect the LogicalPlans that contain
- // this non-serializable data, and use the originals to replace the null placeholders
- // in the logical plan parsed from JSON.
- val logicalRDDs = new ArrayDeque[LogicalRDD]()
- val localRelations = new ArrayDeque[LocalRelation]()
- val inMemoryRelations = new ArrayDeque[InMemoryRelation]()
- def collectData: (LogicalPlan => Unit) = {
- case l: LogicalRDD =>
- logicalRDDs.offer(l)
- case l: LocalRelation =>
- localRelations.offer(l)
- case i: InMemoryRelation =>
- inMemoryRelations.offer(i)
- case p =>
- p.expressions.foreach {
- _.foreach {
- case s: SubqueryExpression =>
- s.plan.foreach(collectData)
- case _ =>
- }
- }
- }
- logicalPlan.foreach(collectData)
-
-
- val jsonBackPlan = try {
- TreeNode.fromJSON[LogicalPlan](jsonString, spark.sparkContext)
- } catch {
- case NonFatal(e) =>
- fail(
- s"""
- |Failed to rebuild the logical plan from JSON:
- |${logicalPlan.treeString}
- |
- |${logicalPlan.prettyJson}
- """.stripMargin, e)
- }
-
- def renormalize: PartialFunction[LogicalPlan, LogicalPlan] = {
- case l: LogicalRDD =>
- val origin = logicalRDDs.pop()
- LogicalRDD(l.output, origin.rdd)(spark)
- case l: LocalRelation =>
- val origin = localRelations.pop()
- l.copy(data = origin.data)
- case l: InMemoryRelation =>
- val origin = inMemoryRelations.pop()
- InMemoryRelation(
- l.output,
- l.useCompression,
- l.batchSize,
- l.storageLevel,
- origin.child,
- l.tableName)(
- origin.cachedColumnBuffers,
- origin.batchStats)
- case p =>
- p.transformExpressions {
- case s: SubqueryExpression =>
- s.withNewPlan(s.plan.transformDown(renormalize))
- }
- }
- val normalized2 = jsonBackPlan.transformDown(renormalize)
-
- assert(logicalRDDs.isEmpty)
- assert(localRelations.isEmpty)
- assert(inMemoryRelations.isEmpty)
-
- if (normalized1 != normalized2) {
- fail(
- s"""
- |== FAIL: the logical plan parsed from json does not match the original one ===
- |${sideBySide(logicalPlan.treeString, normalized2.treeString).mkString("\n")}
- """.stripMargin)
- }
- }
-
/**
* Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans.
*/