aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-08-14 14:41:53 +0800
committerCheng Lian <lian@databricks.com>2015-08-14 14:41:53 +0800
commitc8677d73666850b37ff937520e538650632ce304 (patch)
tree071e5d36536b29ecf53dd8171e1f6ee30ab9c525 /sql/hive-thriftserver
parent7c7c7529a16c0e79778e522a3df82a0f1c3762a3 (diff)
downloadspark-c8677d73666850b37ff937520e538650632ce304.tar.gz
spark-c8677d73666850b37ff937520e538650632ce304.tar.bz2
spark-c8677d73666850b37ff937520e538650632ce304.zip
[SPARK-9958] [SQL] Make HiveThriftServer2Listener thread-safe and update the tab name to "JDBC/ODBC Server"
This PR fixed the thread-safety issue of HiveThriftServer2Listener, and also changed the tab name to "JDBC/ODBC Server" since it conflicts with the new SQL tab. <img width="1377" alt="thriftserver" src="https://cloud.githubusercontent.com/assets/1000778/9265707/c46f3f2c-4269-11e5-8d7e-888c9113ab4f.png"> Author: zsxwing <zsxwing@gmail.com> Closes #8185 from zsxwing/SPARK-9958.
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala64
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala32
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala38
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala4
4 files changed, 78 insertions, 60 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 2c9fa595b2..dd9fef9206 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -152,16 +152,26 @@ object HiveThriftServer2 extends Logging {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
server.stop()
}
- var onlineSessionNum: Int = 0
- val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
- val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
- val retainedStatements =
- conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
- val retainedSessions =
- conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
- var totalRunning = 0
-
- override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ private var onlineSessionNum: Int = 0
+ private val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
+ private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
+ private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
+ private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
+ private var totalRunning = 0
+
+ def getOnlineSessionNum: Int = synchronized { onlineSessionNum }
+
+ def getTotalRunning: Int = synchronized { totalRunning }
+
+ def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq }
+
+ def getSession(sessionId: String): Option[SessionInfo] = synchronized {
+ sessionList.get(sessionId)
+ }
+
+ def getExecutionList: Seq[ExecutionInfo] = synchronized { executionList.values.toSeq }
+
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
for {
props <- Option(jobStart.properties)
groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
@@ -173,13 +183,15 @@ object HiveThriftServer2 extends Logging {
}
def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
- val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
- sessionList.put(sessionId, info)
- onlineSessionNum += 1
- trimSessionIfNecessary()
+ synchronized {
+ val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
+ sessionList.put(sessionId, info)
+ onlineSessionNum += 1
+ trimSessionIfNecessary()
+ }
}
- def onSessionClosed(sessionId: String): Unit = {
+ def onSessionClosed(sessionId: String): Unit = synchronized {
sessionList(sessionId).finishTimestamp = System.currentTimeMillis
onlineSessionNum -= 1
trimSessionIfNecessary()
@@ -190,7 +202,7 @@ object HiveThriftServer2 extends Logging {
sessionId: String,
statement: String,
groupId: String,
- userName: String = "UNKNOWN"): Unit = {
+ userName: String = "UNKNOWN"): Unit = synchronized {
val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
info.state = ExecutionState.STARTED
executionList.put(id, info)
@@ -200,27 +212,29 @@ object HiveThriftServer2 extends Logging {
totalRunning += 1
}
- def onStatementParsed(id: String, executionPlan: String): Unit = {
+ def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
executionList(id).executePlan = executionPlan
executionList(id).state = ExecutionState.COMPILED
}
def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
- executionList(id).finishTimestamp = System.currentTimeMillis
- executionList(id).detail = errorMessage
- executionList(id).state = ExecutionState.FAILED
- totalRunning -= 1
- trimExecutionIfNecessary()
+ synchronized {
+ executionList(id).finishTimestamp = System.currentTimeMillis
+ executionList(id).detail = errorMessage
+ executionList(id).state = ExecutionState.FAILED
+ totalRunning -= 1
+ trimExecutionIfNecessary()
+ }
}
- def onStatementFinish(id: String): Unit = {
+ def onStatementFinish(id: String): Unit = synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).state = ExecutionState.FINISHED
totalRunning -= 1
trimExecutionIfNecessary()
}
- private def trimExecutionIfNecessary() = synchronized {
+ private def trimExecutionIfNecessary() = {
if (executionList.size > retainedStatements) {
val toRemove = math.max(retainedStatements / 10, 1)
executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
@@ -229,7 +243,7 @@ object HiveThriftServer2 extends Logging {
}
}
- private def trimSessionIfNecessary() = synchronized {
+ private def trimSessionIfNecessary() = {
if (sessionList.size > retainedSessions) {
val toRemove = math.max(retainedSessions / 10, 1)
sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index 10c83d8b27..e990bd0601 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -39,14 +39,16 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
val content =
- generateBasicStats() ++
- <br/> ++
- <h4>
- {listener.onlineSessionNum} session(s) are online,
- running {listener.totalRunning} SQL statement(s)
- </h4> ++
- generateSessionStatsTable() ++
- generateSQLStatsTable()
+ listener.synchronized { // make sure all parts in this page are consistent
+ generateBasicStats() ++
+ <br/> ++
+ <h4>
+ {listener.getOnlineSessionNum} session(s) are online,
+ running {listener.getTotalRunning} SQL statement(s)
+ </h4> ++
+ generateSessionStatsTable() ++
+ generateSQLStatsTable()
+ }
UIUtils.headerSparkPage("JDBC/ODBC Server", content, parent, Some(5000))
}
@@ -65,11 +67,11 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
/** Generate stats of batch statements of the thrift server program */
private def generateSQLStatsTable(): Seq[Node] = {
- val numStatement = listener.executionList.size
+ val numStatement = listener.getExecutionList.size
val table = if (numStatement > 0) {
val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
"Statement", "State", "Detail")
- val dataRows = listener.executionList.values
+ val dataRows = listener.getExecutionList
def generateDataRow(info: ExecutionInfo): Seq[Node] = {
val jobLink = info.jobId.map { id: String =>
@@ -136,15 +138,15 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
/** Generate stats of batch sessions of the thrift server program */
private def generateSessionStatsTable(): Seq[Node] = {
- val numBatches = listener.sessionList.size
+ val sessionList = listener.getSessionList
+ val numBatches = sessionList.size
val table = if (numBatches > 0) {
- val dataRows =
- listener.sessionList.values
+ val dataRows = sessionList
val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
"Total Execute")
def generateDataRow(session: SessionInfo): Seq[Node] = {
- val sessionLink = "%s/sql/session?id=%s"
- .format(UIUtils.prependBaseUri(parent.basePath), session.sessionId)
+ val sessionLink = "%s/%s/session?id=%s"
+ .format(UIUtils.prependBaseUri(parent.basePath), parent.prefix, session.sessionId)
<tr>
<td> {session.userName} </td>
<td> {session.ip} </td>
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
index 3b01afa603..af16cb31df 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
@@ -40,21 +40,22 @@ private[ui] class ThriftServerSessionPage(parent: ThriftServerTab)
def render(request: HttpServletRequest): Seq[Node] = {
val parameterId = request.getParameter("id")
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
- val sessionStat = listener.sessionList.find(stat => {
- stat._1 == parameterId
- }).getOrElse(null)
- require(sessionStat != null, "Invalid sessionID[" + parameterId + "]")
val content =
- generateBasicStats() ++
- <br/> ++
- <h4>
- User {sessionStat._2.userName},
- IP {sessionStat._2.ip},
- Session created at {formatDate(sessionStat._2.startTimestamp)},
- Total run {sessionStat._2.totalExecution} SQL
- </h4> ++
- generateSQLStatsTable(sessionStat._2.sessionId)
+ listener.synchronized { // make sure all parts in this page are consistent
+ val sessionStat = listener.getSession(parameterId).getOrElse(null)
+ require(sessionStat != null, "Invalid sessionID[" + parameterId + "]")
+
+ generateBasicStats() ++
+ <br/> ++
+ <h4>
+ User {sessionStat.userName},
+ IP {sessionStat.ip},
+ Session created at {formatDate(sessionStat.startTimestamp)},
+ Total run {sessionStat.totalExecution} SQL
+ </h4> ++
+ generateSQLStatsTable(sessionStat.sessionId)
+ }
UIUtils.headerSparkPage("JDBC/ODBC Session", content, parent, Some(5000))
}
@@ -73,13 +74,13 @@ private[ui] class ThriftServerSessionPage(parent: ThriftServerTab)
/** Generate stats of batch statements of the thrift server program */
private def generateSQLStatsTable(sessionID: String): Seq[Node] = {
- val executionList = listener.executionList
- .filter(_._2.sessionId == sessionID)
+ val executionList = listener.getExecutionList
+ .filter(_.sessionId == sessionID)
val numStatement = executionList.size
val table = if (numStatement > 0) {
val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
"Statement", "State", "Detail")
- val dataRows = executionList.values.toSeq.sortBy(_.startTimestamp).reverse
+ val dataRows = executionList.sortBy(_.startTimestamp).reverse
def generateDataRow(info: ExecutionInfo): Seq[Node] = {
val jobLink = info.jobId.map { id: String =>
@@ -146,10 +147,11 @@ private[ui] class ThriftServerSessionPage(parent: ThriftServerTab)
/** Generate stats of batch sessions of the thrift server program */
private def generateSessionStatsTable(): Seq[Node] = {
- val numBatches = listener.sessionList.size
+ val sessionList = listener.getSessionList
+ val numBatches = sessionList.size
val table = if (numBatches > 0) {
val dataRows =
- listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map ( session =>
+ sessionList.sortBy(_.startTimestamp).reverse.map ( session =>
Seq(
session.userName,
session.ip,
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
index 94fd8a6bb6..4eabeaa673 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
@@ -27,9 +27,9 @@ import org.apache.spark.{SparkContext, Logging, SparkException}
* This assumes the given SparkContext has enabled its SparkUI.
*/
private[thriftserver] class ThriftServerTab(sparkContext: SparkContext)
- extends SparkUITab(getSparkUI(sparkContext), "sql") with Logging {
+ extends SparkUITab(getSparkUI(sparkContext), "sqlserver") with Logging {
- override val name = "SQL"
+ override val name = "JDBC/ODBC Server"
val parent = getSparkUI(sparkContext)
val listener = HiveThriftServer2.listener