aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorHari Shreedharan <hshreedharan@apache.org>2015-10-08 18:50:27 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-10-08 18:50:27 -0700
commitfa3e4d8f52995bf632e7eda60dbb776c9f637546 (patch)
tree64f0137f06e65af60606d33f1a5096f58029e237 /external
parent8e67882b905683a1f151679214ef0b575e77c7e1 (diff)
downloadspark-fa3e4d8f52995bf632e7eda60dbb776c9f637546.tar.gz
spark-fa3e4d8f52995bf632e7eda60dbb776c9f637546.tar.bz2
spark-fa3e4d8f52995bf632e7eda60dbb776c9f637546.zip
[SPARK-11019] [STREAMING] [FLUME] Gracefully shutdown Flume receiver th…
…reads. Wait for a minute for the receiver threads to shutdown before interrupting them. Author: Hari Shreedharan <hshreedharan@apache.org> Closes #9041 from harishreedharan/flume-graceful-shutdown.
Diffstat (limited to 'external')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala8
1 files changed, 6 insertions, 2 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 3b936d88ab..6737750c3d 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -18,7 +18,7 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
-import java.util.concurrent.{LinkedBlockingQueue, Executors}
+import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -93,7 +93,11 @@ private[streaming] class FlumePollingReceiver(
override def onStop(): Unit = {
logInfo("Shutting down Flume Polling Receiver")
- receiverExecutor.shutdownNow()
+ receiverExecutor.shutdown()
+ // Wait upto a minute for the threads to die
+ if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ receiverExecutor.shutdownNow()
+ }
connections.asScala.foreach(_.transceiver.close())
channelFactory.releaseExternalResources()
}