path: root/sql
author    tianyi <tianyi.asiainfo@gmail.com>        2015-05-04 16:59:34 +0800
committer Cheng Lian <lian@databricks.com>          2015-05-04 16:59:34 +0800
commit    343d3bfafd449a0371feb6a88f78e07302fa7143 (patch)
tree      496592af394123f470b6c85a24b92c33cdc7d4dc /sql
parent    3539cb7d20f5f878132407ec3b854011b183b2ad (diff)
download  spark-343d3bfafd449a0371feb6a88f78e07302fa7143.tar.gz
          spark-343d3bfafd449a0371feb6a88f78e07302fa7143.tar.bz2
          spark-343d3bfafd449a0371feb6a88f78e07302fa7143.zip
[SPARK-5100] [SQL] add webui for thriftserver
This PR is a rebased version of #3946, mainly focused on creating an independent tab for the Thrift server in the Spark web UI.

Features:
1. Session-related statistics (username and IP are only supported in hive-0.13.1)
2. List all the SQL statements executing or executed on this server
3. Provide links to the jobs generated by each SQL statement
4. Provide a link showing all SQL statements executing or executed in a specified session

Prototype snapshot of the main page for the Thrift server:
![image](https://cloud.githubusercontent.com/assets/1411869/7361379/df7dcc64-ed89-11e4-9964-4df0b32f475e.png)

Author: tianyi <tianyi.asiainfo@gmail.com>

Closes #5730 from tianyi/SPARK-5100 and squashes the following commits:

cfd14c7 [tianyi] style fix
0efe3d5 [tianyi] revert part of pom change
c0f2fa0 [tianyi] extends HiveThriftJdbcTest to start/stop thriftserver for UI test
aa20408 [tianyi] fix style problem
c9df6f9 [tianyi] add testsuite for thriftserver ui and fix some style issue
9830199 [tianyi] add webui for thriftserver
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala |   2
-rw-r--r--  sql/hive-thriftserver/pom.xml |  12
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala | 161
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala | 190
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala | 197
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala |  50
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala |  12
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala | 105
-rw-r--r--  sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala |  18
-rw-r--r--  sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala |  26
10 files changed, 751 insertions, 22 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2fa602a608..99db959a87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -52,6 +52,8 @@ private[spark] object SQLConf {
// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
+ val THRIFTSERVER_UI_STATEMENT_LIMIT = "spark.sql.thriftserver.ui.retainedStatements"
+ val THRIFTSERVER_UI_SESSION_LIMIT = "spark.sql.thriftserver.ui.retainedSessions"
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
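Note: the two new keys are ordinary string-valued SQL confs; the listener below reads them with a default of "200". A minimal tuning sketch (assuming a started HiveContext named hiveContext; the values here are illustrative, not defaults):

    hiveContext.setConf("spark.sql.thriftserver.ui.retainedStatements", "500")
    hiveContext.setConf("spark.sql.thriftserver.ui.retainedSessions", "50")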
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index f38c796241..437f697d25 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -57,6 +57,18 @@
<groupId>${hive.group}</groupId>
<artifactId>hive-beeline</artifactId>
</dependency>
+ <!-- Added for selenium: -->
+ <dependency>
+ <groupId>org.seleniumhq.selenium</groupId>
+ <artifactId>selenium-java</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
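Note: this test-scoped Selenium dependency backs the new UISeleniumSuite further down. A self-contained sketch of the headless driver it brings in (the URL is hypothetical):

    import org.openqa.selenium.WebDriver
    import org.openqa.selenium.htmlunit.HtmlUnitDriver

    val driver: WebDriver = new HtmlUnitDriver  // headless, no browser install needed
    driver.get("http://localhost:4040/ThriftServer")
    println(driver.getTitle)
    driver.quit()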
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 832596fc8b..0be5a92c25 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -22,20 +22,27 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
+import org.apache.spark.sql.SQLConf
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, SparkConf, Logging}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener}
+import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListenerApplicationEnd, SparkListener}
+import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
import org.apache.spark.util.Utils
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
/**
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
* `HiveThriftServer2` thrift server.
*/
object HiveThriftServer2 extends Logging {
var LOG = LogFactory.getLog(classOf[HiveServer2])
+ var uiTab: Option[ThriftServerTab] = _
+ var listener: HiveThriftServer2Listener = _
/**
* :: DeveloperApi ::
@@ -46,7 +53,13 @@ object HiveThriftServer2 extends Logging {
val server = new HiveThriftServer2(sqlContext)
server.init(sqlContext.hiveconf)
server.start()
- sqlContext.sparkContext.addSparkListener(new HiveThriftServer2Listener(server))
+ listener = new HiveThriftServer2Listener(server, sqlContext.conf)
+ sqlContext.sparkContext.addSparkListener(listener)
+ uiTab = if (sqlContext.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
+ Some(new ThriftServerTab(sqlContext.sparkContext))
+ } else {
+ None
+ }
}
def main(args: Array[String]) {
@@ -58,14 +71,23 @@ object HiveThriftServer2 extends Logging {
logInfo("Starting SparkContext")
SparkSQLEnv.init()
- Utils.addShutdownHook { () => SparkSQLEnv.stop() }
+ Utils.addShutdownHook { () =>
+ SparkSQLEnv.stop()
+ uiTab.foreach(_.detach())
+ }
try {
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
server.init(SparkSQLEnv.hiveContext.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
- SparkSQLEnv.sparkContext.addSparkListener(new HiveThriftServer2Listener(server))
+ listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf)
+ SparkSQLEnv.sparkContext.addSparkListener(listener)
+ uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
+ Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
+ } else {
+ None
+ }
} catch {
case e: Exception =>
logError("Error starting HiveThriftServer2", e)
@@ -73,15 +95,140 @@ object HiveThriftServer2 extends Logging {
}
}
+ private[thriftserver] class SessionInfo(
+ val sessionId: String,
+ val startTimestamp: Long,
+ val ip: String,
+ val userName: String) {
+ var finishTimestamp: Long = 0L
+ var totalExecution: Int = 0
+ def totalTime: Long = {
+ if (finishTimestamp == 0L) {
+ System.currentTimeMillis - startTimestamp
+ } else {
+ finishTimestamp - startTimestamp
+ }
+ }
+ }
+
+ private[thriftserver] object ExecutionState extends Enumeration {
+ val STARTED, COMPILED, FAILED, FINISHED = Value
+ type ExecutionState = Value
+ }
+
+ private[thriftserver] class ExecutionInfo(
+ val statement: String,
+ val sessionId: String,
+ val startTimestamp: Long,
+ val userName: String) {
+ var finishTimestamp: Long = 0L
+ var executePlan: String = ""
+ var detail: String = ""
+ var state: ExecutionState.Value = ExecutionState.STARTED
+ val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
+ var groupId: String = ""
+ def totalTime: Long = {
+ if (finishTimestamp == 0L) {
+ System.currentTimeMillis - startTimestamp
+ } else {
+ finishTimestamp - startTimestamp
+ }
+ }
+ }
+
+
/**
* An inner SparkListener called in sc.stop to clean up the HiveThriftServer2
*/
- class HiveThriftServer2Listener(val server: HiveServer2) extends SparkListener {
+ private[thriftserver] class HiveThriftServer2Listener(
+ val server: HiveServer2,
+ val conf: SQLConf) extends SparkListener {
+
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
server.stop()
}
- }
+ val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
+ val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
+ val retainedStatements =
+ conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT, "200").toInt
+ val retainedSessions =
+ conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT, "200").toInt
+ var totalRunning = 0
+
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ for {
+ props <- Option(jobStart.properties)
+ groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+ (_, info) <- executionList if info.groupId == groupId
+ } {
+ info.jobId += jobStart.jobId.toString
+ info.groupId = groupId
+ }
+ }
+
+ def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
+ val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
+ sessionList.put(sessionId, info)
+ trimSessionIfNecessary()
+ }
+
+ def onSessionClosed(sessionId: String): Unit = {
+ sessionList(sessionId).finishTimestamp = System.currentTimeMillis
+ }
+
+ def onStatementStart(
+ id: String,
+ sessionId: String,
+ statement: String,
+ groupId: String,
+ userName: String = "UNKNOWN"): Unit = {
+ val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
+ info.state = ExecutionState.STARTED
+ executionList.put(id, info)
+ trimExecutionIfNecessary()
+ sessionList(sessionId).totalExecution += 1
+ executionList(id).groupId = groupId
+ totalRunning += 1
+ }
+
+ def onStatementParsed(id: String, executionPlan: String): Unit = {
+ executionList(id).executePlan = executionPlan
+ executionList(id).state = ExecutionState.COMPILED
+ }
+
+ def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
+ executionList(id).finishTimestamp = System.currentTimeMillis
+ executionList(id).detail = errorMessage
+ executionList(id).state = ExecutionState.FAILED
+ totalRunning -= 1
+ }
+
+ def onStatementFinish(id: String): Unit = {
+ executionList(id).finishTimestamp = System.currentTimeMillis
+ executionList(id).state = ExecutionState.FINISHED
+ totalRunning -= 1
+ }
+
+ private def trimExecutionIfNecessary() = synchronized {
+ if (executionList.size > retainedStatements) {
+ val toRemove = math.max(retainedStatements / 10, 1)
+ executionList.take(toRemove).foreach { s =>
+ executionList.remove(s._1)
+ }
+ }
+ }
+
+ private def trimSessionIfNecessary() = synchronized {
+ if (sessionList.size > retainedSessions) {
+ val toRemove = math.max(retainedSessions / 10, 1)
+ sessionList.take(toRemove).foreach { s =>
+ sessionList.remove(s._1)
+ }
+ }
+ }
+ }
}
private[hive] class HiveThriftServer2(hiveContext: HiveContext)
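Note: taken together, HiveThriftServer2Listener is a small state machine over sessions and statements. A hedged sketch of the call sequence the shims drive (all ids invented for illustration):

    val listener = HiveThriftServer2.listener
    listener.onSessionCreated(ip = "10.0.0.1", sessionId = "session-1", userName = "alice")
    listener.onStatementStart(
      id = "stmt-1", sessionId = "session-1",
      statement = "SELECT 1", groupId = "stmt-1", userName = "alice")
    listener.onStatementParsed("stmt-1", "== Physical Plan == ...")
    listener.onStatementFinish("stmt-1")   // or onStatementError(id, message, trace)
    listener.onSessionClosed("session-1")

The trim helpers then evict the oldest tenth of the entries once the retained-statements/retained-sessions limits are exceeded, so memory use stays bounded.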
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
new file mode 100644
index 0000000000..71b16b6beb
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import java.util.Calendar
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.Logging
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.{SessionInfo, ExecutionState, ExecutionInfo}
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui._
+
+
+/** Page for the Spark web UI that shows statistics of the JDBC/ODBC Thrift server */
+private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") with Logging {
+
+ private val listener = parent.listener
+ private val startTime = Calendar.getInstance().getTime()
+ private val emptyCell = "-"
+
+ /** Render the page */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val content =
+ generateBasicStats() ++
+ <br/> ++
+ <h4>
+ {listener.sessionList.size} session(s) are online,
+ running {listener.totalRunning} SQL statement(s)
+ </h4> ++
+ generateSessionStatsTable() ++
+ generateSQLStatsTable()
+ UIUtils.headerSparkPage("ThriftServer", content, parent, Some(5000))
+ }
+
+ /** Generate basic stats of the thrift server program */
+ private def generateBasicStats(): Seq[Node] = {
+ val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+ <ul class ="unstyled">
+ <li>
+ <strong>Started at: </strong> {startTime.toString}
+ </li>
+ <li>
+ <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
+ </li>
+ </ul>
+ }
+
+ /** Generate stats of the SQL statements run on the thrift server */
+ private def generateSQLStatsTable(): Seq[Node] = {
+ val numStatement = listener.executionList.size
+ val table = if (numStatement > 0) {
+ val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
+ "Statement", "State", "Detail")
+ val dataRows = listener.executionList.values
+
+ def generateDataRow(info: ExecutionInfo): Seq[Node] = {
+ val jobLink = info.jobId.map { id: String =>
+ <a href={"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), id)}>
+ [{id}]
+ </a>
+ }
+ val detail = if(info.state == ExecutionState.FAILED) info.detail else info.executePlan
+ <tr>
+ <td>{info.userName}</td>
+ <td>
+ {jobLink}
+ </td>
+ <td>{info.groupId}</td>
+ <td>{formatDate(info.startTimestamp)}</td>
+ <td>{if(info.finishTimestamp > 0) formatDate(info.finishTimestamp)}</td>
+ <td>{formatDurationOption(Some(info.totalTime))}</td>
+ <td>{info.statement}</td>
+ <td>{info.state}</td>
+ {errorMessageCell(detail)}
+ </tr>
+ }
+
+ Some(UIUtils.listingTable(headerRow, generateDataRow,
+ dataRows, false, None, Seq(null), false))
+ } else {
+ None
+ }
+
+ val content =
+ <h5 id="sqlstat">SQL Statistics</h5> ++
+ <div>
+ <ul class="unstyled">
+ {table.getOrElse("No statistics have been generated yet.")}
+ </ul>
+ </div>
+
+ content
+ }
+
+ private def errorMessageCell(errorMessage: String): Seq[Node] = {
+ val isMultiline = errorMessage.indexOf('\n') >= 0
+ val errorSummary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) {
+ errorMessage.substring(0, errorMessage.indexOf('\n'))
+ } else {
+ errorMessage
+ })
+ val details = if (isMultiline) {
+ // scalastyle:off
+ <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+ class="expand-details">
+ + details
+ </span> ++
+ <div class="stacktrace-details collapsed">
+ <pre>{errorMessage}</pre>
+ </div>
+ // scalastyle:on
+ } else {
+ ""
+ }
+ <td>{errorSummary}{details}</td>
+ }
+
+ /** Generate stats of the sessions on the thrift server */
+ private def generateSessionStatsTable(): Seq[Node] = {
+ val numBatches = listener.sessionList.size
+ val table = if (numBatches > 0) {
+ val dataRows =
+ listener.sessionList.values
+ val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
+ "Total Execute")
+ def generateDataRow(session: SessionInfo): Seq[Node] = {
+ val sessionLink = "%s/ThriftServer/session?id=%s"
+ .format(UIUtils.prependBaseUri(parent.basePath), session.sessionId)
+ <tr>
+ <td> {session.userName} </td>
+ <td> {session.ip} </td>
+ <td> <a href={sessionLink}> {session.sessionId} </a> </td>
+ <td> {formatDate(session.startTimestamp)} </td>
+ <td> {if(session.finishTimestamp > 0) formatDate(session.finishTimestamp)} </td>
+ <td> {formatDurationOption(Some(session.totalTime))} </td>
+ <td> {session.totalExecution.toString} </td>
+ </tr>
+ }
+ Some(UIUtils.listingTable(headerRow, generateDataRow, dataRows, true, None, Seq(null), false))
+ } else {
+ None
+ }
+
+ val content =
+ <h5 id="sessionstat">Session Statistics</h5> ++
+ <div>
+ <ul class="unstyled">
+ {table.getOrElse("No statistics have been generated yet.")}
+ </ul>
+ </div>
+
+ content
+ }
+
+
+ /**
+ * Returns a human-readable string representing a duration such as "5 seconds 35 ms"
+ */
+ private def formatDurationOption(msOption: Option[Long]): String = {
+ msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+ }
+
+ /** Generate HTML table from string data */
+ private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
+ def generateDataRow(data: Seq[String]): Seq[Node] = {
+ <tr> {data.map(d => <td>{d}</td>)} </tr>
+ }
+ UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+ }
+}
+
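Note on errorMessageCell above: the first line of a multiline message becomes the visible summary, and the remainder hides behind the "+ details" toggle. An illustrative input (the message is invented):

    val errorMessage =
      "java.lang.RuntimeException: boom\n  at Example.run(Example.scala:7)"
    // Summary shown in the cell:
    val summary = errorMessage.substring(0, errorMessage.indexOf('\n'))
    // summary == "java.lang.RuntimeException: boom"; the full trace sits in the
    // collapsed <div class="stacktrace-details"> rendered by the markup above.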
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
new file mode 100644
index 0000000000..33ba038ecc
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import java.util.Calendar
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.Logging
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.{ExecutionInfo, ExecutionState}
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui._
+
+/** Page for the Spark web UI that shows statistics of a single Thrift server session */
+private[ui] class ThriftServerSessionPage(parent: ThriftServerTab)
+ extends WebUIPage("session") with Logging {
+
+ private val listener = parent.listener
+ private val startTime = Calendar.getInstance().getTime()
+ private val emptyCell = "-"
+
+ /** Render the page */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val parameterId = request.getParameter("id")
+ require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+ val sessionStat = listener.sessionList.find(stat => {
+ stat._1 == parameterId
+ }).getOrElse(null)
+ require(sessionStat != null, "Invalid sessionID[" + parameterId + "]")
+
+ val content =
+ generateBasicStats() ++
+ <br/> ++
+ <h4>
+ User {sessionStat._2.userName},
+ IP {sessionStat._2.ip},
+ Session created at {formatDate(sessionStat._2.startTimestamp)},
+ Total run {sessionStat._2.totalExecution} SQL
+ </h4> ++
+ generateSQLStatsTable(sessionStat._2.sessionId)
+ UIUtils.headerSparkPage("ThriftServer", content, parent, Some(5000))
+ }
+
+ /** Generate basic stats of the thrift server program */
+ private def generateBasicStats(): Seq[Node] = {
+ val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+ <ul class ="unstyled">
+ <li>
+ <strong>Started at: </strong> {startTime.toString}
+ </li>
+ <li>
+ <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
+ </li>
+ </ul>
+ }
+
+ /** Generate stats of the SQL statements run on the thrift server */
+ private def generateSQLStatsTable(sessionID: String): Seq[Node] = {
+ val executionList = listener.executionList
+ .filter(_._2.sessionId == sessionID)
+ val numStatement = executionList.size
+ val table = if (numStatement > 0) {
+ val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
+ "Statement", "State", "Detail")
+ val dataRows = executionList.values.toSeq.sortBy(_.startTimestamp).reverse
+
+ def generateDataRow(info: ExecutionInfo): Seq[Node] = {
+ val jobLink = info.jobId.map { id: String =>
+ <a href={"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), id)}>
+ [{id}]
+ </a>
+ }
+ val detail = if(info.state == ExecutionState.FAILED) info.detail else info.executePlan
+ <tr>
+ <td>{info.userName}</td>
+ <td>
+ {jobLink}
+ </td>
+ <td>{info.groupId}</td>
+ <td>{formatDate(info.startTimestamp)}</td>
+ <td>{if(info.finishTimestamp > 0) formatDate(info.finishTimestamp)}</td>
+ <td>{formatDurationOption(Some(info.totalTime))}</td>
+ <td>{info.statement}</td>
+ <td>{info.state}</td>
+ {errorMessageCell(detail)}
+ </tr>
+ }
+
+ Some(UIUtils.listingTable(headerRow, generateDataRow,
+ dataRows, false, None, Seq(null), false))
+ } else {
+ None
+ }
+
+ val content =
+ <h5>SQL Statistics</h5> ++
+ <div>
+ <ul class="unstyled">
+ {table.getOrElse("No statistics have been generated yet.")}
+ </ul>
+ </div>
+
+ content
+ }
+
+ private def errorMessageCell(errorMessage: String): Seq[Node] = {
+ val isMultiline = errorMessage.indexOf('\n') >= 0
+ val errorSummary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) {
+ errorMessage.substring(0, errorMessage.indexOf('\n'))
+ } else {
+ errorMessage
+ })
+ val details = if (isMultiline) {
+ // scalastyle:off
+ <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+ class="expand-details">
+ + details
+ </span> ++
+ <div class="stacktrace-details collapsed">
+ <pre>{errorMessage}</pre>
+ </div>
+ // scalastyle:on
+ } else {
+ ""
+ }
+ <td>{errorSummary}{details}</td>
+ }
+
+ /** Generate stats of the sessions on the thrift server */
+ private def generateSessionStatsTable(): Seq[Node] = {
+ val numBatches = listener.sessionList.size
+ val table = if (numBatches > 0) {
+ val dataRows =
+ listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map ( session =>
+ Seq(
+ session.userName,
+ session.ip,
+ session.sessionId,
+ formatDate(session.startTimestamp),
+ formatDate(session.finishTimestamp),
+ formatDurationOption(Some(session.totalTime)),
+ session.totalExecution.toString
+ )
+ ).toSeq
+ val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
+ "Total Execute")
+ Some(listingTable(headerRow, dataRows))
+ } else {
+ None
+ }
+
+ val content =
+ <h5>Session Statistics</h5> ++
+ <div>
+ <ul class="unstyled">
+ {table.getOrElse("No statistics have been generated yet.")}
+ </ul>
+ </div>
+
+ content
+ }
+
+
+ /**
+ * Returns a human-readable string representing a duration such as "5 seconds 35 ms"
+ */
+ private def formatDurationOption(msOption: Option[Long]): String = {
+ msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+ }
+
+ /** Generate HTML table from string data */
+ private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
+ def generateDataRow(data: Seq[String]): Seq[Node] = {
+ <tr> {data.map(d => <td>{d}</td>)} </tr>
+ }
+ UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+ }
+}
+
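Note: the render() lookup above goes through find(...).getOrElse(null) followed by a null check. A null-free sketch that is assumed behaviorally equivalent (it yields the SessionInfo directly rather than a key/value pair, and still fails fast on an unknown id):

    val sessionStat = listener.sessionList.getOrElse(parameterId,
      throw new IllegalArgumentException(s"Invalid sessionID[$parameterId]"))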
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
new file mode 100644
index 0000000000..343031f10c
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import org.apache.spark.sql.hive.thriftserver.{HiveThriftServer2, SparkSQLEnv}
+import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab._
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+import org.apache.spark.{SparkContext, Logging, SparkException}
+
+/**
+ * Spark Web UI tab that shows statistics of the JDBC/ODBC Thrift server.
+ * This assumes the given SparkContext has enabled its SparkUI.
+ */
+private[thriftserver] class ThriftServerTab(sparkContext: SparkContext)
+ extends SparkUITab(getSparkUI(sparkContext), "ThriftServer") with Logging {
+
+ val parent = getSparkUI(sparkContext)
+ val listener = HiveThriftServer2.listener
+
+ attachPage(new ThriftServerPage(this))
+ attachPage(new ThriftServerSessionPage(this))
+ parent.attachTab(this)
+
+ def detach() {
+ getSparkUI(sparkContext).detachTab(this)
+ }
+}
+
+private[thriftserver] object ThriftServerTab {
+ def getSparkUI(sparkContext: SparkContext): SparkUI = {
+ sparkContext.ui.getOrElse {
+ throw new SparkException("Parent SparkUI to attach this tab to not found!")
+ }
+ }
+}
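Note: getSparkUI throws when the UI is disabled, which is why HiveThriftServer2 only constructs the tab under spark.ui.enabled. A recap of that wiring (sc is an assumed SparkContext):

    val uiTab =
      if (sc.getConf.getBoolean("spark.ui.enabled", true)) {
        Some(new ThriftServerTab(sc))  // attaches both pages and the tab itself
      } else {
        None
      }
    // Later, in the shutdown hook:
    uiTab.foreach(_.detach())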
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 4cf95e7bdf..1fadea97fd 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -409,24 +409,24 @@ abstract class HiveThriftServer2Test extends FunSuite with BeforeAndAfterAll wit
private val CLASS_NAME = HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")
private val LOG_FILE_MARK = s"starting $CLASS_NAME, logging to "
- private val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
- private val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
+ protected val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+ protected val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
private var listeningPort: Int = _
protected def serverPort: Int = listeningPort
protected def user = System.getProperty("user.name")
- private var warehousePath: File = _
- private var metastorePath: File = _
- private def metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
+ protected var warehousePath: File = _
+ protected var metastorePath: File = _
+ protected def metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
private val pidDir: File = Utils.createTempDir("thriftserver-pid")
private var logPath: File = _
private var logTailingProcess: Process = _
private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
- private def serverStartCommand(port: Int) = {
+ protected def serverStartCommand(port: Int) = {
val portConf = if (mode == ServerMode.binary) {
ConfVars.HIVE_SERVER2_THRIFT_PORT
} else {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
new file mode 100644
index 0000000000..47541015a3
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+
+
+import scala.util.Random
+
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.scalatest.{Matchers, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.selenium.WebBrowser
+import org.scalatest.time.SpanSugar._
+
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.spark.sql.hive.HiveContext
+
+
+class UISeleniumSuite
+ extends HiveThriftJdbcTest
+ with WebBrowser with Matchers with BeforeAndAfterAll {
+
+ implicit var webDriver: WebDriver = _
+ var server: HiveThriftServer2 = _
+ var hc: HiveContext = _
+ val uiPort = 20000 + Random.nextInt(10000)
+ override def mode: ServerMode.Value = ServerMode.binary
+
+ override def beforeAll(): Unit = {
+ webDriver = new HtmlUnitDriver
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ if (webDriver != null) {
+ webDriver.quit()
+ }
+ super.afterAll()
+ }
+
+ override protected def serverStartCommand(port: Int) = {
+ val portConf = if (mode == ServerMode.binary) {
+ ConfVars.HIVE_SERVER2_THRIFT_PORT
+ } else {
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT
+ }
+
+ s"""$startScript
+ | --master local
+ | --hiveconf hive.root.logger=INFO,console
+ | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
+ | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
+ | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
+ | --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
+ | --hiveconf $portConf=$port
+ | --driver-class-path ${sys.props("java.class.path")}
+ | --conf spark.ui.enabled=true
+ | --conf spark.ui.port=$uiPort
+ """.stripMargin.split("\\s+").toSeq
+ }
+
+ test("thrift server ui test") {
+ withJdbcStatement(statement => {
+ val baseURL = s"http://localhost:${uiPort}"
+
+ val queries = Seq(
+ "CREATE TABLE test_map(key INT, value STRING)",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+ queries.foreach(statement.execute)
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (baseURL)
+ find(cssSelector("""ul li a[href*="ThriftServer"]""")) should not be(None)
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (baseURL + "/ThriftServer")
+ find(id("sessionstat")) should not be(None)
+ find(id("sqlstat")) should not be(None)
+
+ // check whether the statements exist
+ queries.foreach { line =>
+ findAll(cssSelector("""ul table tbody tr td""")).map(_.text).toList should contain (line)
+ }
+ }
+ })
+ }
+}
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 95a6e86d05..b3a79ba1c7 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.sql.{Date, Timestamp}
import java.util.concurrent.Executors
-import java.util.{ArrayList => JArrayList, Map => JMap}
+import java.util.{ArrayList => JArrayList, Map => JMap, UUID}
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
@@ -190,9 +190,12 @@ private[hive] class SparkExecuteStatementOperation(
}
def run(): Unit = {
+ val statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING)
- hiveContext.sparkContext.setJobDescription(statement)
+ HiveThriftServer2.listener.onStatementStart(
+ statementId, parentSession.getSessionHandle.getSessionId.toString, statement, statementId)
+ hiveContext.sparkContext.setJobGroup(statementId, statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
@@ -205,6 +208,7 @@ private[hive] class SparkExecuteStatementOperation(
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
+ HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
iter = {
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
@@ -221,10 +225,13 @@ private[hive] class SparkExecuteStatementOperation(
// HiveServer will silently swallow them.
case e: Throwable =>
setState(OperationState.ERROR)
+ HiveThriftServer2.listener.onStatementError(
+ statementId, e.getMessage, e.getStackTraceString)
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
setState(OperationState.FINISHED)
+ HiveThriftServer2.listener.onStatementFinish(statementId)
}
}
@@ -255,11 +262,14 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
withImpersonation: Boolean,
delegationToken: String): SessionHandle = {
hiveContext.openSession()
-
- super.openSession(username, passwd, sessionConf, withImpersonation, delegationToken)
+ val sessionHandle = super.openSession(
+ username, passwd, sessionConf, withImpersonation, delegationToken)
+ HiveThriftServer2.listener.onSessionCreated("UNKNOWN", sessionHandle.getSessionId.toString)
+ sessionHandle
}
override def closeSession(sessionHandle: SessionHandle) {
+ HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
super.closeSession(sessionHandle)
sparkSqlOperationManager.sessionToActivePool -= sessionHandle
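Note: the statement-to-job correlation works through the job group. setJobGroup records the statement id as a local property on the driver; that property resurfaces in SparkListenerJobStart.properties, where onJobStart matches it against ExecutionInfo.groupId. A compressed, hypothetical illustration (ids invented):

    sc.setJobGroup("stmt-42", "SELECT ...")  // shim side, before executing the query
    // Listener side, when any job from that group starts:
    //   jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) == "stmt-42"
    //   => executionList("stmt-42").jobId += jobStart.jobId.toString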
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 178eb1af7c..b9d4f1c58c 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.sql.{Date, Timestamp}
import java.util.concurrent.Executors
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID}
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
@@ -36,7 +36,7 @@ import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.{SessionManager, HiveSession}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
import org.apache.spark.sql.execution.SetCommand
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
@@ -161,9 +161,16 @@ private[hive] class SparkExecuteStatementOperation(
}
def run(): Unit = {
+ val statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING)
- hiveContext.sparkContext.setJobDescription(statement)
+ HiveThriftServer2.listener.onStatementStart(
+ statementId,
+ parentSession.getSessionHandle.getSessionId.toString,
+ statement,
+ statementId,
+ parentSession.getUsername)
+ hiveContext.sparkContext.setJobGroup(statementId, statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
@@ -176,6 +183,7 @@ private[hive] class SparkExecuteStatementOperation(
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
+ HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
iter = {
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
@@ -192,10 +200,13 @@ private[hive] class SparkExecuteStatementOperation(
// HiveServer will silently swallow them.
case e: Throwable =>
setState(OperationState.ERROR)
+ HiveThriftServer2.listener.onStatementError(
+ statementId, e.getMessage, e.getStackTraceString)
logError("Error executing query:", e)
throw new HiveSQLException(e.toString)
}
setState(OperationState.FINISHED)
+ HiveThriftServer2.listener.onStatementFinish(statementId)
}
}
@@ -227,11 +238,16 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
withImpersonation: Boolean,
delegationToken: String): SessionHandle = {
hiveContext.openSession()
-
- super.openSession(protocol, username, passwd, sessionConf, withImpersonation, delegationToken)
+ val sessionHandle = super.openSession(
+ protocol, username, passwd, sessionConf, withImpersonation, delegationToken)
+ val session = super.getSession(sessionHandle)
+ HiveThriftServer2.listener.onSessionCreated(
+ session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
+ sessionHandle
}
override def closeSession(sessionHandle: SessionHandle) {
+ HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
super.closeSession(sessionHandle)
sparkSqlOperationManager.sessionToActivePool -= sessionHandle