diff options
author | zsxwing <zsxwing@gmail.com> | 2015-04-22 11:08:59 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-22 11:08:59 -0700 |
commit | 33b85620f910c404873d362d27cca1223084913a (patch) | |
tree | da1003de30e34743e99fe0ffc38a8d7d8ba009d6 /streaming | |
parent | bdc5c16e76c5d0bc147408353b2ba4faa8e914fc (diff) | |
download | spark-33b85620f910c404873d362d27cca1223084913a.tar.gz spark-33b85620f910c404873d362d27cca1223084913a.tar.bz2 spark-33b85620f910c404873d362d27cca1223084913a.zip |
[SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils
As per rxin 's suggestion in https://github.com/apache/spark/pull/5392/files#r28757176
What's more, there is a race condition in the global shared `daemonThreadFactoryBuilder`. `daemonThreadFactoryBuilder` may be modified by multiple threads. This PR removed the global `daemonThreadFactoryBuilder` and created a new `ThreadFactoryBuilder` every time.
Author: zsxwing <zsxwing@gmail.com>
Closes #5631 from zsxwing/thread-utils and squashes the following commits:
9fe5b0e [zsxwing] Add ThreadUtils and move thread methods from Utils to ThreadUtils
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 4 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala | 4 |
2 files changed, 4 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index dcdc27d29c..297bf04c0c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.storage._ import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager} -import org.apache.spark.util.{Clock, SystemClock, Utils} +import org.apache.spark.util.{ThreadUtils, Clock, SystemClock} /** Trait that represents the metadata related to storage of blocks */ private[streaming] trait ReceivedBlockStoreResult { @@ -150,7 +150,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // For processing futures used in parallel block storing into block manager and write ahead log // # threads = 2, so that both writing to BM and WAL can proceed in parallel implicit private val executionContext = ExecutionContext.fromExecutorService( - Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName)) + ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName)) /** * This implementation stores the block into the block manager as well as a write ahead log. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index 6bdfe45dc7..38a93cc3c9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -25,7 +25,7 @@ import scala.language.postfixOps import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.Logging -import org.apache.spark.util.{Clock, SystemClock, Utils} +import org.apache.spark.util.{ThreadUtils, Clock, SystemClock} import WriteAheadLogManager._ /** @@ -60,7 +60,7 @@ private[streaming] class WriteAheadLogManager( if (callerName.nonEmpty) s" for $callerName" else "" private val threadpoolName = s"WriteAheadLogManager $callerNameTag" implicit private val executionContext = ExecutionContext.fromExecutorService( - Utils.newDaemonFixedThreadPool(1, threadpoolName)) + ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName)) override protected val logName = s"WriteAheadLogManager $callerNameTag" private var currentLogPath: Option[String] = None |