aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2016-09-16 14:02:56 -0700
committerYin Huai <yhuai@databricks.com>2016-09-16 14:02:56 -0700
commit39e2bad6a866d27c3ca594d15e574a1da3ee84cc (patch)
tree7dd042d22e2ffc5fa3d916d92d55a82322892dfc /sql/core/src/test/scala
parentb9323fc9381a09af510f542fd5c86473e029caf6 (diff)
downloadspark-39e2bad6a866d27c3ca594d15e574a1da3ee84cc.tar.gz
spark-39e2bad6a866d27c3ca594d15e574a1da3ee84cc.tar.bz2
spark-39e2bad6a866d27c3ca594d15e574a1da3ee84cc.zip
[SPARK-17549][SQL] Only collect table size stat in driver for cached relation.
The existing code caches all stats for all columns for each partition in the driver; for a large relation, this causes extreme memory usage, which leads to gc hell and application failures. It seems that only the size in bytes of the data is actually used in the driver, so instead just colllect that. In executors, the full stats are still kept, but that's not a big problem; we expect the data to be distributed and thus not really incur in too much memory pressure in each individual executor. There are also potential improvements on the executor side, since the data being stored currently is very wasteful (e.g. storing boxed types vs. primitive types for stats). But that's a separate issue. On a mildly related change, I'm also adding code to catch exceptions in the code generator since Janino was breaking with the test data I tried this patch on. Tested with unit tests and by doing a count a very wide table (20k columns) with many partitions. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #15112 from vanzin/SPARK-17549.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala14
1 files changed, 14 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 937839644a..0daa29b666 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -232,4 +232,18 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
val columnTypes2 = List.fill(length2)(IntegerType)
val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
}
+
+ test("SPARK-17549: cached table size should be correctly calculated") {
+ val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
+ val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
+ val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
+
+ // Materialize the data.
+ val expectedAnswer = data.collect()
+ checkAnswer(cached, expectedAnswer)
+
+ // Check that the right size was calculated.
+ assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
+ }
+
}