aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-10-10 13:26:43 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-10-10 13:26:43 -0700
commit001d13f7b9db9584492a8117077110a11eae4600 (patch)
tree2ae8656cb3b7ee667bdd9dd727c4f91f26e3abcc /core
parent12d593129df8a434f66bd3d01812cab76f40e6b8 (diff)
parent320418f7c8b42d4ce781b32c9ee47a9b54550b89 (diff)
downloadspark-001d13f7b9db9584492a8117077110a11eae4600.tar.gz
spark-001d13f7b9db9584492a8117077110a11eae4600.tar.bz2
spark-001d13f7b9db9584492a8117077110a11eae4600.zip
Merge branch 'master' into fast-map
Conflicts: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala32
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala21
12 files changed, 96 insertions, 47 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index febcf9c6ee..ff45e76105 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -231,7 +231,7 @@ class SparkContext(
}
taskScheduler.start()
- @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
+ @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()
ui.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
index 5a24042e14..c87b66f047 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -34,7 +34,7 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
override def getValue: Long = application.duration
})
- metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("cores"), new Gauge[Int] {
override def getValue: Int = application.coresGranted
})
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
index 23d1cb77da..36c1b87b7f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -26,17 +26,17 @@ private[spark] class MasterSource(val master: Master) extends Source {
val sourceName = "master"
// Gauge for worker numbers in cluster
- metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
override def getValue: Int = master.workers.size
})
// Gauge for application numbers in cluster
- metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("apps"), new Gauge[Int] {
override def getValue: Int = master.apps.size
})
// Gauge for waiting application numbers in cluster
- metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
override def getValue: Int = master.waitingApps.size
})
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
index df269fd047..b7ddd8c816 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -25,27 +25,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()
- metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
})
// Gauge for cores used of this worker
- metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})
// Gauge for memory used of this worker
- metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("memUsed_MB"), new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
})
// Gauge for cores free of this worker
- metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("coresFree"), new Gauge[Int] {
override def getValue: Int = worker.coresFree
})
// Gauge for memory free of this worker
- metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("memFree_MB"), new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 18c9dc1c0a..34ed9c8f73 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -43,31 +43,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
val sourceName = "executor.%s".format(executorId)
// Gauge for executor thread pool's actively executing task counts
- metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getActiveCount()
})
// Gauge for executor thread pool's approximate total number of tasks that have been completed
- metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
override def getValue: Long = executor.threadPool.getCompletedTaskCount()
})
// Gauge for executor thread pool's current number of threads
- metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getPoolSize()
})
// Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
- metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})
// Gauge for file system stats of this executor
for (scheme <- Array("hdfs", "file")) {
- registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
- registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)
- registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0)
- registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0)
- registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0)
+ registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
+ registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
+ registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
+ registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
+ registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4053b91134..5c40f5095a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -114,7 +114,7 @@ class DAGScheduler(
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
- private val listenerBus = new SparkListenerBus()
+ private[spark] val listenerBus = new SparkListenerBus()
// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 446d490cc9..151514896f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -27,23 +27,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar
val metricRegistry = new MetricRegistry()
val sourceName = "%s.DAGScheduler".format(sc.appName)
- metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size
})
- metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("stage", "runningStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.running.size
})
- metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("stage", "waitingStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.waiting.size
})
- metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.nextJobId.get()
})
- metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index a65e1ecd6d..4d3e4a17ba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,5 +70,23 @@ private[spark] class SparkListenerBus() extends Logging {
queueFullErrorMessageLogged = true
}
}
+
+ /**
+ * Waits until there are no more events in the queue, or until the specified time has elapsed.
+ * Used for testing only. Returns true if the queue has emptied and false is the specified time
+ * elapsed before the queue emptied.
+ */
+ def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+ val finishTime = System.currentTimeMillis + timeoutMillis
+ while (!eventQueue.isEmpty()) {
+ if (System.currentTimeMillis > finishTime) {
+ return false
+ }
+ /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+ * add overhead in the general case. */
+ Thread.sleep(10)
+ }
+ return true
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 6c500bad92..e936b1cfed 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -23,7 +23,7 @@ import java.io.{EOFException, InputStream, OutputStream}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.esotericsoftware.kryo.{KryoException, Kryo}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.twitter.chill.ScalaKryoInstantiator
+import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
import org.apache.spark.{SerializableWritable, Logging}
import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock, StorageLevel}
@@ -39,7 +39,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
def newKryoOutput() = new KryoOutput(bufferSize)
def newKryo(): Kryo = {
- val instantiator = new ScalaKryoInstantiator
+ val instantiator = new EmptyScalaKryoInstantiator
val kryo = instantiator.newKryo()
val classLoader = Thread.currentThread.getContextClassLoader
@@ -49,7 +49,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
StorageLevel.MEMORY_ONLY,
PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
GotBlock("1", ByteBuffer.allocate(1)),
- GetBlock("1")
+ GetBlock("1"),
+ 1 to 10,
+ 1 until 10,
+ 1L to 10L,
+ 1L until 10L
)
for (obj <- toRegister) kryo.register(obj.getClass)
@@ -69,6 +73,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
case _: Exception => println("Failed to register spark.kryo.registrator")
}
+ // Register Chill's classes; we do this after our ranges and the user's own classes to let
+ // our code override the generic serialziers in Chill for things like Seq
+ new AllScalaRegistrar().apply(kryo)
+
kryo.setClassLoader(classLoader)
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index acc3951088..365866d1e3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -28,7 +28,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
val metricRegistry = new MetricRegistry()
val sourceName = "%s.BlockManager".format(sc.appName)
- metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -36,7 +36,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
}
})
- metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
@@ -44,7 +44,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
}
})
- metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -53,7 +53,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
}
})
- metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val diskSpaceUsed = storageStatusList
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 794c3e8f8f..a549417a47 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -23,15 +23,9 @@ import scala.collection.mutable
import org.scalatest.matchers.ShouldMatchers
import org.apache.spark.SparkContext._
-/**
- *
- */
-
class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
- // TODO: This test has a race condition since the DAGScheduler now reports results
- // asynchronously. It needs to be updated for that patch.
- ignore("local metrics") {
+ test("local metrics") {
sc = new SparkContext("local[4]", "test")
val listener = new SaveStageInfo
sc.addSparkListener(listener)
@@ -45,7 +39,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
d.count()
- Thread.sleep(1000)
+ val WAIT_TIMEOUT_MILLIS = 10000
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be (1)
val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -57,18 +52,25 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
d4.collectAsMap()
- Thread.sleep(1000)
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be (4)
- listener.stageInfos.foreach {stageInfo =>
- //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
+ listener.stageInfos.foreach { stageInfo =>
+ /* small test, so some tasks might take less than 1 millisecond, but average should be greater
+ * than 0 ms. */
checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " duration")
- checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorRunTime.toLong}, stageInfo + " executorRunTime")
- checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong}, stageInfo + " executorDeserializeTime")
+ checkNonZeroAvg(
+ stageInfo.taskInfos.map{_._2.executorRunTime.toLong},
+ stageInfo + " executorRunTime")
+ checkNonZeroAvg(
+ stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong},
+ stageInfo + " executorDeserializeTime")
if (stageInfo.stage.rdd.name == d4.name) {
- checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime")
+ checkNonZeroAvg(
+ stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime},
+ stageInfo + " fetchWaitTime")
}
- stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
+ stageInfo.taskInfos.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0l)
if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
taskMetrics.shuffleWriteMetrics should be ('defined)
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 0164dda0ba..c016c51171 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -103,6 +103,27 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
}
+ test("ranges") {
+ val ser = (new KryoSerializer).newInstance()
+ def check[T](t: T) {
+ assert(ser.deserialize[T](ser.serialize(t)) === t)
+ // Check that very long ranges don't get written one element at a time
+ assert(ser.serialize(t).limit < 100)
+ }
+ check(1 to 1000000)
+ check(1 to 1000000 by 2)
+ check(1 until 1000000)
+ check(1 until 1000000 by 2)
+ check(1L to 1000000L)
+ check(1L to 1000000L by 2L)
+ check(1L until 1000000L)
+ check(1L until 1000000L by 2L)
+ check(1.0 to 1000000.0 by 1.0)
+ check(1.0 to 1000000.0 by 2.0)
+ check(1.0 until 1000000.0 by 1.0)
+ check(1.0 until 1000000.0 by 2.0)
+ }
+
test("custom registrator") {
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)