author    Sean Zhong <seanzhong@databricks.com>    2016-09-16 19:37:30 +0800
committer Wenchen Fan <wenchen@databricks.com>    2016-09-16 19:37:30 +0800
commit    a425a37a5d894e0d7462c8faa81a913495189ece (patch)
tree      47d5575d92f084993cacbe4caf87e7e5ed19fdd2 /sql/core
parent    fc1efb720c9c0033077c3c20ee144d0f757e6bcd (diff)
download  spark-a425a37a5d894e0d7462c8faa81a913495189ece.tar.gz
          spark-a425a37a5d894e0d7462c8faa81a913495189ece.tar.bz2
          spark-a425a37a5d894e0d7462c8faa81a913495189ece.zip
[SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid OOM when converting unknown fields to JSON
## What changes were proposed in this pull request?

This PR is a follow-up of SPARK-17356. The current implementation of `TreeNode.toJSON` recursively converts all fields of a TreeNode to JSON, even if a field is of type `Seq` or `Map`. This may trigger an out-of-memory error in cases such as:

1. The `Seq` or `Map` can be very large. Converting it to JSON may require a huge amount of memory and trigger an out-of-memory error.
2. User-space input may also be propagated into the plan. Such input can be of arbitrary type and may even be self-referencing, so converting it to JSON may trigger an out-of-memory error or a stack-overflow error.

For a code example, please check the JIRA description of SPARK-17426.

In this PR, we refactor `TreeNode.toJSON` so that a field is converted to a JSON string only if it is of a safe type; a minimal sketch of the idea follows this description.

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #14990 from clockfly/json_oom2.
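As a hedged illustration (this is not the actual Spark implementation; `isSafeForJson` and `safeToJsonValue` are hypothetical names invented for this sketch), the "safe type" idea amounts to whitelisting cheap, acyclic field types and refusing to walk anything else:

```scala
// Minimal sketch of the "only serialize safe fields" idea behind this change.
// NOT the real TreeNode.toJSON code; the helper names here are hypothetical.
object SafeJsonSketch {

  // Whitelist of field types known to be cheap and acyclic. Anything else
  // (arbitrary user objects, huge Seq/Map values, self-referencing structures)
  // is treated as unsafe.
  private def isSafeForJson(value: Any): Boolean = value match {
    case _: Boolean | _: Int | _: Long | _: Float | _: Double => true
    case s: String => s.length <= 1024
    case _ => false
  }

  // Convert a single field to a JSON fragment only when it is safe; otherwise
  // emit null so a potentially unbounded value is never walked recursively.
  def safeToJsonValue(value: Any): String = value match {
    case v if !isSafeForJson(v) => "null"
    case s: String => "\"" + s + "\""
    case other => other.toString
  }
}

// Usage: a primitive is rendered, an arbitrarily large collection is dropped.
// SafeJsonSketch.safeToJsonValue(42)                   // "42"
// SafeJsonSketch.safeToJsonValue(Seq.fill(1000000)(0)) // "null"
```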
Diffstat (limited to 'sql/core')
-rw-r--r--    sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala    136
1 file changed, 0 insertions, 136 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index d361f61764..34fa626e00 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -120,7 +120,6 @@ abstract class QueryTest extends PlanTest {
throw ae
}
}
- checkJsonFormat(analyzedDS)
assertEmptyMissingInput(analyzedDS)
try ds.collect() catch {
@@ -168,8 +167,6 @@ abstract class QueryTest extends PlanTest {
}
}
- checkJsonFormat(analyzedDF)
-
assertEmptyMissingInput(analyzedDF)
QueryTest.checkAnswer(analyzedDF, expectedAnswer) match {
@@ -228,139 +225,6 @@ abstract class QueryTest extends PlanTest {
planWithCaching)
}
- private def checkJsonFormat(ds: Dataset[_]): Unit = {
- // Get the analyzed plan and rewrite the PredicateSubqueries in order to make sure that
- // RDD and Data resolution does not break.
- val logicalPlan = ds.queryExecution.analyzed
-
- // bypass some cases that we can't handle currently.
- logicalPlan.transform {
- case _: ObjectConsumer => return
- case _: ObjectProducer => return
- case _: AppendColumns => return
- case _: TypedFilter => return
- case _: LogicalRelation => return
- case p if p.getClass.getSimpleName == "MetastoreRelation" => return
- case _: MemoryPlan => return
- case p: InMemoryRelation =>
- p.child.transform {
- case _: ObjectConsumerExec => return
- case _: ObjectProducerExec => return
- }
- p
- }.transformAllExpressions {
- case _: ImperativeAggregate => return
- case _: TypedAggregateExpression => return
- case Literal(_, _: ObjectType) => return
- case _: UserDefinedGenerator => return
- }
-
- // bypass hive tests before we fix all corner cases in hive module.
- if (this.getClass.getName.startsWith("org.apache.spark.sql.hive")) return
-
- val jsonString = try {
- logicalPlan.toJSON
- } catch {
- case NonFatal(e) =>
- fail(
- s"""
- |Failed to parse logical plan to JSON:
- |${logicalPlan.treeString}
- """.stripMargin, e)
- }
-
- // scala function is not serializable to JSON, use null to replace them so that we can compare
- // the plans later.
- val normalized1 = logicalPlan.transformAllExpressions {
- case udf: ScalaUDF => udf.copy(function = null)
- case gen: UserDefinedGenerator => gen.copy(function = null)
- // After SPARK-17356: the JSON representation no longer has the Metadata. We need to remove
- // the Metadata from the normalized plan so that we can compare this plan with the
- // JSON-deserialzed plan.
- case a @ Alias(child, name) if a.explicitMetadata.isDefined =>
- Alias(child, name)(a.exprId, a.qualifier, Some(Metadata.empty), a.isGenerated)
- case a: AttributeReference if a.metadata != Metadata.empty =>
- AttributeReference(a.name, a.dataType, a.nullable, Metadata.empty)(a.exprId, a.qualifier,
- a.isGenerated)
- }
-
- // RDDs/data are not serializable to JSON, so we need to collect LogicalPlans that contains
- // these non-serializable stuff, and use these original ones to replace the null-placeholders
- // in the logical plans parsed from JSON.
- val logicalRDDs = new ArrayDeque[LogicalRDD]()
- val localRelations = new ArrayDeque[LocalRelation]()
- val inMemoryRelations = new ArrayDeque[InMemoryRelation]()
- def collectData: (LogicalPlan => Unit) = {
- case l: LogicalRDD =>
- logicalRDDs.offer(l)
- case l: LocalRelation =>
- localRelations.offer(l)
- case i: InMemoryRelation =>
- inMemoryRelations.offer(i)
- case p =>
- p.expressions.foreach {
- _.foreach {
- case s: SubqueryExpression =>
- s.plan.foreach(collectData)
- case _ =>
- }
- }
- }
- logicalPlan.foreach(collectData)
-
-
- val jsonBackPlan = try {
- TreeNode.fromJSON[LogicalPlan](jsonString, spark.sparkContext)
- } catch {
- case NonFatal(e) =>
- fail(
- s"""
- |Failed to rebuild the logical plan from JSON:
- |${logicalPlan.treeString}
- |
- |${logicalPlan.prettyJson}
- """.stripMargin, e)
- }
-
- def renormalize: PartialFunction[LogicalPlan, LogicalPlan] = {
- case l: LogicalRDD =>
- val origin = logicalRDDs.pop()
- LogicalRDD(l.output, origin.rdd)(spark)
- case l: LocalRelation =>
- val origin = localRelations.pop()
- l.copy(data = origin.data)
- case l: InMemoryRelation =>
- val origin = inMemoryRelations.pop()
- InMemoryRelation(
- l.output,
- l.useCompression,
- l.batchSize,
- l.storageLevel,
- origin.child,
- l.tableName)(
- origin.cachedColumnBuffers,
- origin.batchStats)
- case p =>
- p.transformExpressions {
- case s: SubqueryExpression =>
- s.withNewPlan(s.plan.transformDown(renormalize))
- }
- }
- val normalized2 = jsonBackPlan.transformDown(renormalize)
-
- assert(logicalRDDs.isEmpty)
- assert(localRelations.isEmpty)
- assert(inMemoryRelations.isEmpty)
-
- if (normalized1 != normalized2) {
- fail(
- s"""
- |== FAIL: the logical plan parsed from json does not match the original one ===
- |${sideBySide(logicalPlan.treeString, normalized2.treeString).mkString("\n")}
- """.stripMargin)
- }
- }
-
/**
* Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans.
*/