-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala  | 106
-rw-r--r--  scalastyle-config.xml                                         |   6
2 files changed, 70 insertions(+), 42 deletions(-)
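The patch has two themes: formatted event-class names are computed once in private holder objects instead of on every call, and json4s's .extractOpt[T] is replaced by Utils.jsonOption(x).map(_.extract[T]). As a hedged illustration of the second rewrite (not part of the patch): extractOpt runs a full extraction and catches the MappingException that json4s throws for a missing field, while jsonOption, as Spark's Utils defines it around this time, is a plain pattern match that never constructs an exception.

import org.json4s._

// Sketch of Utils.jsonOption, reproduced on the assumption that it matches
// the Spark sources this diff applies to: a missing field yields JNothing,
// which maps to None without ever building an exception.
def jsonOption(json: JValue): Option[JValue] = json match {
  case JNothing => None
  case value: JValue => Some(value)
}

// Before: (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
//         extraction runs, throws for JNothing, and the throw is swallowed
// After:  jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
//         extract only runs when the field is actually present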
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c11eb3ffa4..6593aab33f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -107,20 +107,20 @@ private[spark] object JsonProtocol {
def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
val properties = propertiesToJson(stageSubmitted.properties)
- ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
("Stage Info" -> stageInfo) ~
("Properties" -> properties)
}
def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
- ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
("Stage Info" -> stageInfo)
}
def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
val taskInfo = taskStart.taskInfo
- ("Event" -> Utils.getFormattedClassName(taskStart)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
("Stage ID" -> taskStart.stageId) ~
("Stage Attempt ID" -> taskStart.stageAttemptId) ~
("Task Info" -> taskInfoToJson(taskInfo))
@@ -128,7 +128,7 @@ private[spark] object JsonProtocol {
def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
val taskInfo = taskGettingResult.taskInfo
- ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
("Task Info" -> taskInfoToJson(taskInfo))
}
@@ -137,7 +137,7 @@ private[spark] object JsonProtocol {
val taskInfo = taskEnd.taskInfo
val taskMetrics = taskEnd.taskMetrics
val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
- ("Event" -> Utils.getFormattedClassName(taskEnd)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
("Stage ID" -> taskEnd.stageId) ~
("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
("Task Type" -> taskEnd.taskType) ~
@@ -148,7 +148,7 @@ private[spark] object JsonProtocol {
def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
val properties = propertiesToJson(jobStart.properties)
- ("Event" -> Utils.getFormattedClassName(jobStart)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
("Job ID" -> jobStart.jobId) ~
("Submission Time" -> jobStart.time) ~
("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0
@@ -158,7 +158,7 @@ private[spark] object JsonProtocol {
def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
val jobResult = jobResultToJson(jobEnd.jobResult)
- ("Event" -> Utils.getFormattedClassName(jobEnd)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobEnd) ~
("Job ID" -> jobEnd.jobId) ~
("Completion Time" -> jobEnd.time) ~
("Job Result" -> jobResult)
@@ -170,7 +170,7 @@ private[spark] object JsonProtocol {
val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap)
val systemProperties = mapToJson(environmentDetails("System Properties").toMap)
val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap)
- ("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.environmentUpdate) ~
("JVM Information" -> jvmInformation) ~
("Spark Properties" -> sparkProperties) ~
("System Properties" -> systemProperties) ~
@@ -179,7 +179,7 @@ private[spark] object JsonProtocol {
def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue = {
val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
- ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~
("Block Manager ID" -> blockManagerId) ~
("Maximum Memory" -> blockManagerAdded.maxMem) ~
("Timestamp" -> blockManagerAdded.time)
@@ -187,18 +187,18 @@ private[spark] object JsonProtocol {
def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
- ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerRemoved) ~
("Block Manager ID" -> blockManagerId) ~
("Timestamp" -> blockManagerRemoved.time)
}
def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
- ("Event" -> Utils.getFormattedClassName(unpersistRDD)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.unpersistRDD) ~
("RDD ID" -> unpersistRDD.rddId)
}
def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
- ("Event" -> Utils.getFormattedClassName(applicationStart)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart) ~
("App Name" -> applicationStart.appName) ~
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
@@ -208,33 +208,33 @@ private[spark] object JsonProtocol {
}
def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
- ("Event" -> Utils.getFormattedClassName(applicationEnd)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd) ~
("Timestamp" -> applicationEnd.time)
}
def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
- ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded) ~
("Timestamp" -> executorAdded.time) ~
("Executor ID" -> executorAdded.executorId) ~
("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
}
def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
- ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorRemoved) ~
("Timestamp" -> executorRemoved.time) ~
("Executor ID" -> executorRemoved.executorId) ~
("Removed Reason" -> executorRemoved.reason)
}
def logStartToJson(logStart: SparkListenerLogStart): JValue = {
- ("Event" -> Utils.getFormattedClassName(logStart)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.logStart) ~
("Spark Version" -> SPARK_VERSION)
}
def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
val execId = metricsUpdate.execId
val accumUpdates = metricsUpdate.accumUpdates
- ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
("Executor ID" -> execId) ~
("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
("Task ID" -> taskId) ~
@@ -485,7 +485,7 @@ private[spark] object JsonProtocol {
* JSON deserialization methods for SparkListenerEvents |
* ---------------------------------------------------- */
- def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+ private object SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES {
val stageSubmitted = Utils.getFormattedClassName(SparkListenerStageSubmitted)
val stageCompleted = Utils.getFormattedClassName(SparkListenerStageCompleted)
val taskStart = Utils.getFormattedClassName(SparkListenerTaskStart)
@@ -503,6 +503,10 @@ private[spark] object JsonProtocol {
val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
+ }
+
+ def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+ import SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES._
(json \ "Event").extract[String] match {
case `stageSubmitted` => stageSubmittedFromJson(json)
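This hunk is the read side of the caching change: sparkEventFromJson now imports the vals from SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES, so its backticked patterns compare against strings computed once at object initialization rather than recomputed per event. A minimal self-contained sketch of the pattern, with toy event types standing in for Spark's listener events and getSimpleName standing in for Utils.getFormattedClassName:

object EventJsonSketch {
  sealed trait Event
  case class StageSubmitted(stageId: Int) extends Event
  case class TaskStart(taskId: Long) extends Event

  // A Scala object initializes once per JVM, so each name is computed a
  // single time, not on every event that is serialized or parsed.
  private object Names {
    val stageSubmitted = classOf[StageSubmitted].getSimpleName
    val taskStart = classOf[TaskStart].getSimpleName
  }

  def eventName(e: Event): String = e match {
    case _: StageSubmitted => Names.stageSubmitted
    case _: TaskStart => Names.taskStart
  }

  // Backticked patterns match against the imported stable identifiers
  // instead of binding fresh variables, mirroring sparkEventFromJson.
  def describe(name: String): String = {
    import Names._
    name match {
      case `stageSubmitted` => "stage submitted"
      case `taskStart` => "task start"
      case other => s"unknown event: $other"
    }
  }
}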
@@ -540,7 +544,8 @@ private[spark] object JsonProtocol {
def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
val stageId = (json \ "Stage ID").extract[Int]
- val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
+ val stageAttemptId =
+ Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
val taskInfo = taskInfoFromJson(json \ "Task Info")
SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
}
@@ -552,7 +557,8 @@ private[spark] object JsonProtocol {
def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
val stageId = (json \ "Stage ID").extract[Int]
- val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
+ val stageAttemptId =
+ Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
val taskType = (json \ "Task Type").extract[String]
val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
val taskInfo = taskInfoFromJson(json \ "Task Info")
@@ -662,20 +668,22 @@ private[spark] object JsonProtocol {
def stageInfoFromJson(json: JValue): StageInfo = {
val stageId = (json \ "Stage ID").extract[Int]
- val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
+ val attemptId = Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
val parentIds = Utils.jsonOption(json \ "Parent IDs")
.map { l => l.extract[List[JValue]].map(_.extract[Int]) }
.getOrElse(Seq.empty)
- val details = (json \ "Details").extractOpt[String].getOrElse("")
+ val details = Utils.jsonOption(json \ "Details").map(_.extract[String]).getOrElse("")
val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
- val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match {
- case Some(values) => values.map(accumulableInfoFromJson)
- case None => Seq[AccumulableInfo]()
+ val accumulatedValues = {
+ Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
+ case Some(values) => values.map(accumulableInfoFromJson)
+ case None => Seq[AccumulableInfo]()
+ }
}
val stageInfo = new StageInfo(
@@ -692,17 +700,17 @@ private[spark] object JsonProtocol {
def taskInfoFromJson(json: JValue): TaskInfo = {
val taskId = (json \ "Task ID").extract[Long]
val index = (json \ "Index").extract[Int]
- val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
+ val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
val launchTime = (json \ "Launch Time").extract[Long]
val executorId = (json \ "Executor ID").extract[String]
val host = (json \ "Host").extract[String]
val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
- val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
+ val speculative = Utils.jsonOption(json \ "Speculative").exists(_.extract[Boolean])
val gettingResultTime = (json \ "Getting Result Time").extract[Long]
val finishTime = (json \ "Finish Time").extract[Long]
val failed = (json \ "Failed").extract[Boolean]
- val killed = (json \ "Killed").extractOpt[Boolean].getOrElse(false)
- val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
+ val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean])
+ val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
}
@@ -719,12 +727,13 @@ private[spark] object JsonProtocol {
def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
val id = (json \ "ID").extract[Long]
- val name = (json \ "Name").extractOpt[String]
+ val name = Utils.jsonOption(json \ "Name").map(_.extract[String])
val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
- val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
- val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false)
- val metadata = (json \ "Metadata").extractOpt[String]
+ val internal = Utils.jsonOption(json \ "Internal").exists(_.extract[Boolean])
+ val countFailedValues =
+ Utils.jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean])
+ val metadata = Utils.jsonOption(json \ "Metadata").map(_.extract[String])
new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata)
}
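One detail in this hunk: booleans that default to false are rewritten with Option.exists rather than the .map(_.extract[Boolean]).getOrElse(false) shape used for other types. The two are interchangeable here because exists returns false on None, as this small equivalence sketch (not from the patch) shows:

def viaGetOrElse(o: Option[Boolean]): Boolean = o.map(identity).getOrElse(false)
def viaExists(o: Option[Boolean]): Boolean = o.exists(identity)
// viaGetOrElse(None)    == viaExists(None)    == false
// viaGetOrElse(Some(x)) == viaExists(Some(x)) == x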
@@ -782,9 +791,11 @@ private[spark] object JsonProtocol {
readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
- readMetrics.incLocalBytesRead((readJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L))
+ readMetrics.incLocalBytesRead(
+ Utils.jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
- readMetrics.incRecordsRead((readJson \ "Total Records Read").extractOpt[Long].getOrElse(0L))
+ readMetrics.incRecordsRead(
+ Utils.jsonOption(readJson \ "Total Records Read").map(_.extract[Long]).getOrElse(0L))
metrics.mergeShuffleReadMetrics()
}
@@ -793,8 +804,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
val writeMetrics = metrics.shuffleWriteMetrics
writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
- writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written")
- .extractOpt[Long].getOrElse(0L))
+ writeMetrics.incRecordsWritten(
+ Utils.jsonOption(writeJson \ "Shuffle Records Written").map(_.extract[Long]).getOrElse(0L))
writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
}
@@ -802,14 +813,16 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Output Metrics").foreach { outJson =>
val outputMetrics = metrics.outputMetrics
outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
- outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L))
+ outputMetrics.setRecordsWritten(
+ Utils.jsonOption(outJson \ "Records Written").map(_.extract[Long]).getOrElse(0L))
}
// Input metrics
Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
val inputMetrics = metrics.inputMetrics
inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
- inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
+ inputMetrics.incRecordsRead(
+ Utils.jsonOption(inJson \ "Records Read").map(_.extract[Long]).getOrElse(0L))
}
// Updated blocks
@@ -824,7 +837,7 @@ private[spark] object JsonProtocol {
metrics
}
- def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+ private object TASK_END_REASON_FORMATTED_CLASS_NAMES {
val success = Utils.getFormattedClassName(Success)
val resubmitted = Utils.getFormattedClassName(Resubmitted)
val fetchFailed = Utils.getFormattedClassName(FetchFailed)
@@ -834,6 +847,10 @@ private[spark] object JsonProtocol {
val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
val unknownReason = Utils.getFormattedClassName(UnknownReason)
+ }
+
+ def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+ import TASK_END_REASON_FORMATTED_CLASS_NAMES._
(json \ "Reason").extract[String] match {
case `success` => Success
@@ -850,7 +867,8 @@ private[spark] object JsonProtocol {
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
val stackTrace = stackTraceFromJson(json \ "Stack Trace")
- val fullStackTrace = (json \ "Full Stack Trace").extractOpt[String].orNull
+ val fullStackTrace =
+ Utils.jsonOption(json \ "Full Stack Trace").map(_.extract[String]).orNull
// Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
.map(_.extract[List[JValue]].map(accumulableInfoFromJson))
@@ -891,9 +909,13 @@ private[spark] object JsonProtocol {
BlockManagerId(executorId, host, port)
}
- def jobResultFromJson(json: JValue): JobResult = {
+ private object JOB_RESULT_FORMATTED_CLASS_NAMES {
val jobSucceeded = Utils.getFormattedClassName(JobSucceeded)
val jobFailed = Utils.getFormattedClassName(JobFailed)
+ }
+
+ def jobResultFromJson(json: JValue): JobResult = {
+ import JOB_RESULT_FORMATTED_CLASS_NAMES._
(json \ "Result").extract[String] match {
case `jobSucceeded` => JobSucceeded
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 81d57d723a..48333851ef 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -217,6 +217,12 @@ This file is divided into 3 sections:
of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
</check>
+ <check customId="extractopt" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
+ <parameters><parameter name="regex">extractOpt</parameter></parameters>
+ <customMessage>Use Utils.jsonOption(x).map(_.extract[T]) instead of .extractOpt[T], as the latter
+ is slower.</customMessage>
+ </check>
+
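The new check uses scalastyle's TokenChecker, which flags any source token matching the regex, so leftover or future extractOpt call sites fail the build's style check rather than quietly regressing performance. A hedged, self-contained example of the compliant form (names and values are illustrative, not from the patch):

import org.json4s._

object ExtractOptExample {
  implicit val formats: Formats = DefaultFormats
  val json: JValue = JObject("Attempt" -> JInt(2))

  // This line would now fail scalastyle, since its token matches the rule:
  //   val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)

  // Compliant form, matching the rewrites in JsonProtocol above:
  def jsonOption(j: JValue): Option[JValue] =
    j match { case JNothing => None; case v => Some(v) }

  val attempt: Int = jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
}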
<check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
<parameters>
<parameter name="groups">java,scala,3rdParty,spark</parameter>