aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-04-22 11:08:59 -0700
committerReynold Xin <rxin@databricks.com>2015-04-22 11:08:59 -0700
commit33b85620f910c404873d362d27cca1223084913a (patch)
treeda1003de30e34743e99fe0ffc38a8d7d8ba009d6 /external
parentbdc5c16e76c5d0bc147408353b2ba4faa8e914fc (diff)
downloadspark-33b85620f910c404873d362d27cca1223084913a.tar.gz
spark-33b85620f910c404873d362d27cca1223084913a.tar.bz2
spark-33b85620f910c404873d362d27cca1223084913a.zip
[SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils
As per rxin 's suggestion in https://github.com/apache/spark/pull/5392/files#r28757176 What's more, there is a race condition in the global shared `daemonThreadFactoryBuilder`. `daemonThreadFactoryBuilder` may be modified by multiple threads. This PR removed the global `daemonThreadFactoryBuilder` and created a new `ThreadFactoryBuilder` every time. Author: zsxwing <zsxwing@gmail.com> Closes #5631 from zsxwing/thread-utils and squashes the following commits: 9fe5b0e [zsxwing] Add ThreadUtils and move thread methods from Utils to ThreadUtils
Diffstat (limited to 'external')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala5
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala4
2 files changed, 5 insertions, 4 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 4d26b640e8..cca0fac023 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
/**
* Input stream that pulls messages from a Kafka Broker.
@@ -111,7 +111,8 @@ class KafkaReceiver[
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)
- val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
+ val executorPool =
+ ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
try {
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index c4a44c1822..ea87e96037 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -33,7 +33,7 @@ import org.I0Itec.zkclient.ZkClient
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
/**
* ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
@@ -121,7 +121,7 @@ class ReliableKafkaReceiver[
zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
- messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
+ messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
topics.values.sum, "KafkaMessageHandler")
blockGenerator.start()