diff options
author | Hari Shreedharan <hshreedharan@apache.org> | 2015-10-08 18:50:27 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-10-08 18:50:27 -0700 |
commit | fa3e4d8f52995bf632e7eda60dbb776c9f637546 (patch) | |
tree | 64f0137f06e65af60606d33f1a5096f58029e237 /external/flume/src/main | |
parent | 8e67882b905683a1f151679214ef0b575e77c7e1 (diff) | |
download | spark-fa3e4d8f52995bf632e7eda60dbb776c9f637546.tar.gz spark-fa3e4d8f52995bf632e7eda60dbb776c9f637546.tar.bz2 spark-fa3e4d8f52995bf632e7eda60dbb776c9f637546.zip |
[SPARK-11019] [STREAMING] [FLUME] Gracefully shutdown Flume receiver th…
…reads.
Wait for a minute for the receiver threads to shutdown before interrupting them.
Author: Hari Shreedharan <hshreedharan@apache.org>
Closes #9041 from harishreedharan/flume-graceful-shutdown.
Diffstat (limited to 'external/flume/src/main')
-rw-r--r-- | external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index 3b936d88ab..6737750c3d 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.flume import java.net.InetSocketAddress -import java.util.concurrent.{LinkedBlockingQueue, Executors} +import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit} import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -93,7 +93,11 @@ private[streaming] class FlumePollingReceiver( override def onStop(): Unit = { logInfo("Shutting down Flume Polling Receiver") - receiverExecutor.shutdownNow() + receiverExecutor.shutdown() + // Wait upto a minute for the threads to die + if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + receiverExecutor.shutdownNow() + } connections.asScala.foreach(_.transceiver.close()) channelFactory.releaseExternalResources() } |