diff options
author | zsxwing <zsxwing@gmail.com> | 2015-04-22 11:08:59 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-22 11:08:59 -0700 |
commit | 33b85620f910c404873d362d27cca1223084913a (patch) | |
tree | da1003de30e34743e99fe0ffc38a8d7d8ba009d6 /external | |
parent | bdc5c16e76c5d0bc147408353b2ba4faa8e914fc (diff) | |
download | spark-33b85620f910c404873d362d27cca1223084913a.tar.gz spark-33b85620f910c404873d362d27cca1223084913a.tar.bz2 spark-33b85620f910c404873d362d27cca1223084913a.zip |
[SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils
As per rxin 's suggestion in https://github.com/apache/spark/pull/5392/files#r28757176
What's more, there is a race condition in the global shared `daemonThreadFactoryBuilder`. `daemonThreadFactoryBuilder` may be modified by multiple threads. This PR removed the global `daemonThreadFactoryBuilder` and created a new `ThreadFactoryBuilder` every time.
Author: zsxwing <zsxwing@gmail.com>
Closes #5631 from zsxwing/thread-utils and squashes the following commits:
9fe5b0e [zsxwing] Add ThreadUtils and move thread methods from Utils to ThreadUtils
Diffstat (limited to 'external')
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 5 | ||||
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala | 4 |
2 files changed, 5 insertions, 4 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 4d26b640e8..cca0fac023 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.util.Utils +import org.apache.spark.util.ThreadUtils /** * Input stream that pulls messages from a Kafka Broker. @@ -111,7 +111,8 @@ class KafkaReceiver[ val topicMessageStreams = consumerConnector.createMessageStreams( topics, keyDecoder, valueDecoder) - val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") + val executorPool = + ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") try { // Start the messages handler for each partition topicMessageStreams.values.foreach { streams => diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index c4a44c1822..ea87e96037 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -33,7 +33,7 @@ import org.I0Itec.zkclient.ZkClient import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} -import org.apache.spark.util.Utils +import org.apache.spark.util.ThreadUtils /** * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss. @@ -121,7 +121,7 @@ class ReliableKafkaReceiver[ zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) - messageHandlerThreadPool = Utils.newDaemonFixedThreadPool( + messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool( topics.values.sum, "KafkaMessageHandler") blockGenerator.start() |