aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver
diff options
context:
space:
mode:
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala14
1 files changed, 13 insertions, 1 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index c5b73234fa..6e07df18b0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener}
/**
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
@@ -44,9 +45,9 @@ object HiveThriftServer2 extends Logging {
val server = new HiveThriftServer2(sqlContext)
server.init(sqlContext.hiveconf)
server.start()
+ sqlContext.sparkContext.addSparkListener(new HiveThriftServer2Listener(server))
}
-
def main(args: Array[String]) {
val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
if (!optionsProcessor.process(args)) {
@@ -69,12 +70,23 @@ object HiveThriftServer2 extends Logging {
server.init(SparkSQLEnv.hiveContext.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
+ SparkSQLEnv.sparkContext.addSparkListener(new HiveThriftServer2Listener(server))
} catch {
case e: Exception =>
logError("Error starting HiveThriftServer2", e)
System.exit(-1)
}
}
+
+ /**
+ * A inner sparkListener called in sc.stop to clean up the HiveThriftServer2
+ */
+ class HiveThriftServer2Listener(val server: HiveServer2) extends SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ server.stop()
+ }
+ }
+
}
private[hive] class HiveThriftServer2(hiveContext: HiveContext)