path: root/sql/hive-thriftserver
author     carlmartin <carlmartinmax@gmail.com>    2014-12-17 12:24:03 -0800
committer  Michael Armbrust <michael@databricks.com>    2014-12-17 12:24:03 -0800
commit     4782def094fc5b5030a944290d2301f887f77a02 (patch)
tree       bce13b18100ebf8ab6e8209aa9f4509a35e88a9a /sql/hive-thriftserver
parent     5fdcbdc0c9f91be9380b09643a5db0f96c673ce8 (diff)
[SPARK-4694] Fix HiveThriftServer2 can't stop in YARN HA mode.
HiveThriftServer2 cannot exit automatically when the standby resource manager takes over in YARN HA mode. The scheduler backend notices that the AM has exited and calls sc.stop to shut down the driver process, but a user thread (HiveThriftServer2) is still alive and keeps the driver running. The fix detects that the SparkContext has stopped and calls HiveThriftServer2.stop() to stop the user thread; as the squashed commits show, the final version does this with a SparkListener rather than a polling daemon thread.

Author: carlmartin <carlmartinmax@gmail.com>

Closes #3576 from SaintBacchus/ThriftServer2ExitBug and squashes the following commits:

2890b4a [carlmartin] Use SparkListener instead of the daemon thread to stop the hive server.
c15da0e [carlmartin] HiveThriftServer2 cannot exit automatically when changing the standby resource manager in YARN HA mode
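Not part of the commit: a minimal, self-contained Scala sketch of the pattern this fix relies on, registering a SparkListener whose onApplicationEnd hook shuts down a long-running service when the SparkContext stops. ListenerShutdownSketch and MyService are hypothetical stand-ins for HiveThriftServer2, not names from the patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

object ListenerShutdownSketch {
  // Hypothetical stand-in for HiveThriftServer2: a service with a stop() method.
  class MyService {
    def stop(): Unit = println("service stopped")
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val service = new MyService

    // onApplicationEnd fires from sc.stop(), so the service is shut down instead of
    // lingering as a user thread that keeps the driver JVM alive.
    sc.addSparkListener(new SparkListener {
      override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = service.stop()
    })

    sc.stop()  // triggers onApplicationEnd, which calls service.stop()
  }
}

Tying the shutdown to the application-end event avoids the polling daemon thread considered in the first commit.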
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala  14
1 file changed, 13 insertions, 1 deletion
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index c5b73234fa..6e07df18b0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener}
/**
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
@@ -44,9 +45,9 @@ object HiveThriftServer2 extends Logging {
val server = new HiveThriftServer2(sqlContext)
server.init(sqlContext.hiveconf)
server.start()
+ sqlContext.sparkContext.addSparkListener(new HiveThriftServer2Listener(server))
}
-
def main(args: Array[String]) {
val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
if (!optionsProcessor.process(args)) {
@@ -69,12 +70,23 @@ object HiveThriftServer2 extends Logging {
server.init(SparkSQLEnv.hiveContext.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
+ SparkSQLEnv.sparkContext.addSparkListener(new HiveThriftServer2Listener(server))
} catch {
case e: Exception =>
logError("Error starting HiveThriftServer2", e)
System.exit(-1)
}
}
+
+ /**
+ * An inner SparkListener called in sc.stop to clean up the HiveThriftServer2
+ */
+ class HiveThriftServer2Listener(val server: HiveServer2) extends SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ server.stop()
+ }
+ }
+
}
private[hive] class HiveThriftServer2(hiveContext: HiveContext)