aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-04-22 11:08:59 -0700
committerReynold Xin <rxin@databricks.com>2015-04-22 11:08:59 -0700
commit33b85620f910c404873d362d27cca1223084913a (patch)
treeda1003de30e34743e99fe0ffc38a8d7d8ba009d6 /streaming
parentbdc5c16e76c5d0bc147408353b2ba4faa8e914fc (diff)
downloadspark-33b85620f910c404873d362d27cca1223084913a.tar.gz
spark-33b85620f910c404873d362d27cca1223084913a.tar.bz2
spark-33b85620f910c404873d362d27cca1223084913a.zip
[SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils
As per rxin 's suggestion in https://github.com/apache/spark/pull/5392/files#r28757176 What's more, there is a race condition in the global shared `daemonThreadFactoryBuilder`. `daemonThreadFactoryBuilder` may be modified by multiple threads. This PR removed the global `daemonThreadFactoryBuilder` and created a new `ThreadFactoryBuilder` every time. Author: zsxwing <zsxwing@gmail.com> Closes #5631 from zsxwing/thread-utils and squashes the following commits: 9fe5b0e [zsxwing] Add ThreadUtils and move thread methods from Utils to ThreadUtils
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala4
2 files changed, 4 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index dcdc27d29c..297bf04c0c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage._
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
@@ -150,7 +150,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
- Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
+ ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
/**
* This implementation stores the block into the block manager as well as a write ahead log.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 6bdfe45dc7..38a93cc3c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -25,7 +25,7 @@ import scala.language.postfixOps
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
import WriteAheadLogManager._
/**
@@ -60,7 +60,7 @@ private[streaming] class WriteAheadLogManager(
if (callerName.nonEmpty) s" for $callerName" else ""
private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
implicit private val executionContext = ExecutionContext.fromExecutorService(
- Utils.newDaemonFixedThreadPool(1, threadpoolName))
+ ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
override protected val logName = s"WriteAheadLogManager $callerNameTag"
private var currentLogPath: Option[String] = None