| field | value | date |
|---|---|---|
| author | Reynold Xin <rxin@apache.org> | 2014-09-29 21:53:21 -0700 |
| committer | Xiangrui Meng <meng@databricks.com> | 2014-09-29 21:53:21 -0700 |
| commit | 210404a56197ad347f1e621ed53ef01327fba2bd (patch) | |
| tree | 24d41eb1f15cce35d99e510a58139ba07775dbe7 /core | |
| parent | dc30e4504abcda1774f5f09a08bba73d29a2898b (diff) | |
Minor cleanup of code.
Author: Reynold Xin <rxin@apache.org>
Closes #2581 from rxin/minor-cleanup and squashes the following commits:
736a91b [Reynold Xin] Minor cleanup of code.
Diffstat (limited to 'core')

- core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
- core/src/main/scala/org/apache/spark/util/Utils.scala

3 files changed, 31 insertions, 47 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index ceb434feb6..54904bffdf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -20,15 +20,12 @@ package org.apache.spark.scheduler
 import java.io.{File, FileNotFoundException, IOException, PrintWriter}
 import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
-import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.executor.TaskMetrics
 
 /**
  * :: DeveloperApi ::
@@ -62,24 +59,16 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
   private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
     override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   }
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]
 
   createLogDir()
 
-  // The following 5 functions are used only in testing.
-  private[scheduler] def getLogDir = logDir
-  private[scheduler] def getJobIdToPrintWriter = jobIdToPrintWriter
-  private[scheduler] def getStageIdToJobId = stageIdToJobId
-  private[scheduler] def getJobIdToStageIds = jobIdToStageIds
-  private[scheduler] def getEventQueue = eventQueue
-
   /** Create a folder for log files, the folder's name is the creation time of jobLogger */
   protected def createLogDir() {
     val dir = new File(logDir + "/" + logDirName + "/")
     if (dir.exists()) {
       return
     }
-    if (dir.mkdirs() == false) {
+    if (!dir.mkdirs()) {
       // JobLogger should throw a exception rather than continue to construct this object.
       throw new IOException("create log directory error:" + logDir + "/" + logDirName + "/")
     }
@@ -261,7 +250,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
   protected def recordJobProperties(jobId: Int, properties: Properties) {
     if (properties != null) {
       val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
-      jobLogInfo(jobId, description, false)
+      jobLogInfo(jobId, description, withTime = false)
     }
   }
 
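The JobLogger hunks above apply two small Scala idioms: naming a boolean argument at the call site (`withTime = false`) and negating with `!` instead of comparing against `false`. A minimal, self-contained sketch of both idioms (hypothetical names, not code from the patch):

```scala
object NamedArgsExample {
  // A flag-taking method similar in shape to jobLogInfo(jobId, msg, withTime).
  def log(msg: String, withTime: Boolean = true): Unit = {
    val prefix = if (withTime) s"[${System.currentTimeMillis()}] " else ""
    println(prefix + msg)
  }

  def main(args: Array[String]): Unit = {
    // `withTime = false` documents intent better than a bare `false`.
    log("job started", withTime = false)

    val dir = new java.io.File("/tmp/example-log-dir")
    // Prefer `!dir.mkdirs()` over `dir.mkdirs() == false`.
    if (!dir.mkdirs() && !dir.exists()) {
      throw new java.io.IOException("could not create " + dir)
    }
  }
}
```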
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6a48f673c4..5b2e7d3a7e 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -25,7 +25,6 @@ import scala.collection.Map
 
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
-import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 10d440828e..dbe0cfa2b8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer
 import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
-import org.apache.log4j.PropertyConfigurator
-
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
@@ -37,12 +35,12 @@ import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.PropertyConfigurator
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.json4s._
 import tachyon.client.{TachyonFile,TachyonFS}
 
 import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
@@ -86,7 +84,7 @@ private[spark] object Utils extends Logging {
     ois.readObject.asInstanceOf[T]
   }
 
-  /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */
+  /** Deserialize a Long value (used for [[org.apache.spark.api.python.PythonPartitioner]]) */
   def deserializeLongValue(bytes: Array[Byte]) : Long = {
     // Note: we assume that we are given a Long value encoded in network (big-endian) byte order
     var result = bytes(7) & 0xFFL
@@ -153,7 +151,7 @@ private[spark] object Utils extends Logging {
   def classForName(className: String) = Class.forName(className, true, getContextOrSparkClassLoader)
 
   /**
-   * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
+   * Primitive often used when writing [[java.nio.ByteBuffer]] to [[java.io.DataOutput]]
    */
   def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
     if (bb.hasArray) {
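Besides dropping unused imports, the Utils hunks above swap Javadoc-style `{@link ...}` tags for Scaladoc's native `[[...]]` link syntax, which scaladoc actually resolves. A hypothetical doc comment using the same syntax (names are illustrative, not from the patch):

```scala
object ScaladocLinkExample {
  /** Parses a big-endian Long from 8 bytes, much like [[java.nio.ByteBuffer]] with `getLong`. */
  def parseLong(bytes: Array[Byte]): Long =
    java.nio.ByteBuffer.wrap(bytes).getLong

  def main(args: Array[String]): Unit = {
    val encoded = java.nio.ByteBuffer.allocate(8).putLong(42L).array()
    println(parseLong(encoded)) // 42
  }
}
```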
@@ -333,7 +331,7 @@ private[spark] object Utils extends Logging {
     val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
-    val fileOverwrite = conf.getBoolean("spark.files.overwrite", false)
+    val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
     uri.getScheme match {
       case "http" | "https" | "ftp" =>
         logInfo("Fetching " + url + " to " + tempFile)
@@ -355,7 +353,7 @@ private[spark] object Utils extends Logging {
         uc.connect()
         val in = uc.getInputStream()
         val out = new FileOutputStream(tempFile)
-        Utils.copyStream(in, out, true)
+        Utils.copyStream(in, out, closeStreams = true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
           if (fileOverwrite) {
             targetFile.delete()
@@ -402,7 +400,7 @@ private[spark] object Utils extends Logging {
         val fs = getHadoopFileSystem(uri, hadoopConf)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)
-        Utils.copyStream(in, out, true)
+        Utils.copyStream(in, out, closeStreams = true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
           if (fileOverwrite) {
             targetFile.delete()
@@ -666,7 +664,7 @@ private[spark] object Utils extends Logging {
    */
   def deleteRecursively(file: File) {
     if (file != null) {
-      if ((file.isDirectory) && !isSymlink(file)) {
+      if (file.isDirectory() && !isSymlink(file)) {
         for (child <- listFilesSafely(file)) {
           deleteRecursively(child)
         }
@@ -701,11 +699,7 @@ private[spark] object Utils extends Logging {
       new File(file.getParentFile().getCanonicalFile(), file.getName())
     }
 
-    if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
-      return false
-    } else {
-      return true
-    }
+    !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
   }
 
   /**
@@ -804,7 +798,7 @@ private[spark] object Utils extends Logging {
       .start()
     new Thread("read stdout for " + command(0)) {
       override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
           System.err.println(line)
         }
       }
@@ -818,8 +812,10 @@ private[spark] object Utils extends Logging {
   /**
    * Execute a command and get its output, throwing an exception if it yields a code other than 0.
    */
-  def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."),
-      extraEnvironment: Map[String, String] = Map.empty): String = {
+  def executeAndGetOutput(
+      command: Seq[String],
+      workingDir: File = new File("."),
+      extraEnvironment: Map[String, String] = Map.empty): String = {
     val builder = new ProcessBuilder(command: _*)
       .directory(workingDir)
     val environment = builder.environment()
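The isSymlink hunk above collapses an `if/else` that returned literal `true`/`false` into the boolean expression itself; in Scala the last expression of a method is its result, so no `return` is needed. A standalone before/after sketch of the same cleanup (illustrative names, not from the patch):

```scala
object BooleanReturnExample {
  // Before: branching just to return literals.
  def isEvenVerbose(n: Int): Boolean = {
    if (n % 2 == 0) {
      true
    } else {
      false
    }
  }

  // After: the condition *is* the result.
  def isEven(n: Int): Boolean = n % 2 == 0

  def main(args: Array[String]): Unit = {
    assert(isEvenVerbose(4) == isEven(4))
    println(isEven(7)) // false
  }
}
```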
@@ -829,7 +825,7 @@ private[spark] object Utils extends Logging {
     val process = builder.start()
     new Thread("read stderr for " + command(0)) {
       override def run() {
-        for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
+        for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
           System.err.println(line)
         }
       }
@@ -837,7 +833,7 @@ private[spark] object Utils extends Logging {
     val output = new StringBuffer
     val stdoutThread = new Thread("read stdout for " + command(0)) {
       override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
           output.append(line)
         }
       }
@@ -846,8 +842,8 @@ private[spark] object Utils extends Logging {
     val exitCode = process.waitFor()
     stdoutThread.join()   // Wait for it to finish reading output
     if (exitCode != 0) {
-      logError(s"Process $command exited with code $exitCode: ${output}")
-      throw new SparkException("Process " + command + " exited with code " + exitCode)
+      logError(s"Process $command exited with code $exitCode: $output")
+      throw new SparkException(s"Process $command exited with code $exitCode")
     }
     output.toString
   }
@@ -860,6 +856,7 @@ private[spark] object Utils extends Logging {
     try {
       block
     } catch {
+      case e: ControlThrowable => throw e
       case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
     }
   }
@@ -884,13 +881,12 @@ private[spark] object Utils extends Logging {
    * @param skipClass Function that is used to exclude non-user-code classes.
    */
   def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
-    val trace = Thread.currentThread.getStackTrace()
-      .filterNot { ste:StackTraceElement =>
-        // When running under some profilers, the current stack trace might contain some bogus
-        // frames. This is intended to ensure that we don't crash in these situations by
-        // ignoring any frames that we can't examine.
-        (ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
-      }
+    val trace = Thread.currentThread.getStackTrace().filterNot { ste: StackTraceElement =>
+      // When running under some profilers, the current stack trace might contain some bogus
+      // frames. This is intended to ensure that we don't crash in these situations by
+      // ignoring any frames that we can't examine.
+      ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace")
+    }
 
     // Keep crawling up the stack trace until we find the first function not inside of the spark
     // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
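The most substantive change above is rethrowing `ControlThrowable` ahead of the `Throwable` catch-all. Scala implements constructs such as `scala.util.control.Breaks` and non-local returns with `ControlThrowable` subclasses, so a bare `case t: Throwable` would silently swallow a `break()`. A self-contained demonstration of the pattern (hypothetical helper, not the patch's code):

```scala
import scala.util.control.{Breaks, ControlThrowable}

object ControlThrowableExample {
  def tryOrLog(block: => Unit): Unit = {
    try {
      block
    } catch {
      case e: ControlThrowable => throw e // let break()/non-local returns propagate
      case t: Throwable => println(s"unexpected failure: $t")
    }
  }

  def main(args: Array[String]): Unit = {
    val b = new Breaks
    b.breakable {
      for (i <- 1 to 10) {
        tryOrLog { if (i == 3) b.break() } // propagates up to `breakable`
        println(i)
      }
    }
    // Prints 1 and 2, then exits the loop; without the ControlThrowable
    // case, the break() would be logged as a failure and the loop would run to 10.
  }
}
```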
@@ -924,7 +920,7 @@ private[spark] object Utils extends Logging {
     }
     val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
     CallSite(
-      shortForm = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
+      shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine",
       longForm = callStack.take(callStackDepth).mkString("\n"))
   }
 
@@ -1027,7 +1023,7 @@ private[spark] object Utils extends Logging {
     false
   }
 
-  def isSpace(c: Char): Boolean = {
+  private def isSpace(c: Char): Boolean = {
     " \t\r\n".indexOf(c) != -1
   }
 
@@ -1179,7 +1175,7 @@ private[spark] object Utils extends Logging {
     }
     import scala.sys.process._
     (linkCmd + src.getAbsolutePath() + " " + dst.getPath() + cmdSuffix) lines_!
-      ProcessLogger(line => (logInfo(line)))
+      ProcessLogger(line => logInfo(line))
   }
 
@@ -1260,7 +1256,7 @@ private[spark] object Utils extends Logging {
     val startTime = System.currentTimeMillis
     while (!terminated) {
       try {
-        process.exitValue
+        process.exitValue()
         terminated = true
       } catch {
         case e: IllegalThreadStateException =>
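The getCallSite hunk above replaces `"%s at %s:%s".format(...)` with an `s`-interpolated string, which keeps each argument inline next to its placeholder and cannot mismatch a format string with its argument list at runtime. A standalone comparison (illustrative values, not from the patch):

```scala
object InterpolationExample {
  def main(args: Array[String]): Unit = {
    val method = "map"
    val file = "MyApp.scala"
    val line = 42

    val viaFormat = "%s at %s:%s".format(method, file, line)
    val viaInterp = s"$method at $file:$line"

    assert(viaFormat == viaInterp)
    println(viaInterp) // map at MyApp.scala:42
  }
}
```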