aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-09-29 21:53:21 -0700
committerXiangrui Meng <meng@databricks.com>2014-09-29 21:53:21 -0700
commit210404a56197ad347f1e621ed53ef01327fba2bd (patch)
tree24d41eb1f15cce35d99e510a58139ba07775dbe7 /core
parentdc30e4504abcda1774f5f09a08bba73d29a2898b (diff)
downloadspark-210404a56197ad347f1e621ed53ef01327fba2bd.tar.gz
spark-210404a56197ad347f1e621ed53ef01327fba2bd.tar.bz2
spark-210404a56197ad347f1e621ed53ef01327fba2bd.zip
Minor cleanup of code.
Author: Reynold Xin <rxin@apache.org> Closes #2581 from rxin/minor-cleanup and squashes the following commits: 736a91b [Reynold Xin] Minor cleanup of code.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala60
3 files changed, 31 insertions, 47 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index ceb434feb6..54904bffdf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -20,15 +20,12 @@ package org.apache.spark.scheduler
import java.io.{File, FileNotFoundException, IOException, PrintWriter}
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
-import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.executor.TaskMetrics
/**
* :: DeveloperApi ::
@@ -62,24 +59,16 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]
createLogDir()
- // The following 5 functions are used only in testing.
- private[scheduler] def getLogDir = logDir
- private[scheduler] def getJobIdToPrintWriter = jobIdToPrintWriter
- private[scheduler] def getStageIdToJobId = stageIdToJobId
- private[scheduler] def getJobIdToStageIds = jobIdToStageIds
- private[scheduler] def getEventQueue = eventQueue
-
/** Create a folder for log files, the folder's name is the creation time of jobLogger */
protected def createLogDir() {
val dir = new File(logDir + "/" + logDirName + "/")
if (dir.exists()) {
return
}
- if (dir.mkdirs() == false) {
+ if (!dir.mkdirs()) {
// JobLogger should throw a exception rather than continue to construct this object.
throw new IOException("create log directory error:" + logDir + "/" + logDirName + "/")
}
@@ -261,7 +250,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
protected def recordJobProperties(jobId: Int, properties: Properties) {
if (properties != null) {
val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
- jobLogInfo(jobId, description, false)
+ jobLogInfo(jobId, description, withTime = false)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6a48f673c4..5b2e7d3a7e 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -25,7 +25,6 @@ import scala.collection.Map
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
-import org.json4s.jackson.JsonMethods._
import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 10d440828e..dbe0cfa2b8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
-import org.apache.log4j.PropertyConfigurator
-
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
@@ -37,12 +35,12 @@ import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.PropertyConfigurator
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.json4s._
import tachyon.client.{TachyonFile,TachyonFS}
import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -86,7 +84,7 @@ private[spark] object Utils extends Logging {
ois.readObject.asInstanceOf[T]
}
- /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */
+ /** Deserialize a Long value (used for [[org.apache.spark.api.python.PythonPartitioner]]) */
def deserializeLongValue(bytes: Array[Byte]) : Long = {
// Note: we assume that we are given a Long value encoded in network (big-endian) byte order
var result = bytes(7) & 0xFFL
@@ -153,7 +151,7 @@ private[spark] object Utils extends Logging {
def classForName(className: String) = Class.forName(className, true, getContextOrSparkClassLoader)
/**
- * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
+ * Primitive often used when writing [[java.nio.ByteBuffer]] to [[java.io.DataOutput]]
*/
def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
if (bb.hasArray) {
@@ -333,7 +331,7 @@ private[spark] object Utils extends Logging {
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
- val fileOverwrite = conf.getBoolean("spark.files.overwrite", false)
+ val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
uri.getScheme match {
case "http" | "https" | "ftp" =>
logInfo("Fetching " + url + " to " + tempFile)
@@ -355,7 +353,7 @@ private[spark] object Utils extends Logging {
uc.connect()
val in = uc.getInputStream()
val out = new FileOutputStream(tempFile)
- Utils.copyStream(in, out, true)
+ Utils.copyStream(in, out, closeStreams = true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
if (fileOverwrite) {
targetFile.delete()
@@ -402,7 +400,7 @@ private[spark] object Utils extends Logging {
val fs = getHadoopFileSystem(uri, hadoopConf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
- Utils.copyStream(in, out, true)
+ Utils.copyStream(in, out, closeStreams = true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
if (fileOverwrite) {
targetFile.delete()
@@ -666,7 +664,7 @@ private[spark] object Utils extends Logging {
*/
def deleteRecursively(file: File) {
if (file != null) {
- if ((file.isDirectory) && !isSymlink(file)) {
+ if (file.isDirectory() && !isSymlink(file)) {
for (child <- listFilesSafely(file)) {
deleteRecursively(child)
}
@@ -701,11 +699,7 @@ private[spark] object Utils extends Logging {
new File(file.getParentFile().getCanonicalFile(), file.getName())
}
- if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
- return false
- } else {
- return true
- }
+ !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
}
/**
@@ -804,7 +798,7 @@ private[spark] object Utils extends Logging {
.start()
new Thread("read stdout for " + command(0)) {
override def run() {
- for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+ for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
System.err.println(line)
}
}
@@ -818,8 +812,10 @@ private[spark] object Utils extends Logging {
/**
* Execute a command and get its output, throwing an exception if it yields a code other than 0.
*/
- def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."),
- extraEnvironment: Map[String, String] = Map.empty): String = {
+ def executeAndGetOutput(
+ command: Seq[String],
+ workingDir: File = new File("."),
+ extraEnvironment: Map[String, String] = Map.empty): String = {
val builder = new ProcessBuilder(command: _*)
.directory(workingDir)
val environment = builder.environment()
@@ -829,7 +825,7 @@ private[spark] object Utils extends Logging {
val process = builder.start()
new Thread("read stderr for " + command(0)) {
override def run() {
- for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
+ for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
System.err.println(line)
}
}
@@ -837,7 +833,7 @@ private[spark] object Utils extends Logging {
val output = new StringBuffer
val stdoutThread = new Thread("read stdout for " + command(0)) {
override def run() {
- for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+ for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
output.append(line)
}
}
@@ -846,8 +842,8 @@ private[spark] object Utils extends Logging {
val exitCode = process.waitFor()
stdoutThread.join() // Wait for it to finish reading output
if (exitCode != 0) {
- logError(s"Process $command exited with code $exitCode: ${output}")
- throw new SparkException("Process " + command + " exited with code " + exitCode)
+ logError(s"Process $command exited with code $exitCode: $output")
+ throw new SparkException(s"Process $command exited with code $exitCode")
}
output.toString
}
@@ -860,6 +856,7 @@ private[spark] object Utils extends Logging {
try {
block
} catch {
+ case e: ControlThrowable => throw e
case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
}
}
@@ -884,13 +881,12 @@ private[spark] object Utils extends Logging {
* @param skipClass Function that is used to exclude non-user-code classes.
*/
def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
- val trace = Thread.currentThread.getStackTrace()
- .filterNot { ste:StackTraceElement =>
- // When running under some profilers, the current stack trace might contain some bogus
- // frames. This is intended to ensure that we don't crash in these situations by
- // ignoring any frames that we can't examine.
- (ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
- }
+ val trace = Thread.currentThread.getStackTrace().filterNot { ste: StackTraceElement =>
+ // When running under some profilers, the current stack trace might contain some bogus
+ // frames. This is intended to ensure that we don't crash in these situations by
+ // ignoring any frames that we can't examine.
+ ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace")
+ }
// Keep crawling up the stack trace until we find the first function not inside of the spark
// package. We track the last (shallowest) contiguous Spark method. This might be an RDD
@@ -924,7 +920,7 @@ private[spark] object Utils extends Logging {
}
val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
CallSite(
- shortForm = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
+ shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine",
longForm = callStack.take(callStackDepth).mkString("\n"))
}
@@ -1027,7 +1023,7 @@ private[spark] object Utils extends Logging {
false
}
- def isSpace(c: Char): Boolean = {
+ private def isSpace(c: Char): Boolean = {
" \t\r\n".indexOf(c) != -1
}
@@ -1179,7 +1175,7 @@ private[spark] object Utils extends Logging {
}
import scala.sys.process._
(linkCmd + src.getAbsolutePath() + " " + dst.getPath() + cmdSuffix) lines_!
- ProcessLogger(line => (logInfo(line)))
+ ProcessLogger(line => logInfo(line))
}
@@ -1260,7 +1256,7 @@ private[spark] object Utils extends Logging {
val startTime = System.currentTimeMillis
while (!terminated) {
try {
- process.exitValue
+ process.exitValue()
terminated = true
} catch {
case e: IllegalThreadStateException =>