authorCarson Wang <carson.wang@intel.com>2015-12-03 16:39:12 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-12-03 16:39:12 -0800
commitb6e9963ee4bf0ffb62c8e9829a551bcdc31e12e3 (patch)
tree756c1e9cee472fb3f8125f1a367ede5251bc41cd /core
parentf434f36d508eb4dcade70871611fc022ae0feb56 (diff)
[SPARK-11206] Support SQL UI on the history server (resubmit)
Resubmit #9297 and #9991.

On the live web UI, there is a SQL tab which provides valuable information for SQL queries. But once the workload is finished, the SQL tab is not available on the history server. It will be helpful if we support the SQL UI on the history server so we can analyze a query even after its execution.

To support the SQL UI on the history server:
1. I added an onOtherEvent method to the SparkListener trait and posted all SQL-related events to the same event bus.
2. Two SQL events, SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd, are defined in the sql module.
3. The new SQL events are written to the event log using Jackson.
4. A new trait SparkHistoryListenerFactory is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using java.util.ServiceLoader.

Author: Carson Wang <carson.wang@intel.com>

Closes #10061 from carsonwang/SqlHistoryUI.
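As a minimal sketch of point 1 above: with SparkListenerEvent unsealed and the new onOtherEvent callback, a listener can observe events whose concrete types core does not know about. The class-name match below is an assumption; the real SQL event types are defined in the sql module, not in core.

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Minimal sketch: count events posted by other modules via onOtherEvent.
class OtherEventCounter extends SparkListener {
  @volatile private var seen = 0

  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    // Match on the class name; "SQLExecution" stands in for the SQL
    // module's SparkListenerSQLExecutionStart/End event types.
    if (event.getClass.getName.contains("SQLExecution")) {
      seen += 1
    }
  }
}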
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/java/org/apache/spark/JavaSparkListener.java | 3
-rw-r--r-- core/src/main/java/org/apache/spark/SparkFirehoseListener.java | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 24
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala | 1
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 16
-rw-r--r-- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 11
7 files changed, 57 insertions, 6 deletions
diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
index fa9acf0a15..23bc9a2e81 100644
--- a/core/src/main/java/org/apache/spark/JavaSparkListener.java
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -82,4 +82,7 @@ public class JavaSparkListener implements SparkListener {
@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
+ @Override
+ public void onOtherEvent(SparkListenerEvent event) { }
+
}
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 1214d05ba6..e6b24afd88 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -118,4 +118,8 @@ public class SparkFirehoseListener implements SparkListener {
onEvent(blockUpdated);
}
+ @Override
+ public void onOtherEvent(SparkListenerEvent event) {
+ onEvent(event);
+ }
}
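SparkFirehoseListener funnels every callback, including the new onOtherEvent, into a single onEvent method, so a subclass now sees module-specific events too. A minimal sketch:

import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler.SparkListenerEvent

// Minimal sketch: log the concrete type of every event, including events
// core does not know about, which now arrive through onOtherEvent.
class EventTap extends SparkFirehoseListener {
  override def onEvent(event: SparkListenerEvent): Unit = {
    println(s"event: ${event.getClass.getName}")
  }
}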
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 000a021a52..eaa07acc51 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -207,6 +207,10 @@ private[spark] class EventLoggingListener(
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ logEvent(event, flushLogger = true)
+ }
+
/**
* Stop logging events. The event log file will be renamed so that it loses the
* ".inprogress" suffix.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 896f174333..075a7f1317 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -22,15 +22,19 @@ import java.util.Properties
import scala.collection.Map
import scala.collection.mutable
-import org.apache.spark.{Logging, TaskEndReason}
+import com.fasterxml.jackson.annotation.JsonTypeInfo
+
+import org.apache.spark.{Logging, SparkConf, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.ui.SparkUI
@DeveloperApi
-sealed trait SparkListenerEvent
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
+trait SparkListenerEvent
@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
@@ -131,6 +135,17 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
/**
+ * Interface for creating history listeners defined in other modules like SQL, which are used to
+ * rebuild the history UI.
+ */
+private[spark] trait SparkHistoryListenerFactory {
+ /**
+ * Create listeners used to rebuild the history UI.
+ */
+ def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
+}
+
+/**
* :: DeveloperApi ::
* Interface for listening to events from the Spark scheduler. Note that this is an internal
* interface which might change in different Spark releases. Java clients should extend
@@ -223,6 +238,11 @@ trait SparkListener {
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
+
+ /**
+ * Called when other events like SQL-specific events are posted.
+ */
+ def onOtherEvent(event: SparkListenerEvent) { }
}
/**
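A hypothetical implementation of the new factory trait, for illustration. Because SparkHistoryListenerFactory is private[spark], an implementation must live under the org.apache.spark package tree, which the sql module's real factory does; the package and class names below are assumptions.

package org.apache.spark.example

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkHistoryListenerFactory, SparkListener, SparkListenerEvent}
import org.apache.spark.ui.SparkUI

// Sketch of a factory the history server could discover via ServiceLoader.
class DemoHistoryListenerFactory extends SparkHistoryListenerFactory {
  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
    // Return listeners to attach to the history server's replay bus.
    Seq(new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = {
        // Rebuild module-specific UI state from replayed events here.
      }
    })
  }
}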
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 04afde33f5..95722a0714 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -61,6 +61,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
+ case _ => listener.onOtherEvent(event)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 4608bce202..8da6884a38 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,10 +17,13 @@
package org.apache.spark.ui
-import java.util.Date
+import java.util.{Date, ServiceLoader}
+
+import scala.collection.JavaConverters._
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
UIRoot}
+import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
@@ -154,7 +157,16 @@ private[spark] object SparkUI {
appName: String,
basePath: String,
startTime: Long): SparkUI = {
- create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+ val sparkUI = create(
+ None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+
+ val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
+ Utils.getContextOrSparkClassLoader).asScala
+ listenerFactories.foreach { listenerFactory =>
+ val listeners = listenerFactory.createListeners(conf, sparkUI)
+ listeners.foreach(listenerBus.addListener)
+ }
+ sparkUI
}
/**
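The ServiceLoader.load call above discovers factories through Java's standard provider-configuration mechanism: a classpath resource named after the service interface, listing implementation class names one per line. A hypothetical registration for the sketch factory shown earlier:

# src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
org.apache.spark.example.DemoHistoryListenerFactory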
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 2d2bd90eb3..cb0f1bf79f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,19 +19,21 @@ package org.apache.spark.util
import java.util.{Properties, UUID}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-
import scala.collection.JavaConverters._
import scala.collection.Map
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._
import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._
/**
@@ -54,6 +56,8 @@ private[spark] object JsonProtocol {
private implicit val format = DefaultFormats
+ private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */
@@ -96,6 +100,7 @@ private[spark] object JsonProtocol {
executorMetricsUpdateToJson(metricsUpdate)
case blockUpdated: SparkListenerBlockUpdated =>
throw new MatchError(blockUpdated) // TODO(ekl) implement this
+ case _ => parse(mapper.writeValueAsString(event))
}
}
@@ -506,6 +511,8 @@ private[spark] object JsonProtocol {
case `executorRemoved` => executorRemovedFromJson(json)
case `logStart` => logStartFromJson(json)
case `metricsUpdate` => executorMetricsUpdateFromJson(json)
+ case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
+ .asInstanceOf[SparkListenerEvent]
}
}
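The two fallback cases above serialize unknown events with Jackson and rebuild them from the class name stored in the "Event" property that the @JsonTypeInfo annotation on SparkListenerEvent emits. A minimal round-trip sketch, assuming a hypothetical custom event MyEvent (the real SQL events live in the sql module):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical event: SparkListenerEvent is no longer sealed, so modules
// outside core can define their own event types.
case class MyEvent(description: String, time: Long) extends SparkListenerEvent

object JacksonRoundTrip {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // The serialized JSON carries an "Event" property with the class name,
    // thanks to the @JsonTypeInfo annotation inherited from the trait.
    val json = mapper.writeValueAsString(MyEvent("demo", 42L))
    // The history server resolves that class name and reads the event back,
    // mirroring the mapper.readValue call in sparkEventFromJson above.
    val event = mapper.readValue(json, classOf[MyEvent])
    assert(event == MyEvent("demo", 42L))
  }
}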