diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-08-11 22:46:59 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-08-11 22:46:59 -0700 |
commit | b1581ac28840a4d2209ef8bb5c9f8700b4c1b286 (patch) | |
tree | e8ec89389a2da7b9f902f9ad57924ed712816930 /sql | |
parent | c3e9a120e33159fb45cd99f3a55fc5cf16cd7c6c (diff) | |
download | spark-b1581ac28840a4d2209ef8bb5c9f8700b4c1b286.tar.gz spark-b1581ac28840a4d2209ef8bb5c9f8700b4c1b286.tar.bz2 spark-b1581ac28840a4d2209ef8bb5c9f8700b4c1b286.zip |
[SPARK-9854] [SQL] RuleExecutor.timeMap should be thread-safe
`RuleExecutor.timeMap` is currently a non-thread-safe mutable HashMap; this can lead to infinite loops if multiple threads are concurrently modifying the map. I believe that this is responsible for some hangs that I've observed in HiveQuerySuite.
This patch addresses this by using a Guava `AtomicLongMap`.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8120 from JoshRosen/rule-executor-time-map-fix.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 8b824511a7..f80d2a9324 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -17,22 +17,25 @@ package org.apache.spark.sql.catalyst.rules +import scala.collection.JavaConverters._ + +import com.google.common.util.concurrent.AtomicLongMap + import org.apache.spark.Logging import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.sideBySide -import scala.collection.mutable - object RuleExecutor { - protected val timeMap = new mutable.HashMap[String, Long].withDefault(_ => 0) + protected val timeMap = AtomicLongMap.create[String]() /** Resets statistics about time spent running specific rules */ def resetTime(): Unit = timeMap.clear() /** Dump statistics about time spent running specific rules. */ def dumpTimeSpent(): String = { - val maxSize = timeMap.keys.map(_.toString.length).max - timeMap.toSeq.sortBy(_._2).reverseMap { case (k, v) => + val map = timeMap.asMap().asScala + val maxSize = map.keys.map(_.toString.length).max + map.toSeq.sortBy(_._2).reverseMap { case (k, v) => s"${k.padTo(maxSize, " ").mkString} $v" }.mkString("\n") } @@ -79,7 +82,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { val startTime = System.nanoTime() val result = rule(plan) val runTime = System.nanoTime() - startTime - RuleExecutor.timeMap(rule.ruleName) = RuleExecutor.timeMap(rule.ruleName) + runTime + RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime) if (!result.fastEquals(plan)) { logTrace( |