diff options
author | Sean Zhong <seanzhong@databricks.com> | 2016-09-06 16:05:50 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-09-06 16:05:50 +0800 |
commit | 6f13aa7dfee12b1b301bd10a1050549008ecc67e (patch) | |
tree | 67f7324e327eabf40d8a0970cd0baaea7994d666 /sql/core/src/test/scala | |
parent | c0ae6bc6ea38909730fad36e653d3c7ab0a84b44 (diff) | |
download | spark-6f13aa7dfee12b1b301bd10a1050549008ecc67e.tar.gz spark-6f13aa7dfee12b1b301bd10a1050549008ecc67e.tar.bz2 spark-6f13aa7dfee12b1b301bd10a1050549008ecc67e.zip |
[SPARK-17356][SQL] Fix out of memory issue when generating JSON for TreeNode
## What changes were proposed in this pull request?
class `org.apache.spark.sql.types.Metadata` is widely used in mllib to store some ml attributes. `Metadata` is commonly stored in `Alias` expression.
```
case class Alias(child: Expression, name: String)(
val exprId: ExprId = NamedExpression.newExprId,
val qualifier: Option[String] = None,
val explicitMetadata: Option[Metadata] = None,
override val isGenerated: java.lang.Boolean = false)
```
The `Metadata` can take a big memory footprint since the number of attributes is big ( in scale of million). When `toJSON` is called on `Alias` expression, the `Metadata` will also be converted to a big JSON string.
If a plan contains many such kind of `Alias` expressions, it may trigger out of memory error when `toJSON` is called, since converting all `Metadata` references to JSON will take huge memory.
With this PR, we will skip scanning Metadata when doing JSON conversion. For a reproducer of the OOM, and analysis, please look at jira https://issues.apache.org/jira/browse/SPARK-17356.
## How was this patch tested?
Existing tests.
Author: Sean Zhong <seanzhong@databricks.com>
Closes #14915 from clockfly/json_oom.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index c7af40227d..d361f61764 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.streaming.MemoryPlan -import org.apache.spark.sql.types.ObjectType +import org.apache.spark.sql.types.{Metadata, ObjectType} abstract class QueryTest extends PlanTest { @@ -274,6 +274,14 @@ abstract class QueryTest extends PlanTest { val normalized1 = logicalPlan.transformAllExpressions { case udf: ScalaUDF => udf.copy(function = null) case gen: UserDefinedGenerator => gen.copy(function = null) + // After SPARK-17356: the JSON representation no longer has the Metadata. We need to remove + // the Metadata from the normalized plan so that we can compare this plan with the + // JSON-deserialzed plan. + case a @ Alias(child, name) if a.explicitMetadata.isDefined => + Alias(child, name)(a.exprId, a.qualifier, Some(Metadata.empty), a.isGenerated) + case a: AttributeReference if a.metadata != Metadata.empty => + AttributeReference(a.name, a.dataType, a.nullable, Metadata.empty)(a.exprId, a.qualifier, + a.isGenerated) } // RDDs/data are not serializable to JSON, so we need to collect LogicalPlans that contains |