diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-01-04 11:16:30 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-01-04 11:16:30 -0800 |
commit | 4de9c9554ca6464b806496dbffe0ba99c0ae6b45 (patch) | |
tree | b7c68edd8aed57617163d8544c7b26d0c0eb40cc | |
parent | 2db7884f6f1939d2a62fb71279a3ad80706308e1 (diff) | |
download | spark-4de9c9554ca6464b806496dbffe0ba99c0ae6b45.tar.gz spark-4de9c9554ca6464b806496dbffe0ba99c0ae6b45.tar.bz2 spark-4de9c9554ca6464b806496dbffe0ba99c0ae6b45.zip |
Use AtomicInteger for numRunningTasks
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkEnv.scala | 19 |
1 files changed, 7 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 224b5c1744..b581c7b074 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -17,8 +17,9 @@ package org.apache.spark -import collection.mutable -import serializer.Serializer +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable import akka.actor._ import akka.remote.RemoteActorRefProvider @@ -60,7 +61,7 @@ class SparkEnv private[spark] ( private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() // Number of tasks currently running across all threads - @volatile private var _numRunningTasks = 0 + private val _numRunningTasks = new AtomicInteger(0) // A general, soft-reference map for metadata needed during HadoopRDD split computation // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). @@ -93,15 +94,9 @@ class SparkEnv private[spark] ( /** * Return the number of tasks currently running across all threads */ - def numRunningTasks: Int = _numRunningTasks - - def incrementNumRunningTasks() = synchronized { - _numRunningTasks += 1 - } - - def decrementNumRunningTasks() = synchronized { - _numRunningTasks -= 1 - } + def numRunningTasks: Int = _numRunningTasks.intValue() + def incrementNumRunningTasks(): Int = _numRunningTasks.incrementAndGet() + def decrementNumRunningTasks(): Int = _numRunningTasks.decrementAndGet() } object SparkEnv extends Logging { |