diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-04-30 20:28:22 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-30 20:28:22 -0700 |
commit | 90787de864b58a1079c23e6581381ca8ffe7685f (patch) | |
tree | b709ad017b9fd13217008f54d7ece291958ce84c /sql | |
parent | 507bea5ca6d95c995f8152b8473713c136e23754 (diff) | |
download | spark-90787de864b58a1079c23e6581381ca8ffe7685f.tar.gz spark-90787de864b58a1079c23e6581381ca8ffe7685f.tar.bz2 spark-90787de864b58a1079c23e6581381ca8ffe7685f.zip |
[SPARK-15033][SQL] fix a flaky test in CachedTableSuite
## What changes were proposed in this pull request?
This is caused by https://github.com/apache/spark/pull/12776, which removes the `synchronized` from all methods in `AccumulatorContext`.
However, a test in `CachedTableSuite` synchronizes on `AccumulatorContext` and expects that no one else can change it, which is not true anymore.
This PR updates that test so that it no longer requires locking on `AccumulatorContext`.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12811 from cloud-fan/flaky.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala | 2 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 19 |
2 files changed, 14 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 577c34ba61..94b87a5812 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -68,7 +68,7 @@ private[sql] case class InMemoryRelation( override def producedAttributes: AttributeSet = outputSet - private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = + private[sql] val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = if (_batchStats == null) { child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow]) } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 1095a73c58..12dbed89d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -333,12 +333,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext sql("SELECT * FROM t1").count() sql("SELECT * FROM t2").count() - AccumulatorContext.synchronized { - val accsSize = AccumulatorContext.numAccums - sqlContext.uncacheTable("t1") - sqlContext.uncacheTable("t2") - assert((accsSize - 2) == AccumulatorContext.numAccums) - } + val accId1 = sqlContext.table("t1").queryExecution.withCachedData.collect { + case i: InMemoryRelation => i.batchStats.id + }.head + + val accId2 = sqlContext.table("t1").queryExecution.withCachedData.collect { + case i: InMemoryRelation => i.batchStats.id + }.head + + sqlContext.uncacheTable("t1") + sqlContext.uncacheTable("t2") + + assert(AccumulatorContext.get(accId1).isEmpty) + assert(AccumulatorContext.get(accId2).isEmpty) } test("SPARK-10327 
Cache Table is not working while subquery has alias in its project list") { |