aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-04-30 20:28:22 -0700
committerReynold Xin <rxin@databricks.com>2016-04-30 20:28:22 -0700
commit90787de864b58a1079c23e6581381ca8ffe7685f (patch)
treeb709ad017b9fd13217008f54d7ece291958ce84c /sql
parent507bea5ca6d95c995f8152b8473713c136e23754 (diff)
downloadspark-90787de864b58a1079c23e6581381ca8ffe7685f.tar.gz
spark-90787de864b58a1079c23e6581381ca8ffe7685f.tar.bz2
spark-90787de864b58a1079c23e6581381ca8ffe7685f.zip
[SPARK-15033][SQL] fix a flaky test in CachedTableSuite
## What changes were proposed in this pull request? This is caused by https://github.com/apache/spark/pull/12776, which removes the `synchronized` from all methods in `AccumulatorContext`. However, a test in `CachedTableSuite` synchronize on `AccumulatorContext` and expecting no one else can change it, which is not true anymore. This PR update that test to not require to lock on `AccumulatorContext`. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #12811 from cloud-fan/flaky.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala19
2 files changed, 14 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 577c34ba61..94b87a5812 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -68,7 +68,7 @@ private[sql] case class InMemoryRelation(
override def producedAttributes: AttributeSet = outputSet
- private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
+ private[sql] val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
if (_batchStats == null) {
child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
} else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 1095a73c58..12dbed89d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -333,12 +333,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
sql("SELECT * FROM t1").count()
sql("SELECT * FROM t2").count()
- AccumulatorContext.synchronized {
- val accsSize = AccumulatorContext.numAccums
- sqlContext.uncacheTable("t1")
- sqlContext.uncacheTable("t2")
- assert((accsSize - 2) == AccumulatorContext.numAccums)
- }
+ val accId1 = sqlContext.table("t1").queryExecution.withCachedData.collect {
+ case i: InMemoryRelation => i.batchStats.id
+ }.head
+
+ val accId2 = sqlContext.table("t1").queryExecution.withCachedData.collect {
+ case i: InMemoryRelation => i.batchStats.id
+ }.head
+
+ sqlContext.uncacheTable("t1")
+ sqlContext.uncacheTable("t2")
+
+ assert(AccumulatorContext.get(accId1).isEmpty)
+ assert(AccumulatorContext.get(accId2).isEmpty)
}
test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") {