From 2c0aa22e2e26ae35b7d4caa529bc6520e362cc3c Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 21 Mar 2014 16:07:22 -0700 Subject: SPARK-1279: Fix improper use of SimpleDateFormat `SimpleDateFormat` is not thread-safe. Some places use the same SimpleDateFormat object without safeguard in the multiple threads. It will cause that the Web UI displays improper date. This PR creates a new `SimpleDateFormat` every time when it's necessary. Another solution is using `ThreadLocal` to store a `SimpleDateFormat` in each thread. If this PR impacts the performance, I can change to the latter one. Author: zsxwing Closes #179 from zsxwing/SPARK-1278 and squashes the following commits: 21fabd3 [zsxwing] SPARK-1278: Fix improper use of SimpleDateFormat --- .../main/scala/org/apache/spark/deploy/WebUI.scala | 47 -------------------- .../org/apache/spark/deploy/master/Master.scala | 6 +-- .../apache/spark/deploy/master/ui/IndexPage.scala | 8 ++-- .../org/apache/spark/deploy/worker/Worker.scala | 4 +- .../org/apache/spark/scheduler/JobLogger.scala | 6 ++- .../src/main/scala/org/apache/spark/ui/WebUI.scala | 50 ++++++++++++++++++++++ .../org/apache/spark/ui/jobs/JobProgressUI.scala | 2 - .../scala/org/apache/spark/ui/jobs/StagePage.scala | 5 +-- .../org/apache/spark/ui/jobs/StageTable.scala | 5 +-- .../scala/org/apache/spark/util/FileLogger.scala | 7 ++- 10 files changed, 72 insertions(+), 68 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/deploy/WebUI.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/WebUI.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala b/core/src/main/scala/org/apache/spark/deploy/WebUI.scala deleted file mode 100644 index ae258b58b9..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy - -import java.text.SimpleDateFormat -import java.util.Date - -/** - * Utilities used throughout the web UI. - */ -private[spark] object DeployWebUI { - val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") - - def formatDate(date: Date): String = DATE_FORMAT.format(date) - - def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp)) - - def formatDuration(milliseconds: Long): String = { - val seconds = milliseconds.toDouble / 1000 - if (seconds < 60) { - return "%.0f s".format(seconds) - } - val minutes = seconds / 60 - if (minutes < 10) { - return "%.1f min".format(minutes) - } else if (minutes < 60) { - return "%.0f min".format(minutes) - } - val hours = minutes / 60 - return "%.1f h".format(hours) - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9ed49e01be..95bd62e88d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -51,7 +51,7 @@ private[spark] class Master( val conf = new SparkConf - val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs + def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000 val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200) val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15) @@ -682,7 +682,7 @@ private[spark] class Master( /** Generate a new app ID given a app's submission date */ def newApplicationId(submitDate: Date): String = { - val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber) + val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber) nextAppNumber += 1 appId } @@ -706,7 +706,7 @@ private[spark] class Master( } def newDriverId(submitDate: Date): String = { - val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber) + val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber) nextDriverNumber += 1 appId } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 7ec71eb80b..8c1d6c7cce 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -25,10 +25,10 @@ import scala.xml.Node import akka.pattern.ask import org.json4s.JValue -import org.apache.spark.deploy.{DeployWebUI, JsonProtocol} +import org.apache.spark.deploy.{JsonProtocol} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUI, UIUtils} import org.apache.spark.util.Utils private[spark] class IndexPage(parent: MasterWebUI) { @@ -169,10 +169,10 @@ private[spark] class IndexPage(parent: MasterWebUI) { {Utils.megabytesToString(app.desc.memoryPerSlave)} - {DeployWebUI.formatDate(app.submitDate)} + {WebUI.formatDate(app.submitDate)} {app.desc.user} {app.state.toString} - {DeployWebUI.formatDuration(app.duration)} + {WebUI.formatDuration(app.duration)} } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 5e0fc31fff..8a71ddda4c 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -56,7 +56,7 @@ private[spark] class Worker( Utils.checkHost(host, "Expected hostname") assert (port > 0) - val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs + def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs // Send a heartbeat every (heartbeat timeout) / 4 milliseconds val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4 @@ -319,7 +319,7 @@ private[spark] class Worker( } def generateWorkerId(): String = { - "worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), host, port) + "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port) } override def postStop() { diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index b3a67d7e17..5cecf9416b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -55,7 +55,9 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener private val jobIdToPrintWriter = new HashMap[Int, PrintWriter] private val stageIdToJobId = new HashMap[Int, Int] private val jobIdToStageIds = new HashMap[Int, Seq[Int]] - private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + private val dateFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + } private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent] createLogDir() @@ -128,7 +130,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener var writeInfo = info if (withTime) { val date = new Date(System.currentTimeMillis()) - writeInfo = DATE_FORMAT.format(date) + ": " + info + writeInfo = dateFormat.get.format(date) + ": " + info } jobIdToPrintWriter.get(jobId).foreach(_.println(writeInfo)) } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala new file mode 100644 index 0000000000..a7b872f344 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui + +import java.text.SimpleDateFormat +import java.util.Date + +/** + * Utilities used throughout the web UI. + */ +private[spark] object WebUI { + // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. + private val dateFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + } + + def formatDate(date: Date): String = dateFormat.get.format(date) + + def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp)) + + def formatDuration(milliseconds: Long): String = { + val seconds = milliseconds.toDouble / 1000 + if (seconds < 60) { + return "%.0f s".format(seconds) + } + val minutes = seconds / 60 + if (minutes < 10) { + return "%.1f min".format(minutes) + } else if (minutes < 60) { + return "%.0f min".format(minutes) + } + val hours = minutes / 60 + return "%.1f h".format(hours) + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala index ee4e9c69c1..b2c67381cc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala @@ -17,7 +17,6 @@ package org.apache.spark.ui.jobs -import java.text.SimpleDateFormat import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.servlet.ServletContextHandler @@ -32,7 +31,6 @@ import org.apache.spark.util.Utils private[ui] class JobProgressUI(parent: SparkUI) { val appName = parent.appName val basePath = parent.basePath - val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") val live = parent.live val sc = parent.sc diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index da7f202330..0c55f2ee7e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -23,14 +23,13 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.ui.Page._ -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUI, UIUtils} import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: JobProgressUI) { private val appName = parent.appName private val basePath = parent.basePath - private val dateFmt = parent.dateFmt private lazy val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { @@ -253,7 +252,7 @@ private[ui] class StagePage(parent: JobProgressUI) { {info.status} {info.taskLocality} {info.host} - {dateFmt.format(new Date(info.launchTime))} + {WebUI.formatDate(new Date(info.launchTime))} {formatDuration} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 68fef5234c..5bf1c95cd5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -23,13 +23,12 @@ import scala.collection.mutable.HashMap import scala.xml.Node import org.apache.spark.scheduler.{StageInfo, TaskInfo} -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUI, UIUtils} import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished stages */ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) { private val basePath = parent.basePath - private val dateFmt = parent.dateFmt private lazy val listener = parent.listener private lazy val isFairScheduler = parent.isFairScheduler @@ -82,7 +81,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) { val description = listener.stageIdToDescription.get(s.stageId) .map(d =>
{d}
{nameLink}
).getOrElse(nameLink) val submissionTime = s.submissionTime match { - case Some(t) => dateFmt.format(new Date(t)) + case Some(t) => WebUI.formatDate(new Date(t)) case None => "Unknown" } val finishTime = s.completionTime.getOrElse(System.currentTimeMillis) diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index f07962096a..a0c07e32fd 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -44,7 +44,10 @@ class FileLogger( overwrite: Boolean = true) extends Logging { - private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + private val dateFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + } + private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) private var fileIndex = 0 @@ -111,7 +114,7 @@ class FileLogger( def log(msg: String, withTime: Boolean = false) { val writeInfo = if (!withTime) msg else { val date = new Date(System.currentTimeMillis()) - DATE_FORMAT.format(date) + ": " + msg + dateFormat.get.format(date) + ": " + msg } writer.foreach(_.print(writeInfo)) } -- cgit v1.2.3