aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-09-28 23:28:16 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-09-28 23:28:16 -0700
commit1d44644f4f13e4c7416492a95a935f74b87f7578 (patch)
treee67eb6c9851b0f94d3c2aeb60faef1cde2862047 /core/src/main
parent815d6bd69a0c1ba0e94fc0785f5c3619b37f19c5 (diff)
downloadspark-1d44644f4f13e4c7416492a95a935f74b87f7578.tar.gz
spark-1d44644f4f13e4c7416492a95a935f74b87f7578.tar.bz2
spark-1d44644f4f13e4c7416492a95a935f74b87f7578.zip
Logging tweaks
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/CacheTracker.scala1
-rw-r--r--core/src/main/scala/spark/ShuffledRDD.scala2
-rw-r--r--core/src/main/scala/spark/SparkContext.scala14
-rw-r--r--core/src/main/scala/spark/Utils.scala9
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala14
5 files changed, 23 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 356637825e..9f88f93269 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -139,7 +139,6 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
logInfo("Registering RDD ID " + rddId + " with cache")
registeredRddIds += rddId
communicate(RegisterRDD(rddId, numPartitions))
- logInfo(RegisterRDD(rddId, numPartitions) + " successful")
}
}
}
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 7c11925f86..11d5c0ede8 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -70,7 +70,7 @@ class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
// By separating this from RepartitionShuffledRDD, we avoided a
// buf.iterator.toArray call, thus avoiding building up the buffer twice.
val buf = new ArrayBuffer[(K, V)]
- def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) }
+ def addTupleToBuffer(k: K, v: V) { buf += ((k, v)) }
SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
if (ascending) {
buf.sortWith((x, y) => x._1 < y._1).iterator
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0643c367f0..79a9e8e34e 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -409,10 +409,11 @@ class SparkContext(
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
- logInfo("Starting job...")
+ val callSite = Utils.getSparkCallSite
+ logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runJob(rdd, func, partitions, allowLocal)
- logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
+ val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal)
+ logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}
@@ -445,10 +446,11 @@ class SparkContext(
evaluator: ApproximateEvaluator[U, R],
timeout: Long
): PartialResult[R] = {
- logInfo("Starting job...")
+ val callSite = Utils.getSparkCallSite
+ logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runApproximateJob(rdd, func, evaluator, timeout)
- logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
+ val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout)
+ logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index ff28d52484..a480fe046d 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -369,8 +369,13 @@ private object Utils extends Logging {
for (el <- trace) {
if (!finished) {
- if (el.getClassName.contains("spark") && !el.getClassName.startsWith("spark.examples")) {
- lastSparkMethod = el.getMethodName
+ if (el.getClassName.startsWith("spark.") && !el.getClassName.startsWith("spark.examples.")) {
+ lastSparkMethod = if (el.getMethodName == "<init>") {
+ // Spark method is a constructor; get its class name
+ el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
+ } else {
+ el.getMethodName
+ }
}
else {
firstUserLine = el.getLineNumber
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 0d9922766a..70931407ce 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -188,23 +188,22 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
missing.toList
}
- def runJob[T, U](
+ def runJob[T, U: ClassManifest](
finalRdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
+ callSite: String,
allowLocal: Boolean)
- (implicit m: ClassManifest[U]): Array[U] =
+ : Array[U] =
{
if (partitions.size == 0) {
return new Array[U](0)
}
val waiter = new JobWaiter(partitions.size)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
- val callSite = Utils.getSparkCallSite
eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter))
waiter.getResult() match {
case JobSucceeded(results: Seq[_]) =>
- logInfo("Finished " + callSite)
return results.asInstanceOf[Seq[U]].toArray
case JobFailed(exception: Exception) =>
logInfo("Failed to run " + callSite)
@@ -216,13 +215,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
- timeout: Long
- ): PartialResult[R] =
+ callSite: String,
+ timeout: Long)
+ : PartialResult[R] =
{
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.splits.size).toArray
- eventQueue.put(JobSubmitted(rdd, func2, partitions, false, Utils.getSparkCallSite, listener))
+ eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener))
return listener.getResult() // Will throw an exception if the job fails
}