aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/test
diff options
context:
space:
mode:
authorxuanyuanking <xyliyuanjian@gmail.com>2016-12-19 20:31:43 +0100
committerHerman van Hovell <hvanhovell@databricks.com>2016-12-19 20:31:43 +0100
commit24482858e05bea84cacb41c62be0a9aaa33897ee (patch)
tree3e08edcc7c8f596b7128dfb52de8f48bf02b74de /sql/hive/src/test
parent7db09abb0168b77697064c69126ee82ca89609a0 (diff)
downloadspark-24482858e05bea84cacb41c62be0a9aaa33897ee.tar.gz
spark-24482858e05bea84cacb41c62be0a9aaa33897ee.tar.bz2
spark-24482858e05bea84cacb41c62be0a9aaa33897ee.zip
[SPARK-18700][SQL] Add StripedLock for each table's relation in cache
## What changes were proposed in this pull request? As the scenario described in [SPARK-18700](https://issues.apache.org/jira/browse/SPARK-18700) shows, when cachedDataSourceTables is invalidated, the next few queries will each fetch all FileStatus entries in the listLeafFiles function. When a table has many partitions, these jobs will occupy a large amount of driver memory and may ultimately cause a driver OOM. In this patch, a StripedLock is added for each table's relation in the cache (rather than for the whole cachedDataSourceTables), and each table's cache-load operation is protected by it. ## How was this patch tested? Added a multi-threaded table-access test in `PartitionedTablePerfStatsSuite` and verified that the table is loaded only once, using the metrics in `HiveCatalogMetrics`. Author: xuanyuanking <xyliyuanjian@gmail.com> Closes #16135 from xuanyuanking/SPARK-18700.
Diffstat (limited to 'sql/hive/src/test')
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala31
1 file changed, 31 insertions, 0 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 65c02d473b..55b72c625d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import java.io.File
+import java.util.concurrent.{Executors, TimeUnit}
import org.scalatest.BeforeAndAfterEach
@@ -395,4 +396,34 @@ class PartitionedTablePerfStatsSuite
}
}
}
+
+ test("SPARK-18700: table loaded only once even when resolved concurrently") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+ withTable("test") {
+ withTempDir { dir =>
+ HiveCatalogMetrics.reset()
+ setupPartitionedHiveTable("test", dir, 50)
+ // select the table in multi-threads
+ val executorPool = Executors.newFixedThreadPool(10)
+ (1 to 10).map(threadId => {
+ val runnable = new Runnable {
+ override def run(): Unit = {
+ spark.sql("select * from test where partCol1 = 999").count()
+ }
+ }
+ executorPool.execute(runnable)
+ None
+ })
+ executorPool.shutdown()
+ executorPool.awaitTermination(30, TimeUnit.SECONDS)
+ // check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
+ // METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
+ // only one thread can really do the build, so the listing job count is 2, the other
+ // one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
+ }
+ }
+ }
+ }
}