From 33b85620f910c404873d362d27cca1223084913a Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 22 Apr 2015 11:08:59 -0700 Subject: [SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils As per rxin 's suggestion in https://github.com/apache/spark/pull/5392/files#r28757176 What's more, there is a race condition in the global shared `daemonThreadFactoryBuilder`. `daemonThreadFactoryBuilder` may be modified by multiple threads. This PR removed the global `daemonThreadFactoryBuilder` and created a new `ThreadFactoryBuilder` every time. Author: zsxwing Closes #5631 from zsxwing/thread-utils and squashes the following commits: 9fe5b0e [zsxwing] Add ThreadUtils and move thread methods from Utils to ThreadUtils --- .../scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 5 +++-- .../org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'external') diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 4d26b640e8..cca0fac023 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.util.Utils +import org.apache.spark.util.ThreadUtils /** * Input stream that pulls messages from a Kafka Broker. @@ -111,7 +111,8 @@ class KafkaReceiver[ val topicMessageStreams = consumerConnector.createMessageStreams( topics, keyDecoder, valueDecoder) - val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") + val executorPool = + ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") try { // Start the messages handler for each partition topicMessageStreams.values.foreach { streams => diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index c4a44c1822..ea87e96037 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -33,7 +33,7 @@ import org.I0Itec.zkclient.ZkClient import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} -import org.apache.spark.util.Utils +import org.apache.spark.util.ThreadUtils /** * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss. @@ -121,7 +121,7 @@ class ReliableKafkaReceiver[ zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) - messageHandlerThreadPool = Utils.newDaemonFixedThreadPool( + messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool( topics.values.sum, "KafkaMessageHandler") blockGenerator.start() -- cgit v1.2.3