diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-09-28 23:28:16 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-09-28 23:28:16 -0700 |
commit | 1d44644f4f13e4c7416492a95a935f74b87f7578 (patch) | |
tree | e67eb6c9851b0f94d3c2aeb60faef1cde2862047 /core | |
parent | 815d6bd69a0c1ba0e94fc0785f5c3619b37f19c5 (diff) | |
download | spark-1d44644f4f13e4c7416492a95a935f74b87f7578.tar.gz spark-1d44644f4f13e4c7416492a95a935f74b87f7578.tar.bz2 spark-1d44644f4f13e4c7416492a95a935f74b87f7578.zip |
Logging tweaks
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/CacheTracker.scala | 1 | ||||
-rw-r--r-- | core/src/main/scala/spark/ShuffledRDD.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 9 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 14 |
5 files changed, 23 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 356637825e..9f88f93269 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -139,7 +139,6 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl logInfo("Registering RDD ID " + rddId + " with cache") registeredRddIds += rddId communicate(RegisterRDD(rddId, numPartitions)) - logInfo(RegisterRDD(rddId, numPartitions) + " successful") } } } diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala index 7c11925f86..11d5c0ede8 100644 --- a/core/src/main/scala/spark/ShuffledRDD.scala +++ b/core/src/main/scala/spark/ShuffledRDD.scala @@ -70,7 +70,7 @@ class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V]( // By separating this from RepartitionShuffledRDD, we avoided a // buf.iterator.toArray call, thus avoiding building up the buffer twice. val buf = new ArrayBuffer[(K, V)] - def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) } + def addTupleToBuffer(k: K, v: V) { buf += ((k, v)) } SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer) if (ascending) { buf.sortWith((x, y) => x._1 < y._1).iterator diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 0643c367f0..79a9e8e34e 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -409,10 +409,11 @@ class SparkContext( partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { - logInfo("Starting job...") + val callSite = Utils.getSparkCallSite + logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runJob(rdd, func, partitions, allowLocal) - logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s") + val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal) + logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") result } @@ -445,10 +446,11 @@ class SparkContext( evaluator: ApproximateEvaluator[U, R], timeout: Long ): PartialResult[R] = { - logInfo("Starting job...") + val callSite = Utils.getSparkCallSite + logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runApproximateJob(rdd, func, evaluator, timeout) - logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s") + val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout) + logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") result } diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index ff28d52484..a480fe046d 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -369,8 +369,13 @@ private object Utils extends Logging { for (el <- trace) { if (!finished) { - if (el.getClassName.contains("spark") && !el.getClassName.startsWith("spark.examples")) { - lastSparkMethod = el.getMethodName + if (el.getClassName.startsWith("spark.") && !el.getClassName.startsWith("spark.examples.")) { + lastSparkMethod = if (el.getMethodName == "<init>") { + // Spark method is a constructor; get its class name + el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1) + } else { + el.getMethodName + } } else { firstUserLine = el.getLineNumber diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 0d9922766a..70931407ce 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -188,23 +188,22 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with missing.toList } - def runJob[T, U]( + def runJob[T, U: ClassManifest]( finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], + callSite: String, allowLocal: Boolean) - (implicit m: ClassManifest[U]): Array[U] = + : Array[U] = { if (partitions.size == 0) { return new Array[U](0) } val waiter = new JobWaiter(partitions.size) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] - val callSite = Utils.getSparkCallSite eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter)) waiter.getResult() match { case JobSucceeded(results: Seq[_]) => - logInfo("Finished " + callSite) return results.asInstanceOf[Seq[U]].toArray case JobFailed(exception: Exception) => logInfo("Failed to run " + callSite) @@ -216,13 +215,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, evaluator: ApproximateEvaluator[U, R], - timeout: Long - ): PartialResult[R] = + callSite: String, + timeout: Long) + : PartialResult[R] = { val listener = new ApproximateActionListener(rdd, func, evaluator, timeout) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val partitions = (0 until rdd.splits.size).toArray - eventQueue.put(JobSubmitted(rdd, func2, partitions, false, Utils.getSparkCallSite, listener)) + eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener)) return listener.getResult() // Will throw an exception if the job fails } |