path: root/sql
author     Josh Rosen <joshrosen@databricks.com>    2015-08-11 22:46:59 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2015-08-11 22:46:59 -0700
commit     b1581ac28840a4d2209ef8bb5c9f8700b4c1b286 (patch)
tree       e8ec89389a2da7b9f902f9ad57924ed712816930 /sql
parent     c3e9a120e33159fb45cd99f3a55fc5cf16cd7c6c (diff)
[SPARK-9854] [SQL] RuleExecutor.timeMap should be thread-safe
`RuleExecutor.timeMap` is currently a non-thread-safe mutable HashMap; this can lead to infinite loops if multiple threads are concurrently modifying the map. I believe that this is responsible for some hangs that I've observed in HiveQuerySuite. This patch addresses this by using a Guava `AtomicLongMap`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8120 from JoshRosen/rule-executor-time-map-fix.
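For context, the hazard described in the commit message is the classic unsynchronized read-modify-write on a shared mutable map. The sketch below is a minimal illustration of that pre-patch pattern, not the patched class itself; the `UnsafeTimings` object and `record` method are hypothetical names.

    import scala.collection.mutable

    // Hypothetical stand-in for the pre-patch pattern: a shared, unsynchronized
    // mutable map updated from multiple threads.
    object UnsafeTimings {
      private val timeMap = new mutable.HashMap[String, Long].withDefault(_ => 0)

      // The read and the write below are two separate steps, so concurrent callers
      // can interleave and lose updates, and concurrent structural changes to the
      // map (e.g. a rehash while another thread is reading it) can leave it in a
      // state that produces the kind of infinite-loop hang the commit describes.
      def record(ruleName: String, nanos: Long): Unit = {
        timeMap(ruleName) = timeMap(ruleName) + nanos
      }
    }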
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala  |  15
1 file changed, 9 insertions(+), 6 deletions(-)
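As a rough sketch of the replacement pattern shown in the diff below (assuming Guava is on the classpath; `SafeTimings`, `record`, and `dump` are illustrative names only, not part of the actual patch), Guava's `AtomicLongMap` turns the per-key accumulation into a single atomic operation and exposes its totals through a Java map view:

    import scala.collection.JavaConverters._
    import com.google.common.util.concurrent.AtomicLongMap

    // Illustrative stand-in for the patched pattern: per-rule timings accumulated
    // atomically, then read out through the Java map view for reporting.
    object SafeTimings {
      private val timeMap = AtomicLongMap.create[String]()

      // addAndGet performs the read and the add as one atomic operation,
      // so concurrent callers cannot interleave or corrupt the map.
      def record(ruleName: String, nanos: Long): Unit = {
        timeMap.addAndGet(ruleName, nanos)
      }

      def dump(): String = {
        val snapshot = timeMap.asMap().asScala   // Java view of the current totals
        snapshot.toSeq
          .sortBy { case (_, nanos) => -nanos.longValue }  // longest-running first
          .map { case (rule, nanos) => s"$rule: $nanos ns" }
          .mkString("\n")
      }
    }

Because the accumulation no longer depends on external synchronization, dump() can be called while other threads are still recording; it reflects whatever totals happen to be present as it iterates.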
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 8b824511a7..f80d2a9324 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -17,22 +17,25 @@
 
 package org.apache.spark.sql.catalyst.rules
 
+import scala.collection.JavaConverters._
+
+import com.google.common.util.concurrent.AtomicLongMap
+
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide
 
-import scala.collection.mutable
-
 object RuleExecutor {
-  protected val timeMap = new mutable.HashMap[String, Long].withDefault(_ => 0)
+  protected val timeMap = AtomicLongMap.create[String]()
 
   /** Resets statistics about time spent running specific rules */
   def resetTime(): Unit = timeMap.clear()
 
   /** Dump statistics about time spent running specific rules. */
   def dumpTimeSpent(): String = {
-    val maxSize = timeMap.keys.map(_.toString.length).max
-    timeMap.toSeq.sortBy(_._2).reverseMap { case (k, v) =>
+    val map = timeMap.asMap().asScala
+    val maxSize = map.keys.map(_.toString.length).max
+    map.toSeq.sortBy(_._2).reverseMap { case (k, v) =>
       s"${k.padTo(maxSize, " ").mkString} $v"
     }.mkString("\n")
   }
@@ -79,7 +82,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             val startTime = System.nanoTime()
             val result = rule(plan)
             val runTime = System.nanoTime() - startTime
-            RuleExecutor.timeMap(rule.ruleName) = RuleExecutor.timeMap(rule.ruleName) + runTime
+            RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)
 
             if (!result.fastEquals(plan)) {
               logTrace(