Diffstat (limited to 'external')
 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala    | 5 +++--
 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala | 4 ++--
 2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 4d26b640e8..cca0fac023 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -111,7 +111,8 @@ class KafkaReceiver[
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)
 
-    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
+    val executorPool =
+      ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
     try {
       // Start the messages handler for each partition
       topicMessageStreams.values.foreach { streams =>
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index c4a44c1822..ea87e96037 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -33,7 +33,7 @@ import org.I0Itec.zkclient.ZkClient
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
 
 /**
  * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
@@ -121,7 +121,7 @@ class ReliableKafkaReceiver[
     zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
       consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
 
-    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
+    messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
      topics.values.sum, "KafkaMessageHandler")
 
     blockGenerator.start()
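
For context on the helper both receivers now call: ThreadUtils.newDaemonFixedThreadPool(nThreads, prefix) returns a fixed-size pool of named daemon threads, so Kafka message handlers never keep the JVM alive at shutdown. Below is a minimal, self-contained sketch of an equivalent helper, assuming only the JDK concurrency API; the actual ThreadUtils implementation in Spark may differ in details (e.g. it could build the thread factory with Guava's ThreadFactoryBuilder). ThreadPoolSketch and its main method are illustrative names, not part of the patch.

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object ThreadPoolSketch {
  // Hypothetical stand-in for ThreadUtils.newDaemonFixedThreadPool: a
  // fixed-size pool whose threads are daemons (they never block JVM
  // shutdown) and carry a numbered name prefix for thread dumps.
  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true)
        t
      }
    }
    Executors.newFixedThreadPool(nThreads, factory)
  }

  def main(args: Array[String]): Unit = {
    // The receivers above size the pool at topics.values.sum, i.e. one
    // thread per requested Kafka stream, and submit one handler task per
    // stream; two streams are simulated here.
    val pool = newDaemonFixedThreadPool(2, "KafkaMessageHandler")
    (1 to 2).foreach { i =>
      pool.execute(new Runnable {
        override def run(): Unit =
          println(s"${Thread.currentThread().getName} handling stream $i")
      })
    }
    pool.shutdown()
  }
}

The behavior of the patched code is unchanged: only the location of the helper moves from Utils to ThreadUtils, keeping thread-pool construction utilities in one place.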