aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDmitriy Lyubimov <dlyubimov@apache.org>2013-07-31 22:09:22 -0700
committerDmitriy Lyubimov <dlyubimov@apache.org>2013-07-31 22:09:22 -0700
commitcb6be5bd7eab8b3cf682a6d0347b87d216d43939 (patch)
tree5071c649b24b37e5669571981da8100aaec3aaf8 /core
parent28f1550f0134bad1391682135b9bfc43cb19fc01 (diff)
parent3097d75d6f5b93cac851dd6f43faed5a492b2676 (diff)
downloadspark-cb6be5bd7eab8b3cf682a6d0347b87d216d43939.tar.gz
spark-cb6be5bd7eab8b3cf682a6d0347b87d216d43939.tar.bz2
spark-cb6be5bd7eab8b3cf682a6d0347b87d216d43939.zip
Merge remote-tracking branch 'mesos/master' into SPARK-826
Conflicts: core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala core/src/test/scala/spark/KryoSerializerSuite.scala
Diffstat (limited to 'core')
-rw-r--r--core/pom.xml22
-rw-r--r--core/src/main/scala/spark/Cache.scala80
-rw-r--r--core/src/main/scala/spark/KryoSerializer.scala137
-rw-r--r--core/src/main/scala/spark/Partitioner.scala14
-rw-r--r--core/src/main/scala/spark/SparkContext.scala15
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala11
-rw-r--r--core/src/main/scala/spark/Utils.scala11
-rw-r--r--core/src/main/scala/spark/api/python/PythonPartitioner.scala25
-rw-r--r--core/src/main/scala/spark/api/python/PythonRDD.scala64
-rw-r--r--core/src/main/scala/spark/api/python/PythonWorkerFactory.scala4
-rw-r--r--core/src/main/scala/spark/broadcast/HttpBroadcast.scala36
-rw-r--r--core/src/main/scala/spark/deploy/DeployMessage.scala146
-rw-r--r--core/src/main/scala/spark/deploy/JsonProtocol.scala11
-rw-r--r--core/src/main/scala/spark/deploy/client/Client.scala21
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala47
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterSource.scala25
-rw-r--r--core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala11
-rw-r--r--core/src/main/scala/spark/deploy/master/ui/IndexPage.scala16
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala19
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala38
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerSource.scala34
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala23
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala6
-rw-r--r--core/src/main/scala/spark/executor/ExecutorSource.scala30
-rw-r--r--core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala17
-rw-r--r--core/src/main/scala/spark/io/CompressionCodec.scala82
-rw-r--r--core/src/main/scala/spark/metrics/MetricsConfig.scala96
-rw-r--r--core/src/main/scala/spark/metrics/MetricsSystem.scala146
-rw-r--r--core/src/main/scala/spark/metrics/sink/ConsoleSink.scala59
-rw-r--r--core/src/main/scala/spark/metrics/sink/CsvSink.scala68
-rw-r--r--core/src/main/scala/spark/metrics/sink/JmxSink.scala (renamed from core/src/main/scala/spark/SoftReferenceCache.scala)24
-rw-r--r--core/src/main/scala/spark/metrics/sink/Sink.scala23
-rw-r--r--core/src/main/scala/spark/metrics/source/JvmSource.scala32
-rw-r--r--core/src/main/scala/spark/metrics/source/Source.scala25
-rw-r--r--core/src/main/scala/spark/rdd/PartitionPruningRDD.scala5
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala38
-rw-r--r--core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala30
-rw-r--r--core/src/main/scala/spark/scheduler/JobLogger.scala16
-rw-r--r--core/src/main/scala/spark/scheduler/ResultTask.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala15
-rw-r--r--core/src/main/scala/spark/scheduler/SparkListener.scala9
-rw-r--r--core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala168
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala61
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala9
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala11
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala22
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalScheduler.scala22
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala89
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala27
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala17
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala18
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala1
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMasterActor.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMessages.scala163
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerSource.scala48
-rw-r--r--core/src/main/scala/spark/storage/BlockMessage.scala1
-rw-r--r--core/src/main/scala/spark/storage/BlockMessageArray.scala5
-rw-r--r--core/src/main/scala/spark/storage/BlockObjectWriter.scala2
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala1
-rw-r--r--core/src/main/scala/spark/ui/exec/ExecutorsUI.scala27
-rw-r--r--core/src/main/scala/spark/ui/jobs/IndexPage.scala80
-rw-r--r--core/src/main/scala/spark/ui/jobs/JobProgressUI.scala66
-rw-r--r--core/src/main/scala/spark/ui/jobs/StagePage.scala54
-rw-r--r--core/src/main/scala/spark/util/Vector.scala5
-rw-r--r--core/src/test/resources/test_metrics_config.properties6
-rw-r--r--core/src/test/resources/test_metrics_system.properties7
-rw-r--r--core/src/test/scala/spark/FailureSuite.scala39
-rw-r--r--core/src/test/scala/spark/KryoSerializerSuite.scala14
-rw-r--r--core/src/test/scala/spark/PartitionPruningRDDSuite.scala28
-rw-r--r--core/src/test/scala/spark/io/CompressionCodecSuite.scala62
-rw-r--r--core/src/test/scala/spark/metrics/MetricsConfigSuite.scala64
-rw-r--r--core/src/test/scala/spark/metrics/MetricsSystemSuite.scala39
-rw-r--r--core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala2
-rw-r--r--core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala13
77 files changed, 1892 insertions, 821 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 6329b2fbd8..ba0071f582 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,6 +49,10 @@
<artifactId>compress-lzf</artifactId>
</dependency>
<dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</dependency>
@@ -57,8 +61,14 @@
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
- <groupId>de.javakaffee</groupId>
- <artifactId>kryo-serializers</artifactId>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill_2.9.3</artifactId>
+ <version>0.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill-java</artifactId>
+ <version>0.3.0</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
@@ -108,6 +118,14 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala
deleted file mode 100644
index b0c83ce59d..0000000000
--- a/core/src/main/scala/spark/Cache.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.util.concurrent.atomic.AtomicInteger
-
-private[spark] sealed trait CachePutResponse
-private[spark] case class CachePutSuccess(size: Long) extends CachePutResponse
-private[spark] case class CachePutFailure() extends CachePutResponse
-
-/**
- * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
- * both partitions of cached RDDs and broadcast variables on Spark executors. Caches are also aware
- * of which entries are part of the same dataset (for example, partitions in the same RDD). The key
- * for each value in a cache is a (datasetID, partition) pair.
- *
- * A single Cache instance gets created on each machine and is shared by all caches (i.e. both the
- * RDD split cache and the broadcast variable cache), to enable global replacement policies.
- * However, because these several independent modules all perform caching, it is important to give
- * them separate key namespaces, so that an RDD and a broadcast variable (for example) do not use
- * the same key. For this purpose, Cache has the notion of KeySpaces. Each client module must first
- * ask for a KeySpace, and then call get() and put() on that space using its own keys.
- *
- * This abstract class handles the creation of key spaces, so that subclasses need only deal with
- * keys that are unique across modules.
- */
-private[spark] abstract class Cache {
- private val nextKeySpaceId = new AtomicInteger(0)
- private def newKeySpaceId() = nextKeySpaceId.getAndIncrement()
-
- def newKeySpace() = new KeySpace(this, newKeySpaceId())
-
- /**
- * Get the value for a given (datasetId, partition), or null if it is not
- * found.
- */
- def get(datasetId: Any, partition: Int): Any
-
- /**
- * Attempt to put a value in the cache; returns CachePutFailure if this was
- * not successful (e.g. because the cache replacement policy forbids it), and
- * CachePutSuccess if successful. If size estimation is available, the cache
- * implementation should set the size field in CachePutSuccess.
- */
- def put(datasetId: Any, partition: Int, value: Any): CachePutResponse
-
- /**
- * Report the capacity of the cache partition. By default this just reports
- * zero. Specific implementations can choose to provide the capacity number.
- */
- def getCapacity: Long = 0L
-}
-
-/**
- * A key namespace in a Cache.
- */
-private[spark] class KeySpace(cache: Cache, val keySpaceId: Int) {
- def get(datasetId: Any, partition: Int): Any =
- cache.get((keySpaceId, datasetId), partition)
-
- def put(datasetId: Any, partition: Int, value: Any): CachePutResponse =
- cache.put((keySpaceId, datasetId), partition, value)
-
- def getCapacity: Long = cache.getCapacity
-}
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index ee37da7948..eeb2993d8a 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -19,24 +19,16 @@ package spark
import java.io._
import java.nio.ByteBuffer
-import java.nio.channels.Channels
-
-import scala.collection.immutable
-import scala.collection.mutable
-
-import com.esotericsoftware.kryo._
-import com.esotericsoftware.kryo.{Serializer => KSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
-
+import com.twitter.chill.ScalaKryoInstantiator
import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
import spark.broadcast._
import spark.storage._
private[spark]
class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
-
val output = new KryoOutput(outStream)
def writeObject[T](t: T): SerializationStream = {
@@ -50,7 +42,6 @@ class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends Seria
private[spark]
class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
-
val input = new KryoInput(inStream)
def readObject[T](): T = {
@@ -58,7 +49,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
kryo.readClassAndObject(input).asInstanceOf[T]
} catch {
// DeserializationStream uses the EOF exception to indicate stopping condition.
- case e: com.esotericsoftware.kryo.KryoException => throw new java.io.EOFException
+ case _: KryoException => throw new EOFException
}
}
@@ -69,10 +60,9 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
}
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-
- val kryo = ks.kryo.get()
- val output = ks.output.get()
- val input = ks.input.get()
+ val kryo = ks.newKryo()
+ val output = ks.newKryoOutput()
+ val input = ks.newKryoInput()
def serialize[T](t: T): ByteBuffer = {
output.clear()
@@ -108,125 +98,51 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
* serialization.
*/
trait KryoRegistrator {
- def registerClasses(kryo: Kryo): Unit
+ def registerClasses(kryo: Kryo)
}
/**
* A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
*/
class KryoSerializer extends spark.serializer.Serializer with Logging {
+ private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
- val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+ def newKryoOutput() = new KryoOutput(bufferSize)
- val kryo = new ThreadLocal[Kryo] {
- override def initialValue = createKryo()
- }
-
- val output = new ThreadLocal[KryoOutput] {
- override def initialValue = new KryoOutput(bufferSize)
- }
-
- val input = new ThreadLocal[KryoInput] {
- override def initialValue = new KryoInput(bufferSize)
- }
+ def newKryoInput() = new KryoInput(bufferSize)
- def createKryo(): Kryo = {
- val kryo = new KryoReflectionFactorySupport()
+ def newKryo(): Kryo = {
+ val instantiator = new ScalaKryoInstantiator
+ val kryo = instantiator.newKryo()
+ val classLoader = Thread.currentThread.getContextClassLoader
// Register some commonly used classes
val toRegister: Seq[AnyRef] = Seq(
- // Arrays
- Array(1), Array(1.0), Array(1.0f), Array(1L), Array(""), Array(("", "")),
- Array(new java.lang.Object), Array(1.toByte), Array(true), Array('c'),
- // Specialized Tuple2s
- ("", ""), ("", 1), (1, 1), (1.0, 1.0), (1L, 1L),
- (1, 1.0), (1.0, 1), (1L, 1.0), (1.0, 1L), (1, 1L), (1L, 1),
- // Scala collections
- List(1), mutable.ArrayBuffer(1),
- // Options and Either
- Some(1), Left(1), Right(1),
- // Higher-dimensional tuples
- (1, 1, 1), (1, 1, 1, 1), (1, 1, 1, 1, 1),
- None,
ByteBuffer.allocate(1),
StorageLevel.MEMORY_ONLY,
PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
GotBlock("1", ByteBuffer.allocate(1)),
GetBlock("1")
)
- for (obj <- toRegister) {
- kryo.register(obj.getClass)
- }
+
+ for (obj <- toRegister) kryo.register(obj.getClass)
// Allow sending SerializableWritable
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
- // Register some commonly used Scala singleton objects. Because these
- // are singletons, we must return the exact same local object when we
- // deserialize rather than returning a clone as FieldSerializer would.
- class SingletonSerializer[T](obj: T) extends KSerializer[T] {
- override def write(kryo: Kryo, output: KryoOutput, obj: T) {}
- override def read(kryo: Kryo, input: KryoInput, cls: java.lang.Class[T]): T = obj
- }
- kryo.register(None.getClass, new SingletonSerializer[AnyRef](None))
- kryo.register(Nil.getClass, new SingletonSerializer[AnyRef](Nil))
-
- // Register maps with a special serializer since they have complex internal structure
- class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any])
- extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
-
- //hack, look at https://groups.google.com/forum/#!msg/kryo-users/Eu5V4bxCfws/k-8UQ22y59AJ
- private final val FAKE_REFERENCE = new Object()
- override def write(
- kryo: Kryo,
- output: KryoOutput,
- obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
- val map = obj.asInstanceOf[scala.collection.Map[Any, Any]]
- output.writeInt(map.size)
- for ((k, v) <- map) {
- kryo.writeClassAndObject(output, k)
- kryo.writeClassAndObject(output, v)
- }
- }
- override def read (
- kryo: Kryo,
- input: KryoInput,
- cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
- : Array[(Any, Any)] => scala.collection.Map[Any, Any] = {
- kryo.reference(FAKE_REFERENCE)
- val size = input.readInt()
- val elems = new Array[(Any, Any)](size)
- for (i <- 0 until size) {
- val k = kryo.readClassAndObject(input)
- val v = kryo.readClassAndObject(input)
- elems(i)=(k,v)
- }
- buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]]
+ // Allow the user to register their own classes by setting spark.kryo.registrator
+ try {
+ Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+ logDebug("Running user registrator: " + regCls)
+ val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+ reg.registerClasses(kryo)
}
+ } catch {
+ case _: Exception => println("Failed to register spark.kryo.registrator")
}
- kryo.register(mutable.HashMap().getClass, new ScalaMapSerializer(mutable.HashMap() ++ _))
- // TODO: add support for immutable maps too; this is more annoying because there are many
- // subclasses of immutable.Map for small maps (with <= 4 entries)
- val map1 = Map[Any, Any](1 -> 1)
- val map2 = Map[Any, Any](1 -> 1, 2 -> 2)
- val map3 = Map[Any, Any](1 -> 1, 2 -> 2, 3 -> 3)
- val map4 = Map[Any, Any](1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4)
- val map5 = Map[Any, Any](1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4, 5 -> 5)
- kryo.register(map1.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map2.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map3.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map4.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map5.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- // Allow the user to register their own classes by setting spark.kryo.registrator
- val regCls = System.getProperty("spark.kryo.registrator")
- if (regCls != null) {
- logInfo("Running user registrator: " + regCls)
- val classLoader = Thread.currentThread.getContextClassLoader
- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
- reg.registerClasses(kryo)
- }
+ kryo.setClassLoader(classLoader)
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
@@ -235,7 +151,6 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
}
def newInstance(): SerializerInstance = {
- this.kryo.get().setClassLoader(Thread.currentThread().getContextClassLoader)
new KryoSerializerInstance(this)
}
-}
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 660af70d52..6035bc075e 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -65,17 +65,9 @@ object Partitioner {
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions = partitions
- def getPartition(key: Any): Int = {
- if (key == null) {
- return 0
- } else {
- val mod = key.hashCode % partitions
- if (mod < 0) {
- mod + partitions
- } else {
- mod // Guard against negative hash codes
- }
- }
+ def getPartition(key: Any): Int = key match {
+ case null => 0
+ case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 24ba605646..77cb0ee0cd 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -60,13 +60,14 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
+import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import ui.{SparkUI}
+import spark.metrics._
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -270,6 +271,16 @@ class SparkContext(
// Post init
taskScheduler.postStartHook()
+ val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+ val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
+
+ def initDriverMetrics() {
+ SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
+ SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
+ }
+
+ initDriverMetrics()
+
// Methods for creating RDDs
/** Distribute a local Scala collection to form an RDD. */
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index f2bdc11bdb..4a1d341f5d 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -24,6 +24,7 @@ import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
import akka.remote.RemoteActorRefProvider
import spark.broadcast.BroadcastManager
+import spark.metrics.MetricsSystem
import spark.storage.BlockManager
import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
@@ -53,6 +54,7 @@ class SparkEnv (
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
+ val metricsSystem: MetricsSystem,
// To be set only as part of initialization of SparkContext.
// (executorId, defaultHostPort) => executorHostPort
// If executorId is NOT found, return defaultHostPort
@@ -68,6 +70,7 @@ class SparkEnv (
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
+ metricsSystem.stop()
actorSystem.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
@@ -184,6 +187,13 @@ object SparkEnv extends Logging {
httpFileServer.initialize()
System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
+ val metricsSystem = if (isDriver) {
+ MetricsSystem.createMetricsSystem("driver")
+ } else {
+ MetricsSystem.createMetricsSystem("executor")
+ }
+ metricsSystem.start()
+
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
@@ -213,6 +223,7 @@ object SparkEnv extends Logging {
connectionManager,
httpFileServer,
sparkFilesDir,
+ metricsSystem,
None)
}
}
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 8e6fc66955..673f9a810d 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -638,7 +638,7 @@ private object Utils extends Logging {
output.toString
}
- /**
+ /**
* A regular expression to match classes of the "core" Spark API that we want to skip when
* finding the call site of a method.
*/
@@ -798,4 +798,13 @@ private object Utils extends Logging {
}
return buf
}
+
+ /* Calculates 'x' modulo 'mod', takes to consideration sign of x,
+ * i.e. if 'x' is negative, than 'x' % 'mod' is negative too
+ * so function return (x % mod) + mod in that case.
+ */
+ def nonNegativeMod(x: Int, mod: Int): Int = {
+ val rawMod = x % mod
+ rawMod + (if (rawMod < 0) mod else 0)
+ }
}
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
index 31a719fbff..ac112b8c2c 100644
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -18,7 +18,7 @@
package spark.api.python
import spark.Partitioner
-
+import spark.Utils
import java.util.Arrays
/**
@@ -35,25 +35,10 @@ private[spark] class PythonPartitioner(
val pyPartitionFunctionId: Long)
extends Partitioner {
- override def getPartition(key: Any): Int = {
- if (key == null) {
- return 0
- }
- else {
- val hashCode = {
- if (key.isInstanceOf[Array[Byte]]) {
- Arrays.hashCode(key.asInstanceOf[Array[Byte]])
- } else {
- key.hashCode()
- }
- }
- val mod = hashCode % numPartitions
- if (mod < 0) {
- mod + numPartitions
- } else {
- mod // Guard against negative hash codes
- }
- }
+ override def getPartition(key: Any): Int = key match {
+ case null => 0
+ case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions)
+ case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
}
override def equals(other: Any): Boolean = other match {
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index af10822dbd..2dd79f7100 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -63,34 +63,42 @@ private[spark] class PythonRDD[T: ClassManifest](
// Start a thread to feed the process input from our parent's iterator
new Thread("stdin writer for " + pythonExec) {
override def run() {
- SparkEnv.set(env)
- val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
- val dataOut = new DataOutputStream(stream)
- val printOut = new PrintWriter(stream)
- // Partition index
- dataOut.writeInt(split.index)
- // sparkFilesDir
- PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dataOut)
- // Broadcast variables
- dataOut.writeInt(broadcastVars.length)
- for (broadcast <- broadcastVars) {
- dataOut.writeLong(broadcast.id)
- dataOut.writeInt(broadcast.value.length)
- dataOut.write(broadcast.value)
- }
- dataOut.flush()
- // Serialized user code
- for (elem <- command) {
- printOut.println(elem)
- }
- printOut.flush()
- // Data values
- for (elem <- parent.iterator(split, context)) {
- PythonRDD.writeAsPickle(elem, dataOut)
+ try {
+ SparkEnv.set(env)
+ val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
+ val dataOut = new DataOutputStream(stream)
+ val printOut = new PrintWriter(stream)
+ // Partition index
+ dataOut.writeInt(split.index)
+ // sparkFilesDir
+ PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dataOut)
+ // Broadcast variables
+ dataOut.writeInt(broadcastVars.length)
+ for (broadcast <- broadcastVars) {
+ dataOut.writeLong(broadcast.id)
+ dataOut.writeInt(broadcast.value.length)
+ dataOut.write(broadcast.value)
+ }
+ dataOut.flush()
+ // Serialized user code
+ for (elem <- command) {
+ printOut.println(elem)
+ }
+ printOut.flush()
+ // Data values
+ for (elem <- parent.iterator(split, context)) {
+ PythonRDD.writeAsPickle(elem, dataOut)
+ }
+ dataOut.flush()
+ printOut.flush()
+ worker.shutdownOutput()
+ } catch {
+ case e: IOException =>
+ // This can happen for legitimate reasons if the Python code stops returning data before we are done
+ // passing elements through, e.g., for take(). Just log a message to say it happened.
+ logInfo("stdin writer to Python finished early")
+ logDebug("stdin writer to Python finished early", e)
}
- dataOut.flush()
- printOut.flush()
- worker.shutdownOutput()
}
}.start()
@@ -297,7 +305,7 @@ class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
Utils.checkHost(serverHost, "Expected hostname")
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-
+
override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
override def addInPlace(val1: JList[Array[Byte]], val2: JList[Array[Byte]])
diff --git a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
index 078ad45ce8..14f8320678 100644
--- a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
@@ -17,7 +17,7 @@
package spark.api.python
-import java.io.{DataInputStream, IOException}
+import java.io.{File, DataInputStream, IOException}
import java.net.{Socket, SocketException, InetAddress}
import scala.collection.JavaConversions._
@@ -67,6 +67,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
+ val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
+ workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()
// Redirect the stderr to ours
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index c565876950..138a8c21bc 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -17,21 +17,20 @@
package spark.broadcast
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
-import java.io._
-import java.net._
-import java.util.UUID
+import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
+import java.net.URL
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-import spark._
+import spark.{HttpServer, Logging, SparkEnv, Utils}
+import spark.io.CompressionCodec
import spark.storage.StorageLevel
-import util.{MetadataCleaner, TimeStampedHashSet}
+import spark.util.{MetadataCleaner, TimeStampedHashSet}
+
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
-extends Broadcast[T](id) with Logging with Serializable {
+ extends Broadcast[T](id) with Logging with Serializable {
def value = value_
@@ -85,6 +84,7 @@ private object HttpBroadcast extends Logging {
private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
+ private lazy val compressionCodec = CompressionCodec.createCodec()
def initialize(isDriver: Boolean) {
synchronized {
@@ -122,10 +122,12 @@ private object HttpBroadcast extends Logging {
def write(id: Long, value: Any) {
val file = new File(broadcastDir, "broadcast-" + id)
- val out: OutputStream = if (compress) {
- new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering
- } else {
- new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+ val out: OutputStream = {
+ if (compress) {
+ compressionCodec.compressedOutputStream(new FileOutputStream(file))
+ } else {
+ new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+ }
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
@@ -136,10 +138,12 @@ private object HttpBroadcast extends Logging {
def read[T](id: Long): T = {
val url = serverUri + "/broadcast-" + id
- var in = if (compress) {
- new LZFInputStream(new URL(url).openStream()) // Does its own buffering
- } else {
- new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+ val in = {
+ if (compress) {
+ compressionCodec.compressedInputStream(new URL(url).openStream())
+ } else {
+ new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+ }
}
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index e1f8aff6f5..7c37a16615 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -17,109 +17,107 @@
package spark.deploy
+import scala.collection.immutable.List
+
+import spark.Utils
import spark.deploy.ExecutorState.ExecutorState
import spark.deploy.master.{WorkerInfo, ApplicationInfo}
import spark.deploy.worker.ExecutorRunner
-import scala.collection.immutable.List
-import spark.Utils
-private[spark] sealed trait DeployMessage extends Serializable
+private[deploy] sealed trait DeployMessage extends Serializable
-// Worker to Master
+private[deploy] object DeployMessages {
-private[spark]
-case class RegisterWorker(
- id: String,
- host: String,
- port: Int,
- cores: Int,
- memory: Int,
- webUiPort: Int,
- publicAddress: String)
- extends DeployMessage {
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
-}
+ // Worker to Master
-private[spark]
-case class ExecutorStateChanged(
- appId: String,
- execId: Int,
- state: ExecutorState,
- message: Option[String],
- exitStatus: Option[Int])
- extends DeployMessage
+ case class RegisterWorker(
+ id: String,
+ host: String,
+ port: Int,
+ cores: Int,
+ memory: Int,
+ webUiPort: Int,
+ publicAddress: String)
+ extends DeployMessage {
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+ }
-private[spark] case class Heartbeat(workerId: String) extends DeployMessage
+ case class ExecutorStateChanged(
+ appId: String,
+ execId: Int,
+ state: ExecutorState,
+ message: Option[String],
+ exitStatus: Option[Int])
+ extends DeployMessage
-// Master to Worker
+ case class Heartbeat(workerId: String) extends DeployMessage
-private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
-private[spark] case class RegisterWorkerFailed(message: String) extends DeployMessage
-private[spark] case class KillExecutor(appId: String, execId: Int) extends DeployMessage
+ // Master to Worker
-private[spark] case class LaunchExecutor(
- appId: String,
- execId: Int,
- appDesc: ApplicationDescription,
- cores: Int,
- memory: Int,
- sparkHome: String)
- extends DeployMessage
+ case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
-// Client to Master
+ case class RegisterWorkerFailed(message: String) extends DeployMessage
-private[spark] case class RegisterApplication(appDescription: ApplicationDescription)
- extends DeployMessage
+ case class KillExecutor(appId: String, execId: Int) extends DeployMessage
-// Master to Client
+ case class LaunchExecutor(
+ appId: String,
+ execId: Int,
+ appDesc: ApplicationDescription,
+ cores: Int,
+ memory: Int,
+ sparkHome: String)
+ extends DeployMessage
-private[spark]
-case class RegisteredApplication(appId: String) extends DeployMessage
+ // Client to Master
-private[spark]
-case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
- Utils.checkHostPort(hostPort, "Required hostport")
-}
+ case class RegisterApplication(appDescription: ApplicationDescription)
+ extends DeployMessage
-private[spark]
-case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
- exitStatus: Option[Int])
+ // Master to Client
-private[spark]
-case class ApplicationRemoved(message: String)
+ case class RegisteredApplication(appId: String) extends DeployMessage
-// Internal message in Client
+ case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
+ Utils.checkHostPort(hostPort, "Required hostport")
+ }
-private[spark] case object StopClient
+ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
+ exitStatus: Option[Int])
-// MasterWebUI To Master
+ case class ApplicationRemoved(message: String)
-private[spark] case object RequestMasterState
+ // Internal message in Client
-// Master to MasterWebUI
+ case object StopClient
-private[spark]
-case class MasterState(host: String, port: Int, workers: Array[WorkerInfo],
- activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+ // MasterWebUI To Master
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
+ case object RequestMasterState
- def uri = "spark://" + host + ":" + port
-}
+ // Master to MasterWebUI
+
+ case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
+ activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+
+ def uri = "spark://" + host + ":" + port
+ }
-// WorkerWebUI to Worker
-private[spark] case object RequestWorkerState
+ // WorkerWebUI to Worker
+ case object RequestWorkerState
-// Worker to WorkerWebUI
+ // Worker to WorkerWebUI
-private[spark]
-case class WorkerState(host: String, port: Int, workerId: String, executors: List[ExecutorRunner],
- finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
- coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
+ case class WorkerStateResponse(host: String, port: Int, workerId: String,
+ executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String,
+ cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+ }
}
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
index 64f89623e1..bd1db7c294 100644
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -17,9 +17,12 @@
package spark.deploy
-import master.{ApplicationInfo, WorkerInfo}
import net.liftweb.json.JsonDSL._
-import worker.ExecutorRunner
+
+import spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
+import spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import spark.deploy.worker.ExecutorRunner
+
private[spark] object JsonProtocol {
def writeWorkerInfo(obj: WorkerInfo) = {
@@ -57,7 +60,7 @@ private[spark] object JsonProtocol {
("appdesc" -> writeApplicationDescription(obj.appDesc))
}
- def writeMasterState(obj: MasterState) = {
+ def writeMasterState(obj: MasterStateResponse) = {
("url" -> ("spark://" + obj.uri)) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("cores" -> obj.workers.map(_.cores).sum) ~
@@ -68,7 +71,7 @@ private[spark] object JsonProtocol {
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
}
- def writeWorkerState(obj: WorkerState) = {
+ def writeWorkerState(obj: WorkerStateResponse) = {
("id" -> obj.workerId) ~
("masterurl" -> obj.masterUrl) ~
("masterwebuiurl" -> obj.masterWebUiUrl) ~
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index edefa0292d..9d5ba8a796 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -17,21 +17,23 @@
package spark.deploy.client
-import spark.deploy._
+import java.util.concurrent.TimeoutException
+
import akka.actor._
+import akka.actor.Terminated
import akka.pattern.ask
import akka.util.Duration
-import akka.util.duration._
-import akka.pattern.AskTimeoutException
-import spark.{SparkException, Logging}
+import akka.remote.RemoteClientDisconnected
import akka.remote.RemoteClientLifeCycleEvent
import akka.remote.RemoteClientShutdown
-import spark.deploy.RegisterApplication
-import spark.deploy.master.Master
-import akka.remote.RemoteClientDisconnected
-import akka.actor.Terminated
import akka.dispatch.Await
+import spark.Logging
+import spark.deploy.{ApplicationDescription, ExecutorState}
+import spark.deploy.DeployMessages._
+import spark.deploy.master.Master
+
+
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur.
@@ -134,7 +136,8 @@ private[spark] class Client(
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
- case e: AskTimeoutException => // Ignore it, maybe master went away
+ case e: TimeoutException =>
+ logInfo("Stop request to Master timed out; it may already be shut down.")
}
actor = null
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index eddcafd84d..202d5bcdb7 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -17,20 +17,22 @@
package spark.deploy.master
-import akka.actor._
-import akka.actor.Terminated
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
-import akka.util.duration._
-
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import spark.deploy._
+import akka.actor._
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.util.duration._
+
import spark.{Logging, SparkException, Utils}
+import spark.deploy.{ApplicationDescription, ExecutorState}
+import spark.deploy.DeployMessages._
+import spark.deploy.master.ui.MasterWebUI
+import spark.metrics.MetricsSystem
import spark.util.AkkaUtils
-import ui.MasterWebUI
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -57,6 +59,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
+ val metricsSystem = MetricsSystem.createMetricsSystem("master")
+ val masterSource = new MasterSource(this)
+
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
@@ -73,10 +78,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
+
+ metricsSystem.registerSource(masterSource)
+ metricsSystem.start()
}
override def postStop() {
webUi.stop()
+ metricsSystem.stop()
}
override def receive = {
@@ -160,7 +169,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RequestMasterState => {
- sender ! MasterState(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+ sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
}
}
@@ -225,20 +234,27 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
- worker.actor ! LaunchExecutor(exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
- exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
+ worker.actor ! LaunchExecutor(
+ exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
+ exec.application.driver ! ExecutorAdded(
+ exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
publicAddress: String): WorkerInfo = {
- // There may be one or more refs to dead workers on this same node (w/ different ID's), remove them.
- workers.filter(w => (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)).foreach(workers -= _)
+ // There may be one or more refs to dead workers on this same node (w/ different ID's),
+ // remove them.
+ workers.filter { w =>
+ (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)
+ }.foreach { w =>
+ workers -= w
+ }
val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
addressToWorker(sender.path.address) = worker
- return worker
+ worker
}
def removeWorker(worker: WorkerInfo) {
@@ -249,7 +265,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
- exec.application.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None)
+ exec.application.driver ! ExecutorUpdated(
+ exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
}
@@ -269,7 +286,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
}
- return app
+ app
}
def finishApplication(app: ApplicationInfo) {
diff --git a/core/src/main/scala/spark/deploy/master/MasterSource.scala b/core/src/main/scala/spark/deploy/master/MasterSource.scala
new file mode 100644
index 0000000000..b8cfa6a773
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/MasterSource.scala
@@ -0,0 +1,25 @@
+package spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class MasterSource(val master: Master) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "master"
+
+ // Gauge for worker numbers in cluster
+ metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
+ override def getValue: Int = master.workers.size
+ })
+
+ // Gauge for application numbers in cluster
+ metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+ override def getValue: Int = master.apps.size
+ })
+
+ // Gauge for waiting application numbers in cluster
+ metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
+ override def getValue: Int = master.waitingApps.size
+ })
+}
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 32264af393..b4c62bc224 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -17,6 +17,8 @@
package spark.deploy.master.ui
+import scala.xml.Node
+
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
@@ -25,9 +27,8 @@ import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
-import scala.xml.Node
-
-import spark.deploy.{RequestMasterState, JsonProtocol, MasterState}
+import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import spark.deploy.JsonProtocol
import spark.deploy.master.ExecutorInfo
import spark.ui.UIUtils
@@ -38,7 +39,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
@@ -49,7 +50,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
/** Executor details for a particular application */
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index b05197c1b9..557df89b41 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -17,18 +17,20 @@
package spark.deploy.master.ui
-import akka.dispatch.Await
-import akka.pattern.ask
-import akka.util.duration._
-
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import spark.deploy.{RequestMasterState, DeployWebUI, MasterState}
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.duration._
+
import spark.Utils
-import spark.ui.UIUtils
+import spark.deploy.DeployWebUI
+import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import spark.ui.UIUtils
+
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.master
@@ -36,7 +38,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 8b51ff1c3a..345dfe879c 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -19,14 +19,12 @@ package spark.deploy.worker
import java.io._
import java.lang.System.getenv
-import spark.deploy.{ExecutorState, ExecutorStateChanged, ApplicationDescription}
+
import akka.actor.ActorRef
+
import spark.{Utils, Logging}
-import java.net.{URI, URL}
-import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.hadoop.conf.Configuration
-import scala.Some
-import spark.deploy.ExecutorStateChanged
+import spark.deploy.{ExecutorState, ApplicationDescription}
+import spark.deploy.DeployMessages.ExecutorStateChanged
/**
* Manages the execution of one executor process.
@@ -51,6 +49,9 @@ private[spark] class ExecutorRunner(
var process: Process = null
var shutdownHook: Thread = null
+ private def getAppEnv(key: String): Option[String] =
+ appDesc.command.environment.get(key).orElse(Option(getenv(key)))
+
def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
@@ -95,7 +96,7 @@ private[spark] class ExecutorRunner(
def buildCommandSeq(): Seq[String] = {
val command = appDesc.command
- val runner = Option(getenv("JAVA_HOME")).map(_ + "/bin/java").getOrElse("java")
+ val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
// SPARK-698: do not call the run.cmd script, as process.destroy()
// fails to kill a process tree on Windows
Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
@@ -107,10 +108,10 @@ private[spark] class ExecutorRunner(
* the way the JAVA_OPTS are assembled there.
*/
def buildJavaOpts(): Seq[String] = {
- val libraryOpts = Option(getenv("SPARK_LIBRARY_PATH"))
+ val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH")
.map(p => List("-Djava.library.path=" + p))
.getOrElse(Nil)
- val userOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
+ val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil)
val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
// Figure out our classpath with the external compute-classpath script
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 0bd88ea253..0e46fa281e 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -17,21 +17,24 @@
package spark.deploy.worker
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.io.File
+
+import scala.collection.mutable.HashMap
+
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import akka.util.duration._
+
import spark.{Logging, Utils}
-import spark.util.AkkaUtils
-import spark.deploy._
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import java.text.SimpleDateFormat
-import java.util.Date
-import spark.deploy.RegisterWorker
-import spark.deploy.LaunchExecutor
-import spark.deploy.RegisterWorkerFailed
+import spark.deploy.ExecutorState
+import spark.deploy.DeployMessages._
import spark.deploy.master.Master
-import java.io.File
-import ui.WorkerWebUI
+import spark.deploy.worker.ui.WorkerWebUI
+import spark.metrics.MetricsSystem
+import spark.util.AkkaUtils
+
private[spark] class Worker(
host: String,
@@ -67,6 +70,9 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0
+ val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+ val workerSource = new WorkerSource(this)
+
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
@@ -97,6 +103,9 @@ private[spark] class Worker(
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
connectToMaster()
+
+ metricsSystem.registerSource(workerSource)
+ metricsSystem.start()
}
def connectToMaster() {
@@ -155,10 +164,10 @@ private[spark] class Worker(
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()
-
+
case RequestWorkerState => {
- sender ! WorkerState(host, port, workerId, executors.values.toList,
- finishedExecutors.values.toList, masterUrl, cores, memory,
+ sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
+ finishedExecutors.values.toList, masterUrl, cores, memory,
coresUsed, memoryUsed, masterWebUiUrl)
}
}
@@ -178,6 +187,7 @@ private[spark] class Worker(
override def postStop() {
executors.values.foreach(_.kill())
webUi.stop()
+ metricsSystem.stop()
}
}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
new file mode 100644
index 0000000000..39cb8e5690
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
@@ -0,0 +1,34 @@
+package spark.deploy.worker
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class WorkerSource(val worker: Worker) extends Source {
+ val sourceName = "worker"
+ val metricRegistry = new MetricRegistry()
+
+ metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
+ override def getValue: Int = worker.executors.size
+ })
+
+ // Gauge for cores used of this worker
+ metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
+ override def getValue: Int = worker.coresUsed
+ })
+
+ // Gauge for memory used of this worker
+ metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
+ override def getValue: Int = worker.memoryUsed
+ })
+
+ // Gauge for cores free of this worker
+ metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
+ override def getValue: Int = worker.coresFree
+ })
+
+ // Gauge for memory free of this worker
+ metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
+ override def getValue: Int = worker.memoryFree
+ })
+}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
index 7548a26c2e..1619c6a4c2 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -17,34 +17,36 @@
package spark.deploy.worker.ui
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
-import javax.servlet.http.HttpServletRequest
-
import net.liftweb.json.JsonAST.JValue
-import scala.xml.Node
-
-import spark.deploy.{RequestWorkerState, JsonProtocol, WorkerState}
-import spark.deploy.worker.ExecutorRunner
import spark.Utils
+import spark.deploy.JsonProtocol
+import spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
+import spark.deploy.worker.ExecutorRunner
import spark.ui.UIUtils
+
private[spark] class IndexPage(parent: WorkerWebUI) {
val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
- val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
@@ -69,7 +71,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
<p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
</div>
</div>
- <hr/>
+ <hr/>
<div class="row"> <!-- Running Executors -->
<div class="span12">
@@ -88,7 +90,8 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
</div>
</div>;
- UIUtils.basicSparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
+ UIUtils.basicSparkPage(content, "Spark Worker on %s:%s".format(
+ workerState.host, workerState.port))
}
def executorRow(executor: ExecutorRunner): Seq[Node] = {
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 2e81151882..8a74a8d853 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -69,7 +69,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)
-
+
// We may have been called from a shutdown hook. If so, we must not call System.exit().
// (If we do, we will deadlock.)
if (!Utils.inShutdown()) {
@@ -87,9 +87,13 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
}
)
+ val executorSource = new ExecutorSource(this)
+
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env)
+ env.metricsSystem.registerSource(executorSource)
+
private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
// Start worker thread pool
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala
new file mode 100644
index 0000000000..94116edfcf
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorSource.scala
@@ -0,0 +1,30 @@
+package spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+class ExecutorSource(val executor: Executor) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "executor"
+
+ // Gauge for executor thread pool's actively executing task counts
+ metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getActiveCount()
+ })
+
+ // Gauge for executor thread pool's approximate total number of tasks that have been completed
+ metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
+ override def getValue: Long = executor.threadPool.getCompletedTaskCount()
+ })
+
+ // Gauge for executor thread pool's current number of threads
+ metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getPoolSize()
+ })
+
+ // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
+ metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getMaximumPoolSize()
+ })
+}
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index f4003da732..e47fe50021 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -18,19 +18,16 @@
package spark.executor
import java.nio.ByteBuffer
-import spark.Logging
-import spark.TaskState.TaskState
-import spark.util.AkkaUtils
+
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
-import spark.scheduler.cluster._
-import spark.scheduler.cluster.RegisteredExecutor
-import spark.scheduler.cluster.LaunchTask
-import spark.scheduler.cluster.RegisterExecutorFailed
-import spark.scheduler.cluster.RegisterExecutor
-import spark.Utils
+
+import spark.{Logging, Utils}
+import spark.TaskState.TaskState
import spark.deploy.SparkHadoopUtil
+import spark.scheduler.cluster.StandaloneClusterMessages._
+import spark.util.AkkaUtils
+
private[spark] class StandaloneExecutorBackend(
driverUrl: String,
diff --git a/core/src/main/scala/spark/io/CompressionCodec.scala b/core/src/main/scala/spark/io/CompressionCodec.scala
new file mode 100644
index 0000000000..0adebecadb
--- /dev/null
+++ b/core/src/main/scala/spark/io/CompressionCodec.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.io
+
+import java.io.{InputStream, OutputStream}
+
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+
+
+/**
+ * CompressionCodec allows the customization of choosing different compression implementations
+ * to be used in block storage.
+ */
+trait CompressionCodec {
+
+ def compressedOutputStream(s: OutputStream): OutputStream
+
+ def compressedInputStream(s: InputStream): InputStream
+}
+
+
+private[spark] object CompressionCodec {
+
+ def createCodec(): CompressionCodec = {
+ // Set the default codec to Snappy since the LZF implementation initializes a pretty large
+ // buffer for every stream, which results in a lot of memory overhead when the number of
+ // shuffle reduce buckets are large.
+ createCodec(classOf[SnappyCompressionCodec].getName)
+ }
+
+ def createCodec(codecName: String): CompressionCodec = {
+ Class.forName(
+ System.getProperty("spark.io.compression.codec", codecName),
+ true,
+ Thread.currentThread.getContextClassLoader).newInstance().asInstanceOf[CompressionCodec]
+ }
+}
+
+
+/**
+ * LZF implementation of [[spark.io.CompressionCodec]].
+ */
+class LZFCompressionCodec extends CompressionCodec {
+
+ override def compressedOutputStream(s: OutputStream): OutputStream = {
+ new LZFOutputStream(s).setFinishBlockOnFlush(true)
+ }
+
+ override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
+}
+
+
+/**
+ * Snappy implementation of [[spark.io.CompressionCodec]].
+ * Block size can be configured by spark.io.compression.snappy.block.size.
+ */
+class SnappyCompressionCodec extends CompressionCodec {
+
+ override def compressedOutputStream(s: OutputStream): OutputStream = {
+ val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
+ new SnappyOutputStream(s, blockSize)
+ }
+
+ override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
+}
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
new file mode 100644
index 0000000000..3e32e9c82f
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics
+
+import java.util.Properties
+import java.io.{File, FileInputStream, InputStream, IOException}
+
+import scala.collection.mutable
+import scala.util.matching.Regex
+
+import spark.Logging
+
+private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
+ initLogging()
+
+ val DEFAULT_PREFIX = "*"
+ val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+ val METRICS_CONF = "metrics.properties"
+
+ val properties = new Properties()
+ var propertyCategories: mutable.HashMap[String, Properties] = null
+
+ private def setDefaultProperties(prop: Properties) {
+ // empty function, any default property can be set here
+ }
+
+ def initialize() {
+ //Add default properties in case there's no properties file
+ setDefaultProperties(properties)
+
+ // If spark.metrics.conf is not set, try to get file in class path
+ var is: InputStream = null
+ try {
+ is = configFile match {
+ case Some(f) => new FileInputStream(f)
+ case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF)
+ }
+
+ if (is != null) {
+ properties.load(is)
+ }
+ } catch {
+ case e: Exception => logError("Error loading configure file", e)
+ } finally {
+ if (is != null) is.close()
+ }
+
+ propertyCategories = subProperties(properties, INSTANCE_REGEX)
+ if (propertyCategories.contains(DEFAULT_PREFIX)) {
+ import scala.collection.JavaConversions._
+
+ val defaultProperty = propertyCategories(DEFAULT_PREFIX)
+ for { (inst, prop) <- propertyCategories
+ if (inst != DEFAULT_PREFIX)
+ (k, v) <- defaultProperty
+ if (prop.getProperty(k) == null) } {
+ prop.setProperty(k, v)
+ }
+ }
+ }
+
+ def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
+ val subProperties = new mutable.HashMap[String, Properties]
+ import scala.collection.JavaConversions._
+ prop.foreach { kv =>
+ if (regex.findPrefixOf(kv._1) != None) {
+ val regex(prefix, suffix) = kv._1
+ subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+ }
+ }
+ subProperties
+ }
+
+ def getInstance(inst: String): Properties = {
+ propertyCategories.get(inst) match {
+ case Some(s) => s
+ case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
+ }
+ }
+}
+
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
new file mode 100644
index 0000000000..fabddfb947
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics
+
+import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import spark.Logging
+import spark.metrics.sink.Sink
+import spark.metrics.source.Source
+
+/**
+ * Spark Metrics System, created by specific "instance", combined by source,
+ * sink, periodically poll source metrics data to sink destinations.
+ *
+ * "instance" specify "who" (the role) use metrics system. In spark there are several roles
+ * like master, worker, executor, client driver, these roles will create metrics system
+ * for monitoring. So instance represents these roles. Currently in Spark, several instances
+ * have already implemented: master, worker, executor, driver.
+ *
+ * "source" specify "where" (source) to collect metrics data. In metrics system, there exists
+ * two kinds of source:
+ * 1. Spark internal source, like MasterSource, WorkerSource, etc, which will collect
+ * Spark component's internal state, these sources are related to instance and will be
+ * added after specific metrics system is created.
+ * 2. Common source, like JvmSource, which will collect low level state, is configured by
+ * configuration and loaded through reflection.
+ *
+ * "sink" specify "where" (destination) to output metrics data to. Several sinks can be
+ * coexisted and flush metrics to all these sinks.
+ *
+ * Metrics configuration format is like below:
+ * [instance].[sink|source].[name].[options] = xxxx
+ *
+ * [instance] can be "master", "worker", "executor", "driver", which means only the specified
+ * instance has this property.
+ * wild card "*" can be used to replace instance name, which means all the instances will have
+ * this property.
+ *
+ * [sink|source] means this property belongs to source or sink. This field can only be source or sink.
+ *
+ * [name] specify the name of sink or source, it is custom defined.
+ *
+ * [options] is the specific property of this source or sink.
+ */
+private[spark] class MetricsSystem private (val instance: String) extends Logging {
+ initLogging()
+
+ val confFile = System.getProperty("spark.metrics.conf")
+ val metricsConfig = new MetricsConfig(Option(confFile))
+
+ val sinks = new mutable.ArrayBuffer[Sink]
+ val sources = new mutable.ArrayBuffer[Source]
+ val registry = new MetricRegistry()
+
+ metricsConfig.initialize()
+ registerSources()
+ registerSinks()
+
+ def start() {
+ sinks.foreach(_.start)
+ }
+
+ def stop() {
+ sinks.foreach(_.stop)
+ }
+
+ def registerSource(source: Source) {
+ sources += source
+ try {
+ registry.register(source.sourceName, source.metricRegistry)
+ } catch {
+ case e: IllegalArgumentException => logInfo("Metrics already registered", e)
+ }
+ }
+
+ def registerSources() {
+ val instConfig = metricsConfig.getInstance(instance)
+ val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
+
+ // Register all the sources related to instance
+ sourceConfigs.foreach { kv =>
+ val classPath = kv._2.getProperty("class")
+ try {
+ val source = Class.forName(classPath).newInstance()
+ registerSource(source.asInstanceOf[Source])
+ } catch {
+ case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
+ }
+ }
+ }
+
+ def registerSinks() {
+ val instConfig = metricsConfig.getInstance(instance)
+ val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
+
+ sinkConfigs.foreach { kv =>
+ val classPath = kv._2.getProperty("class")
+ try {
+ val sink = Class.forName(classPath)
+ .getConstructor(classOf[Properties], classOf[MetricRegistry])
+ .newInstance(kv._2, registry)
+ sinks += sink.asInstanceOf[Sink]
+ } catch {
+ case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
+ }
+ }
+ }
+}
+
+private[spark] object MetricsSystem {
+ val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
+ val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
+
+ val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+ val MINIMAL_POLL_PERIOD = 1
+
+ def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
+ val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
+ if (period < MINIMAL_POLL_PERIOD) {
+ throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit +
+ " below than minimal polling period ")
+ }
+ }
+
+ def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
+}
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
new file mode 100644
index 0000000000..966ba37c20
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import spark.metrics.MetricsSystem
+
+class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val CONSOLE_DEFAULT_PERIOD = 10
+ val CONSOLE_DEFAULT_UNIT = "SECONDS"
+
+ val CONSOLE_KEY_PERIOD = "period"
+ val CONSOLE_KEY_UNIT = "unit"
+
+ val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
+ case Some(s) => s.toInt
+ case None => CONSOLE_DEFAULT_PERIOD
+ }
+
+ val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
+ case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+ case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
+ }
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build()
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+}
+
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
new file mode 100644
index 0000000000..cb990afdef
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+import com.codahale.metrics.{CsvReporter, MetricRegistry}
+
+import java.io.File
+import java.util.{Locale, Properties}
+import java.util.concurrent.TimeUnit
+
+import spark.metrics.MetricsSystem
+
+class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val CSV_KEY_PERIOD = "period"
+ val CSV_KEY_UNIT = "unit"
+ val CSV_KEY_DIR = "directory"
+
+ val CSV_DEFAULT_PERIOD = 10
+ val CSV_DEFAULT_UNIT = "SECONDS"
+ val CSV_DEFAULT_DIR = "/tmp/"
+
+ val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
+ case Some(s) => s.toInt
+ case None => CSV_DEFAULT_PERIOD
+ }
+
+ val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
+ case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+ case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
+ }
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
+ case Some(s) => s
+ case None => CSV_DEFAULT_DIR
+ }
+
+ val reporter: CsvReporter = CsvReporter.forRegistry(registry)
+ .formatFor(Locale.US)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build(new File(pollDir))
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+}
+
diff --git a/core/src/main/scala/spark/SoftReferenceCache.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
index f41a379582..ee04544c0e 100644
--- a/core/src/main/scala/spark/SoftReferenceCache.scala
+++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
@@ -15,21 +15,21 @@
* limitations under the License.
*/
-package spark
+package spark.metrics.sink
-import com.google.common.collect.MapMaker
+import com.codahale.metrics.{JmxReporter, MetricRegistry}
-/**
- * An implementation of Cache that uses soft references.
- */
-private[spark] class SoftReferenceCache extends Cache {
- val map = new MapMaker().softValues().makeMap[Any, Any]()
+import java.util.Properties
+
+class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
- override def get(datasetId: Any, partition: Int): Any =
- map.get((datasetId, partition))
+ override def start() {
+ reporter.start()
+ }
- override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
- map.put((datasetId, partition), value)
- return CachePutSuccess(0)
+ override def stop() {
+ reporter.stop()
}
+
}
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
new file mode 100644
index 0000000000..dad1a7f0fe
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+trait Sink {
+ def start: Unit
+ def stop: Unit
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
new file mode 100644
index 0000000000..e771008557
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/JvmSource.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
+
+class JvmSource extends Source {
+ val sourceName = "jvm"
+ val metricRegistry = new MetricRegistry()
+
+ val gcMetricSet = new GarbageCollectorMetricSet
+ val memGaugeSet = new MemoryUsageGaugeSet
+
+ metricRegistry.registerAll(gcMetricSet)
+ metricRegistry.registerAll(memGaugeSet)
+}
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
new file mode 100644
index 0000000000..76199a004b
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/Source.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+
+trait Source {
+ def sourceName: String
+ def metricRegistry: MetricRegistry
+}
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index 191cfde565..d8700becb0 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -33,8 +33,9 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
extends NarrowDependency[T](rdd) {
@transient
- val partitions: Array[Partition] = rdd.partitions.filter(s => partitionFilterFunc(s.index))
- .zipWithIndex.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
+ val partitions: Array[Partition] = rdd.partitions.zipWithIndex
+ .filter(s => partitionFilterFunc(s._2))
+ .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
override def getParents(partitionId: Int) = List(partitions(partitionId).index)
}
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 29e879aa42..9b45fc2938 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -17,19 +17,17 @@
package spark.scheduler
-import cluster.TaskInfo
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.LinkedBlockingQueue
-import java.util.concurrent.TimeUnit
+import java.io.NotSerializableException
import java.util.Properties
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import spark._
import spark.executor.TaskMetrics
-import spark.partial.ApproximateActionListener
-import spark.partial.ApproximateEvaluator
-import spark.partial.PartialResult
+import spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
+import spark.scheduler.cluster.TaskInfo
import spark.storage.{BlockManager, BlockManagerMaster}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
@@ -52,6 +50,11 @@ class DAGScheduler(
}
taskSched.setListener(this)
+ // Called by TaskScheduler to report task's starting.
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+ eventQueue.put(BeginEvent(task, taskInfo))
+ }
+
// Called by TaskScheduler to report task completions or failures.
override def taskEnded(
task: Task[_],
@@ -258,7 +261,8 @@ class DAGScheduler(
assert(partitions.size > 0)
val waiter = new JobWaiter(partitions.size, resultHandler)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
- val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
+ val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter,
+ properties)
return (toSubmit, waiter)
}
@@ -283,7 +287,7 @@ class DAGScheduler(
"Total number of partitions: " + maxPartitions)
}
- val (toSubmit, waiter) = prepareJob(
+ val (toSubmit: JobSubmitted, waiter: JobWaiter[_]) = prepareJob(
finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
eventQueue.put(toSubmit)
waiter.awaitResult() match {
@@ -343,6 +347,9 @@ class DAGScheduler(
case ExecutorLost(execId) =>
handleExecutorLost(execId)
+ case begin: BeginEvent =>
+ sparkListeners.foreach(_.onTaskStart(SparkListenerTaskStart(begin.task, begin.taskInfo)))
+
case completion: CompletionEvent =>
sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
completion.reason, completion.taskInfo, completion.taskMetrics)))
@@ -504,6 +511,19 @@ class DAGScheduler(
}
}
if (tasks.size > 0) {
+ // Preemptively serialize a task to make sure it can be serialized. We are catching this
+ // exception here because it would be fairly hard to catch the non-serializable exception
+ // down the road, where we have several different implementations for local scheduler and
+ // cluster schedulers.
+ try {
+ SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
+ } catch {
+ case e: NotSerializableException =>
+ abortStage(stage, e.toString)
+ running -= stage
+ return
+ }
+
sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 506c87f65b..3b4ee6287a 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -43,6 +43,8 @@ private[spark] case class JobSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent
+private[spark] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+
private[spark] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
new file mode 100644
index 0000000000..87d27cc70d
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
@@ -0,0 +1,30 @@
+package spark.scheduler
+
+import com.codahale.metrics.{Gauge,MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "DAGScheduler"
+
+ metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.failed.size
+ })
+
+ metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.running.size
+ })
+
+ metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.waiting.size
+ })
+
+ metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.nextRunId.get()
+ })
+
+ metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.activeJobs.size
+ })
+}
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 85b5ddd4a8..f7565b8c57 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -68,6 +68,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
processStageCompletedEvent(stageInfo)
case SparkListenerJobEnd(job, result) =>
processJobEndEvent(job, result)
+ case SparkListenerTaskStart(task, taskInfo) =>
+ processTaskStartEvent(task, taskInfo)
case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
processTaskEndEvent(task, reason, taskInfo, taskMetrics)
case _ =>
@@ -252,7 +254,19 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageInfo.stage.id + " STATUS=COMPLETED")
}
-
+
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ eventQueue.put(taskStart)
+ }
+
+ protected def processTaskStartEvent(task: Task[_], taskInfo: TaskInfo) {
+ var taskStatus = ""
+ task match {
+ case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
+ case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
+ }
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
eventQueue.put(taskEnd)
}
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 361b1e6b91..1ced6f9524 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -118,6 +118,7 @@ private[spark] class ResultTask[T, U](
out.write(bytes)
out.writeInt(partition)
out.writeInt(outputId)
+ out.writeLong(generation)
out.writeObject(split)
}
}
@@ -132,6 +133,7 @@ private[spark] class ResultTask[T, U](
func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
partition = in.readInt()
val outputId = in.readInt()
+ generation = in.readLong()
split = in.readObject().asInstanceOf[Partition]
}
}
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 1c25605f75..e3bb6d1e60 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -18,16 +18,9 @@
package spark.scheduler
import java.io._
-import java.util.{HashMap => JHashMap}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.collection.JavaConversions._
-
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-
-import com.ning.compress.lzf.LZFInputStream
-import com.ning.compress.lzf.LZFOutputStream
+import scala.collection.mutable.HashMap
import spark._
import spark.executor.ShuffleWriteMetrics
@@ -109,11 +102,7 @@ private[spark] class ShuffleMapTask(
preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
}
- var split = if (rdd == null) {
- null
- } else {
- rdd.partitions(partition)
- }
+ var split = if (rdd == null) null else rdd.partitions(partition)
override def writeExternal(out: ObjectOutput) {
RDDCheckpointData.synchronized {
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 4fb1c5d42d..4eb7e4e6a5 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -29,6 +29,8 @@ case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends Spar
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
+case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
@@ -48,7 +50,12 @@ trait SparkListener {
* Called when a stage is submitted
*/
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
-
+
+ /**
+ * Called when a task starts
+ */
+ def onTaskStart(taskEnd: SparkListenerTaskStart) { }
+
/**
* Called when a task ends
*/
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index 245e7ccb52..2cdeb1c8c0 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -27,6 +27,9 @@ import spark.executor.TaskMetrics
* Interface for getting events back from the TaskScheduler.
*/
private[spark] trait TaskSchedulerListener {
+ // A task has started.
+ def taskStarted(task: Task[_], taskInfo: TaskInfo)
+
// A task has finished or failed.
def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index b569cf6066..bd0cdad573 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -17,7 +17,8 @@
package spark.scheduler.cluster
-import java.util.{HashMap => JHashMap, NoSuchElementException, Arrays}
+import java.nio.ByteBuffer
+import java.util.{Arrays, NoSuchElementException}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -25,12 +26,14 @@ import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min
-import spark._
-import spark.scheduler._
+import spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState, Utils}
+import spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure}
import spark.TaskState.TaskState
-import java.nio.ByteBuffer
+import spark.scheduler.{ShuffleMapTask, Task, TaskResult, TaskSet}
-private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
+
+private[spark] object TaskLocality
+ extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
// process local is expected to be used ONLY within tasksetmanager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
@@ -43,8 +46,10 @@ private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LO
assert (constraint != TaskLocality.PROCESS_LOCAL)
constraint match {
- case TaskLocality.NODE_LOCAL => condition == TaskLocality.NODE_LOCAL
- case TaskLocality.RACK_LOCAL => condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
+ case TaskLocality.NODE_LOCAL =>
+ condition == TaskLocality.NODE_LOCAL
+ case TaskLocality.RACK_LOCAL =>
+ condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
// For anything else, allow
case _ => true
}
@@ -56,11 +61,10 @@ private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LO
val retval = TaskLocality.withName(str)
// Must not specify PROCESS_LOCAL !
assert (retval != TaskLocality.PROCESS_LOCAL)
-
retval
} catch {
case nEx: NoSuchElementException => {
- logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL");
+ logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL")
// default to preserve earlier behavior
NODE_LOCAL
}
@@ -71,11 +75,8 @@ private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LO
/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler.
*/
-private[spark] class ClusterTaskSetManager(
- sched: ClusterScheduler,
- val taskSet: TaskSet)
- extends TaskSetManager
- with Logging {
+private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet)
+ extends TaskSetManager with Logging {
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
@@ -84,7 +85,7 @@ private[spark] class ClusterTaskSetManager(
val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
// Maximum times a task is allowed to fail before failing the job
- val MAX_TASK_FAILURES = 4
+ val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
@@ -107,13 +108,14 @@ private[spark] class ClusterTaskSetManager(
var runningTasks = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
- var name = "TaskSet_"+taskSet.stageId.toString
- var parent:Schedulable = null
+ var name = "TaskSet_" + taskSet.stageId.toString
+ var parent: Schedulable = null
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
- // List of pending tasks for each node (process local to container). These collections are actually
+ // List of pending tasks for each node (process local to container).
+ // These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
@@ -173,9 +175,11 @@ private[spark] class ClusterTaskSetManager(
// Note that it follows the hierarchy.
// if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and
// if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL
- private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
- taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
-
+ private def findPreferredLocations(
+ _taskPreferredLocations: Seq[String],
+ scheduler: ClusterScheduler,
+ taskLocality: TaskLocality.TaskLocality): HashSet[String] =
+ {
if (TaskLocality.PROCESS_LOCAL == taskLocality) {
// straight forward comparison ! Special case it.
val retval = new HashSet[String]()
@@ -190,13 +194,14 @@ private[spark] class ClusterTaskSetManager(
return retval
}
- val taskPreferredLocations =
+ val taskPreferredLocations = {
if (TaskLocality.NODE_LOCAL == taskLocality) {
_taskPreferredLocations
} else {
assert (TaskLocality.RACK_LOCAL == taskLocality)
// Expand set to include all 'seen' rack local hosts.
- // This works since container allocation/management happens within master - so any rack locality information is updated in msater.
+ // This works since container allocation/management happens within master -
+ // so any rack locality information is updated in msater.
// Best case effort, and maybe sort of kludge for now ... rework it later ?
val hosts = new HashSet[String]
_taskPreferredLocations.foreach(h => {
@@ -214,6 +219,7 @@ private[spark] class ClusterTaskSetManager(
hosts
}
+ }
val retval = new HashSet[String]
scheduler.synchronized {
@@ -230,11 +236,13 @@ private[spark] class ClusterTaskSetManager(
// Add a task to all the pending-task lists that it should be on.
private def addPendingTask(index: Int) {
- // We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
- // hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it.
- val processLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.PROCESS_LOCAL)
- val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
- val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
+ // We can infer hostLocalLocations from rackLocalLocations by joining it against
+ // tasks(index).preferredLocations (with appropriate hostPort <-> host conversion).
+ // But not doing it for simplicity sake. If this becomes a performance issue, modify it.
+ val locs = tasks(index).preferredLocations
+ val processLocalLocations = findPreferredLocations(locs, sched, TaskLocality.PROCESS_LOCAL)
+ val hostLocalLocations = findPreferredLocations(locs, sched, TaskLocality.NODE_LOCAL)
+ val rackLocalLocations = findPreferredLocations(locs, sched, TaskLocality.RACK_LOCAL)
if (rackLocalLocations.size == 0) {
// Current impl ensures this.
@@ -299,18 +307,24 @@ private[spark] class ClusterTaskSetManager(
}
// Number of pending tasks for a given host Port (which would be process local)
- def numPendingTasksForHostPort(hostPort: String): Int = {
- getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
+ override def numPendingTasksForHostPort(hostPort: String): Int = {
+ getPendingTasksForHostPort(hostPort).count { index =>
+ copiesRunning(index) == 0 && !finished(index)
+ }
}
// Number of pending tasks for a given host (which would be data local)
- def numPendingTasksForHost(hostPort: String): Int = {
- getPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
+ override def numPendingTasksForHost(hostPort: String): Int = {
+ getPendingTasksForHost(hostPort).count { index =>
+ copiesRunning(index) == 0 && !finished(index)
+ }
}
// Number of pending rack local tasks for a given host
- def numRackLocalPendingTasksForHost(hostPort: String): Int = {
- getRackLocalPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
+ override def numRackLocalPendingTasksForHost(hostPort: String): Int = {
+ getRackLocalPendingTasksForHost(hostPort).count { index =>
+ copiesRunning(index) == 0 && !finished(index)
+ }
}
@@ -338,12 +352,12 @@ private[spark] class ClusterTaskSetManager(
speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
if (speculatableTasks.size > 0) {
- val localTask = speculatableTasks.find {
- index =>
- val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
- val attemptLocs = taskAttempts(index).map(_.hostPort)
- (locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
- }
+ val localTask = speculatableTasks.find { index =>
+ val locations = findPreferredLocations(tasks(index).preferredLocations, sched,
+ TaskLocality.NODE_LOCAL)
+ val attemptLocs = taskAttempts(index).map(_.hostPort)
+ (locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
+ }
if (localTask != None) {
speculatableTasks -= localTask.get
@@ -352,11 +366,11 @@ private[spark] class ClusterTaskSetManager(
// check for rack locality
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
- val rackTask = speculatableTasks.find {
- index =>
- val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
- val attemptLocs = taskAttempts(index).map(_.hostPort)
- locations.contains(hostPort) && !attemptLocs.contains(hostPort)
+ val rackTask = speculatableTasks.find { index =>
+ val locations = findPreferredLocations(tasks(index).preferredLocations, sched,
+ TaskLocality.RACK_LOCAL)
+ val attemptLocs = taskAttempts(index).map(_.hostPort)
+ locations.contains(hostPort) && !attemptLocs.contains(hostPort)
}
if (rackTask != None) {
@@ -368,7 +382,9 @@ private[spark] class ClusterTaskSetManager(
// Any task ...
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
// Check for attemptLocs also ?
- val nonLocalTask = speculatableTasks.find(i => !taskAttempts(i).map(_.hostPort).contains(hostPort))
+ val nonLocalTask = speculatableTasks.find { i =>
+ !taskAttempts(i).map(_.hostPort).contains(hostPort)
+ }
if (nonLocalTask != None) {
speculatableTasks -= nonLocalTask.get
return nonLocalTask
@@ -398,7 +414,8 @@ private[spark] class ClusterTaskSetManager(
}
}
- // Look for no pref tasks AFTER rack local tasks - this has side effect that we will get to failed tasks later rather than sooner.
+ // Look for no pref tasks AFTER rack local tasks - this has side effect that we will get to
+ // failed tasks later rather than sooner.
// TODO: That code path needs to be revisited (adding to no prefs list when host:port goes down).
val noPrefTask = findTaskFromList(pendingTasksWithNoPrefs)
if (noPrefTask != None) {
@@ -434,7 +451,8 @@ private[spark] class ClusterTaskSetManager(
locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined
}
- // Does a host count as a rack local preferred location for a task? (assumes host is NOT preferred location).
+ // Does a host count as a rack local preferred location for a task?
+ // (assumes host is NOT preferred location).
// This is true if either the task has preferred locations and this host is one, or it has
// no preferred locations (in which we still count the launch as preferred).
private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
@@ -455,14 +473,22 @@ private[spark] class ClusterTaskSetManager(
}
// Respond to an offer of a single slave from the scheduler by finding a task
- def slaveOffer(execId: String, hostPort: String, availableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
-
+ override def slaveOffer(
+ execId: String,
+ hostPort: String,
+ availableCpus: Double,
+ overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] =
+ {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
// If explicitly specified, use that
val locality = if (overrideLocality != null) overrideLocality else {
// expand only if we have waited for more than LOCALITY_WAIT for a host local task ...
val time = System.currentTimeMillis
- if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.NODE_LOCAL else TaskLocality.ANY
+ if (time - lastPreferredLaunchTime < LOCALITY_WAIT) {
+ TaskLocality.NODE_LOCAL
+ } else {
+ TaskLocality.ANY
+ }
}
findTask(hostPort, locality) match {
@@ -490,6 +516,8 @@ private[spark] class ClusterTaskSetManager(
}
// Serialize and return the task
val startTime = System.currentTimeMillis
+ // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
+ // we assume the task can be serialized without exceptions.
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = System.currentTimeMillis - startTime
@@ -497,6 +525,8 @@ private[spark] class ClusterTaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
+ if (taskAttempts(index).size == 1)
+ taskStarted(task,info)
return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
}
case _ =>
@@ -505,7 +535,7 @@ private[spark] class ClusterTaskSetManager(
return None
}
- def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
@@ -520,6 +550,10 @@ private[spark] class ClusterTaskSetManager(
}
}
+ def taskStarted(task: Task[_], info: TaskInfo) {
+ sched.listener.taskStarted(task, info)
+ }
+
def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
if (info.failed) {
@@ -532,13 +566,14 @@ private[spark] class ClusterTaskSetManager(
decreaseRunningTasks(1)
if (!finished(index)) {
tasksFinished += 1
- logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
- tid, info.duration, tasksFinished, numTasks))
+ logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
+ tid, info.duration, info.hostPort, tasksFinished, numTasks))
// Deserialize task result and pass it to the scheduler
try {
val result = ser.deserialize[TaskResult[_]](serializedData)
result.metrics.resultSize = serializedData.limit()
- sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+ sched.listener.taskEnded(
+ tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread().getContextClassLoader
@@ -584,8 +619,8 @@ private[spark] class ClusterTaskSetManager(
return
case taskResultTooBig: TaskResultTooBigFailure =>
- logInfo("Loss was due to task %s result exceeding Akka frame size; " +
- "aborting job".format(tid))
+ logInfo("Loss was due to task %s result exceeding Akka frame size; aborting job".format(
+ tid))
abort("Task %s result exceeded Akka frame size".format(tid))
return
@@ -636,7 +671,7 @@ private[spark] class ClusterTaskSetManager(
}
}
- def error(message: String) {
+ override def error(message: String) {
// Save the error message
abort("Error: " + message)
}
@@ -664,7 +699,8 @@ private[spark] class ClusterTaskSetManager(
}
}
- //TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
+ // TODO: for now we just find Pool not TaskSetManager,
+ // we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
@@ -689,13 +725,15 @@ private[spark] class ClusterTaskSetManager(
// If some task has preferred locations only on hostname, and there are no more executors there,
// put it in the no-prefs list to avoid the wait from delay scheduling
- // host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to
- // no prefs list. Note, this was done due to impliations related to 'waiting' for data local tasks, etc.
- // Note: NOT checking process local list - since host local list is super set of that. We need to ad to no prefs only if
- // there is no host local node for the task (not if there is no process local node for the task)
+ // host local tasks - should we push this to rack local or no pref list ? For now, preserving
+ // behavior and moving to no prefs list. Note, this was done due to impliations related to
+ // 'waiting' for data local tasks, etc.
+ // Note: NOT checking process local list - since host local list is super set of that. We need
+ // to ad to no prefs only if there is no host local node for the task (not if there is no
+ // process local node for the task)
for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
- // val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
- val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
+ val newLocs = findPreferredLocations(
+ tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
if (newLocs.isEmpty) {
pendingTasksWithNoPrefs += index
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index ac9e5ef94d..05c29eb72f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -17,46 +17,47 @@
package spark.scheduler.cluster
-import spark.TaskState.TaskState
import java.nio.ByteBuffer
-import spark.util.SerializableBuffer
+
+import spark.TaskState.TaskState
import spark.Utils
+import spark.util.SerializableBuffer
+
private[spark] sealed trait StandaloneClusterMessage extends Serializable
-// Driver to executors
-private[spark]
-case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
+private[spark] object StandaloneClusterMessages {
-private[spark]
-case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
- extends StandaloneClusterMessage
+ // Driver to executors
+ case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
-private[spark]
-case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
+ case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
+ extends StandaloneClusterMessage
-// Executors to driver
-private[spark]
-case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
- extends StandaloneClusterMessage {
- Utils.checkHostPort(hostPort, "Expected host port")
-}
+ case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
-private[spark]
-case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
- extends StandaloneClusterMessage
+ // Executors to driver
+ case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+ extends StandaloneClusterMessage {
+ Utils.checkHostPort(hostPort, "Expected host port")
+ }
+
+ case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
+ data: SerializableBuffer) extends StandaloneClusterMessage
-private[spark]
-object StatusUpdate {
- /** Alternate factory method that takes a ByteBuffer directly for the data field */
- def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
- StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
+ object StatusUpdate {
+ /** Alternate factory method that takes a ByteBuffer directly for the data field */
+ def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
+ : StatusUpdate = {
+ StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
+ }
}
-}
-// Internal messages in driver
-private[spark] case object ReviveOffers extends StandaloneClusterMessage
-private[spark] case object StopDriver extends StandaloneClusterMessage
+ // Internal messages in driver
+ case object ReviveOffers extends StandaloneClusterMessage
-private[spark] case class RemoveExecutor(executorId: String, reason: String)
- extends StandaloneClusterMessage
+ case object StopDriver extends StandaloneClusterMessage
+
+ case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
+
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 03a64e0192..075a7cbf7e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -17,17 +17,18 @@
package spark.scheduler.cluster
+import java.util.concurrent.atomic.AtomicInteger
+
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import akka.actor._
-import akka.util.duration._
+import akka.dispatch.Await
import akka.pattern.ask
+import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
import akka.util.Duration
import spark.{Utils, SparkException, Logging, TaskState}
-import akka.dispatch.Await
-import java.util.concurrent.atomic.AtomicInteger
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import spark.scheduler.cluster.StandaloneClusterMessages._
/**
* A standalone scheduler backend, which waits for standalone executors to connect to it through
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index a1ebd48b01..c693b722ac 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -51,6 +51,17 @@ class TaskInfo(
def running: Boolean = !finished
+ def status: String = {
+ if (running)
+ "RUNNING"
+ else if (failed)
+ "FAILED"
+ else if (successful)
+ "SUCCESS"
+ else
+ "UNKNOWN"
+ }
+
def duration: Long = {
if (!finished) {
throw new UnsupportedOperationException("duration() called on unfinished tasks")
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 07c3ddcc7e..7978a5df74 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -17,18 +17,28 @@
package spark.scheduler.cluster
-import scala.collection.mutable.ArrayBuffer
-import spark.scheduler._
-import spark.TaskState.TaskState
import java.nio.ByteBuffer
+import spark.TaskState.TaskState
+import spark.scheduler.TaskSet
+
private[spark] trait TaskSetManager extends Schedulable {
+
def taskSet: TaskSet
- def slaveOffer(execId: String, hostPort: String, availableCpus: Double,
- overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription]
+
+ def slaveOffer(
+ execId: String,
+ hostPort: String,
+ availableCpus: Double,
+ overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription]
+
def numPendingTasksForHostPort(hostPort: String): Int
- def numRackLocalPendingTasksForHost(hostPort :String): Int
+
+ def numRackLocalPendingTasksForHost(hostPort: String): Int
+
def numPendingTasksForHost(hostPort: String): Int
+
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
+
def error(message: String)
}
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index c8cb39184d..7ae8f06f6e 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -37,10 +37,15 @@ import akka.actor._
* testing fault recovery.
*/
-private[spark] case class LocalReviveOffers()
-private[spark] case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+private[spark]
+case class LocalReviveOffers()
+
+private[spark]
+case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+
+private[spark]
+class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
-private[spark] class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
def receive = {
case LocalReviveOffers =>
launchTask(localScheduler.resourceOffer(freeCores))
@@ -55,7 +60,7 @@ private[spark] class LocalActor(localScheduler: LocalScheduler, var freeCores: I
freeCores -= 1
localScheduler.threadPool.submit(new Runnable {
def run() {
- localScheduler.runTask(task.taskId,task.serializedTask)
+ localScheduler.runTask(task.taskId, task.serializedTask)
}
})
}
@@ -110,7 +115,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
override def submitTasks(taskSet: TaskSet) {
synchronized {
- var manager = new LocalTaskSetManager(this, taskSet)
+ val manager = new LocalTaskSetManager(this, taskSet)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
activeTaskSets(taskSet.id) = manager
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
@@ -124,14 +129,15 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
val tasks = new ArrayBuffer[TaskDescription](freeCores)
val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
for (manager <- sortedTaskSetQueue) {
- logDebug("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
+ logDebug("parentName:%s,name:%s,runningTasks:%s".format(
+ manager.parent.name, manager.name, manager.runningTasks))
}
var launchTask = false
for (manager <- sortedTaskSetQueue) {
do {
launchTask = false
- manager.slaveOffer(null,null,freeCpuCores) match {
+ manager.slaveOffer(null, null, freeCpuCores) match {
case Some(task) =>
tasks += task
taskIdToTaskSetId(task.taskId) = manager.taskSet.id
@@ -139,7 +145,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
freeCpuCores -= 1
launchTask = true
case None => {}
- }
+ }
} while(launchTask)
}
return tasks
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index 5d0402ed46..9837f5cd69 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -17,27 +17,26 @@
package spark.scheduler.local
-import java.io.File
-import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import spark._
+import spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState}
import spark.TaskState.TaskState
-import spark.scheduler._
-import spark.scheduler.cluster._
+import spark.scheduler.{Task, TaskResult, TaskSet}
+import spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager}
+
+
+private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
+ extends TaskSetManager with Logging {
-private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) extends TaskSetManager with Logging {
var parent: Schedulable = null
var weight: Int = 1
var minShare: Int = 0
var runningTasks: Int = 0
var priority: Int = taskSet.priority
var stageId: Int = taskSet.stageId
- var name: String = "TaskSet_"+taskSet.stageId.toString
-
+ var name: String = "TaskSet_" + taskSet.stageId.toString
var failCount = new Array[Int](taskSet.tasks.size)
val taskInfos = new HashMap[Long, TaskInfo]
@@ -50,49 +49,45 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val numFailures = new Array[Int](numTasks)
val MAX_TASK_FAILURES = sched.maxFailures
- def increaseRunningTasks(taskNum: Int): Unit = {
- runningTasks += taskNum
- if (parent != null) {
- parent.increaseRunningTasks(taskNum)
- }
+ override def increaseRunningTasks(taskNum: Int): Unit = {
+ runningTasks += taskNum
+ if (parent != null) {
+ parent.increaseRunningTasks(taskNum)
+ }
}
- def decreaseRunningTasks(taskNum: Int): Unit = {
+ override def decreaseRunningTasks(taskNum: Int): Unit = {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
}
- def addSchedulable(schedulable: Schedulable): Unit = {
+ override def addSchedulable(schedulable: Schedulable): Unit = {
//nothing
}
- def removeSchedulable(schedulable: Schedulable): Unit = {
+ override def removeSchedulable(schedulable: Schedulable): Unit = {
//nothing
}
- def getSchedulableByName(name: String): Schedulable = {
+ override def getSchedulableByName(name: String): Schedulable = {
return null
}
- def executorLost(executorId: String, host: String): Unit = {
+ override def executorLost(executorId: String, host: String): Unit = {
//nothing
}
- def checkSpeculatableTasks(): Boolean = {
- return true
- }
+ override def checkSpeculatableTasks() = true
- def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+ override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
sortedTaskSetQueue += this
return sortedTaskSetQueue
}
- def hasPendingTasks(): Boolean = {
- return true
- }
+ override def hasPendingTasks() = true
def findTask(): Option[Int] = {
for (i <- 0 to numTasks-1) {
@@ -103,21 +98,32 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
return None
}
- def slaveOffer(execId: String, hostPort: String, availableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
+ override def slaveOffer(
+ execId: String,
+ hostPort: String,
+ availableCpus: Double,
+ overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] =
+ {
SparkEnv.set(sched.env)
- logDebug("availableCpus:%d,numFinished:%d,numTasks:%d".format(availableCpus.toInt, numFinished, numTasks))
+ logDebug("availableCpus:%d,numFinished:%d,numTasks:%d".format(
+ availableCpus.toInt, numFinished, numTasks))
if (availableCpus > 0 && numFinished < numTasks) {
findTask() match {
case Some(index) =>
val taskId = sched.attemptId.getAndIncrement()
val task = taskSet.tasks(index)
- val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
+ val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1",
+ TaskLocality.NODE_LOCAL)
taskInfos(taskId) = info
- val bytes = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+ // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
+ // we assume the task can be serialized without exceptions.
+ val bytes = Task.serializeWithDependencies(
+ task, sched.sc.addedFiles, sched.sc.addedJars, ser)
logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes")
val taskName = "task %s:%d".format(taskSet.id, index)
copiesRunning(index) += 1
increaseRunningTasks(1)
+ taskStarted(task, info)
return Some(new TaskDescription(taskId, null, taskName, bytes))
case None => {}
}
@@ -125,19 +131,19 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
return None
}
- def numPendingTasksForHostPort(hostPort: String): Int = {
+ override def numPendingTasksForHostPort(hostPort: String): Int = {
return 0
}
- def numRackLocalPendingTasksForHost(hostPort :String): Int = {
+ override def numRackLocalPendingTasksForHost(hostPort :String): Int = {
return 0
}
- def numPendingTasksForHost(hostPort: String): Int = {
+ override def numPendingTasksForHost(hostPort: String): Int = {
return 0
}
- def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
@@ -148,6 +154,10 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
}
+ def taskStarted(task: Task[_], info: TaskInfo) {
+ sched.listener.taskStarted(task, info)
+ }
+
def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
val index = info.index
@@ -170,15 +180,18 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val task = taskSet.tasks(index)
info.markFailed()
decreaseRunningTasks(1)
- val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](serializedData, getClass.getClassLoader)
+ val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
+ serializedData, getClass.getClassLoader)
sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
if (!finished(index)) {
copiesRunning(index) -= 1
numFailures(index) += 1
val locs = reason.stackTrace.map(loc => "\tat %s".format(loc.toString))
- logInfo("Loss was due to %s\n%s\n%s".format(reason.className, reason.description, locs.mkString("\n")))
+ logInfo("Loss was due to %s\n%s\n%s".format(
+ reason.className, reason.description, locs.mkString("\n")))
if (numFailures(index) > MAX_TASK_FAILURES) {
- val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(taskSet.id, index, 4, reason.description)
+ val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
+ taskSet.id, index, 4, reason.description)
decreaseRunningTasks(runningTasks)
sched.listener.taskSetFailed(taskSet, errorMessage)
// need to delete failed Taskset from schedule queue
@@ -187,6 +200,6 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
}
- def error(message: String) {
+ override def error(message: String) {
}
}
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 7bc6040544..6ebbb5ec9b 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -110,12 +110,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
- val runScript = new File(sparkHome, "run").getCanonicalPath
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
- StandaloneSchedulerBackend.ACTOR_NAME)
- val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@@ -123,7 +117,26 @@ private[spark] class CoarseMesosSchedulerBackend(
.setValue(value)
.build())
}
- return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
+ val command = CommandInfo.newBuilder()
+ .setEnvironment(environment)
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.driver.host"),
+ System.getProperty("spark.driver.port"),
+ StandaloneSchedulerBackend.ACTOR_NAME)
+ val uri = System.getProperty("spark.executor.uri")
+ if (uri == null) {
+ val runScript = new File(sparkHome, "run").getCanonicalPath
+ command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+ runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ } else {
+ // Grab everything to the first '.'. We'll use that and '*' to
+ // glob the directory "correctly".
+ val basename = uri.split('/').last.split('.').head
+ command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+ basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ }
+ return command.build()
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 75b8268b55..f6069a5775 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -89,7 +89,6 @@ private[spark] class MesosSchedulerBackend(
val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
- val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@@ -97,15 +96,23 @@ private[spark] class MesosSchedulerBackend(
.setValue(value)
.build())
}
+ val command = CommandInfo.newBuilder()
+ .setEnvironment(environment)
+ val uri = System.getProperty("spark.executor.uri")
+ if (uri == null) {
+ command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+ } else {
+ // Grab everything to the first '.'. We'll use that and '*' to
+ // glob the directory "correctly".
+ val basename = uri.split('/').last.split('.').head
+ command.setValue("cd %s*; ./spark-executor".format(basename))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ }
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
.build()
- val command = CommandInfo.newBuilder()
- .setValue(execScript)
- .setEnvironment(environment)
- .build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index e4ffa57ad2..3a72474419 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -27,11 +27,10 @@ import akka.dispatch.{Await, Future}
import akka.util.Duration
import akka.util.duration._
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import spark.{Logging, SparkEnv, SparkException, Utils}
+import spark.io.CompressionCodec
import spark.network._
import spark.serializer.Serializer
import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
@@ -158,6 +157,13 @@ private[spark] class BlockManager(
val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()
+ // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
+ // the initialization of the compression codec until it is first used. The reason is that a Spark
+ // program could be using a user-defined codec in a third party jar, which is loaded in
+ // Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
+ // loaded yet.
+ private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()
+
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
@@ -919,18 +925,14 @@ private[spark] class BlockManager(
* Wrap an output stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
- if (shouldCompress(blockId)) {
- (new LZFOutputStream(s)).setFinishBlockOnFlush(true)
- } else {
- s
- }
+ if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
}
/**
* Wrap an input stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: InputStream): InputStream = {
- if (shouldCompress(blockId)) new LZFInputStream(s) else s
+ if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
}
def dataSerialize(
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 3186f7c85b..76128e8cff 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -23,6 +23,7 @@ import akka.pattern.ask
import akka.util.Duration
import spark.{Logging, SparkException}
+import spark.storage.BlockManagerMessages._
private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 244000d952..011bb6b83d 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -29,6 +29,8 @@ import akka.util.Duration
import akka.util.duration._
import spark.{Logging, Utils, SparkException}
+import spark.storage.BlockManagerMessages._
+
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 01de4ccb8f..9375a9ca54 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -22,102 +22,89 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import akka.actor.ActorRef
-//////////////////////////////////////////////////////////////////////////////////
-// Messages from the master to slaves.
-//////////////////////////////////////////////////////////////////////////////////
-private[spark]
-sealed trait ToBlockManagerSlave
-
-// Remove a block from the slaves that have it. This can only be used to remove
-// blocks that the master knows about.
-private[spark]
-case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
-
-// Remove all blocks belonging to a specific RDD.
-private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
-
-
-//////////////////////////////////////////////////////////////////////////////////
-// Messages from slaves to the master.
-//////////////////////////////////////////////////////////////////////////////////
-private[spark]
-sealed trait ToBlockManagerMaster
-
-private[spark]
-case class RegisterBlockManager(
- blockManagerId: BlockManagerId,
- maxMemSize: Long,
- sender: ActorRef)
- extends ToBlockManagerMaster
-
-private[spark]
-case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-private[spark]
-class UpdateBlockInfo(
- var blockManagerId: BlockManagerId,
- var blockId: String,
- var storageLevel: StorageLevel,
- var memSize: Long,
- var diskSize: Long)
- extends ToBlockManagerMaster
- with Externalizable {
-
- def this() = this(null, null, null, 0, 0) // For deserialization only
-
- override def writeExternal(out: ObjectOutput) {
- blockManagerId.writeExternal(out)
- out.writeUTF(blockId)
- storageLevel.writeExternal(out)
- out.writeLong(memSize)
- out.writeLong(diskSize)
+private[storage] object BlockManagerMessages {
+ //////////////////////////////////////////////////////////////////////////////////
+ // Messages from the master to slaves.
+ //////////////////////////////////////////////////////////////////////////////////
+ sealed trait ToBlockManagerSlave
+
+ // Remove a block from the slaves that have it. This can only be used to remove
+ // blocks that the master knows about.
+ case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
+
+ // Remove all blocks belonging to a specific RDD.
+ case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
+
+
+ //////////////////////////////////////////////////////////////////////////////////
+ // Messages from slaves to the master.
+ //////////////////////////////////////////////////////////////////////////////////
+ sealed trait ToBlockManagerMaster
+
+ case class RegisterBlockManager(
+ blockManagerId: BlockManagerId,
+ maxMemSize: Long,
+ sender: ActorRef)
+ extends ToBlockManagerMaster
+
+ case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+ class UpdateBlockInfo(
+ var blockManagerId: BlockManagerId,
+ var blockId: String,
+ var storageLevel: StorageLevel,
+ var memSize: Long,
+ var diskSize: Long)
+ extends ToBlockManagerMaster
+ with Externalizable {
+
+ def this() = this(null, null, null, 0, 0) // For deserialization only
+
+ override def writeExternal(out: ObjectOutput) {
+ blockManagerId.writeExternal(out)
+ out.writeUTF(blockId)
+ storageLevel.writeExternal(out)
+ out.writeLong(memSize)
+ out.writeLong(diskSize)
+ }
+
+ override def readExternal(in: ObjectInput) {
+ blockManagerId = BlockManagerId(in)
+ blockId = in.readUTF()
+ storageLevel = StorageLevel(in)
+ memSize = in.readLong()
+ diskSize = in.readLong()
+ }
}
- override def readExternal(in: ObjectInput) {
- blockManagerId = BlockManagerId(in)
- blockId = in.readUTF()
- storageLevel = StorageLevel(in)
- memSize = in.readLong()
- diskSize = in.readLong()
+ object UpdateBlockInfo {
+ def apply(blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long): UpdateBlockInfo = {
+ new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ }
+
+ // For pattern-matching
+ def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
+ }
}
-}
-private[spark]
-object UpdateBlockInfo {
- def apply(blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long): UpdateBlockInfo = {
- new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
- }
+ case class GetLocations(blockId: String) extends ToBlockManagerMaster
- // For pattern-matching
- def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
- Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
- }
-}
+ case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-private[spark]
-case class GetLocations(blockId: String) extends ToBlockManagerMaster
+ case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-private[spark]
-case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+ case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
-private[spark]
-case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+ case object StopBlockManagerMaster extends ToBlockManagerMaster
-private[spark]
-case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
+ case object GetMemoryStatus extends ToBlockManagerMaster
-private[spark]
-case object StopBlockManagerMaster extends ToBlockManagerMaster
+ case object ExpireDeadHosts extends ToBlockManagerMaster
-private[spark]
-case object GetMemoryStatus extends ToBlockManagerMaster
-
-private[spark]
-case object ExpireDeadHosts extends ToBlockManagerMaster
-
-private[spark]
-case object GetStorageStatus extends ToBlockManagerMaster
+ case object GetStorageStatus extends ToBlockManagerMaster
+}
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
index 45cffad810..6e5fb43732 100644
--- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -19,7 +19,7 @@ package spark.storage
import akka.actor.Actor
-import spark.{Logging, SparkException, Utils}
+import spark.storage.BlockManagerMessages._
/**
diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala
new file mode 100644
index 0000000000..2aecd1ea71
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala
@@ -0,0 +1,48 @@
+package spark.storage
+
+import com.codahale.metrics.{Gauge,MetricRegistry}
+
+import spark.metrics.source.Source
+
+
+private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "BlockManager"
+
+ metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+ maxMem / 1024 / 1024
+ }
+ })
+
+ metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ remainingMem / 1024 / 1024
+ }
+ })
+
+ metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ (maxMem - remainingMem) / 1024 / 1024
+ }
+ })
+
+ metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val diskSpaceUsed = storageStatusList
+ .flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_ + _)
+ .getOrElse(0L)
+
+ diskSpaceUsed / 1024 / 1024
+ }
+ })
+}
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index ab72dbb62b..bcce26b7c1 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ArrayBuffer
-import spark._
import spark.network._
private[spark] case class GetBlock(id: String)
diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala
index b0229d6124..ee2fc167d5 100644
--- a/core/src/main/scala/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala
@@ -19,7 +19,6 @@ package spark.storage
import java.nio.ByteBuffer
-import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ArrayBuffer
import spark._
@@ -113,7 +112,7 @@ private[spark] object BlockMessageArray {
def main(args: Array[String]) {
val blockMessages =
- (0 until 10).map(i => {
+ (0 until 10).map { i =>
if (i % 2 == 0) {
val buffer = ByteBuffer.allocate(100)
buffer.clear
@@ -121,7 +120,7 @@ private[spark] object BlockMessageArray {
} else {
BlockMessage.fromGetBlock(GetBlock(i.toString))
}
- })
+ }
val blockMessageArray = new BlockMessageArray(blockMessages)
println("Block message array created")
diff --git a/core/src/main/scala/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/spark/storage/BlockObjectWriter.scala
index 01ed6e8c1f..3812009ca1 100644
--- a/core/src/main/scala/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/spark/storage/BlockObjectWriter.scala
@@ -17,8 +17,6 @@
package spark.storage
-import java.nio.ByteBuffer
-
/**
* An interface for writing JVM objects to some underlying storage. This interface allows
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 3495d653bd..3ebfe173b1 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -66,7 +66,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def close() {
if (initialized) {
objOut.close()
- bs.close()
channel = null
bs = null
objOut = null
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 20ea54d6a6..4be2bfa413 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -5,7 +5,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Properties
import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
@@ -18,7 +18,6 @@ import spark.ui.JettyUtils._
import spark.ui.Page.Executors
import spark.ui.UIUtils.headerSparkPage
import spark.ui.UIUtils
-import spark.Utils
import scala.xml.{Node, XML}
@@ -45,7 +44,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
.reduceOption(_+_).getOrElse(0L)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
- "Failed tasks", "Complete tasks", "Total tasks")
+ "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
def execRow(kv: Seq[String]) =
<tr>
<td>{kv(0)}</td>
@@ -60,6 +59,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(6)}</td>
<td>{kv(7)}</td>
<td>{kv(8)}</td>
+ <td>{kv(9)}</td>
</tr>
val execInfo =
for (b <- 0 until storageStatusList.size)
@@ -93,6 +93,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
+ val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size)
+ .getOrElse(0).toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
@@ -104,6 +106,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
memUsed,
maxMem,
diskUsed,
+ activeTasks,
failedTasks,
completedTasks,
totalTasks
@@ -111,13 +114,26 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
private[spark] class ExecutorsListener extends SparkListener with Logging {
+ val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos =
HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ val eid = taskStart.taskInfo.executorId
+ val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ activeTasks += taskStart.taskInfo
+ val taskList = executorToTaskInfos.getOrElse(
+ eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ executorToTaskInfos(eid) = taskList
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
+ val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
@@ -125,12 +141,13 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
(Some(e), e.metrics)
case _ =>
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
- (None, Some(taskEnd.taskMetrics))
+ (None, Option(taskEnd.taskMetrics))
}
val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
executorToTaskInfos(eid) = taskList
}
}
-} \ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index f31af3cda6..a843b5ea2f 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -21,13 +21,16 @@ import java.util.Date
import javax.servlet.http.HttpServletRequest
+import scala.collection.mutable.HashSet
import scala.Some
import scala.xml.{NodeSeq, Node}
+import spark.scheduler.cluster.TaskInfo
import spark.scheduler.Stage
-import spark.ui.UIUtils._
-import spark.ui.Page._
import spark.storage.StorageLevel
+import spark.ui.Page._
+import spark.ui.UIUtils._
+import spark.Utils
/** Page showing list of all ongoing and recently finished stages */
private[spark] class IndexPage(parent: JobProgressUI) {
@@ -38,6 +41,12 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val activeStages = listener.activeStages.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val failedStages = listener.failedStages.reverse.toSeq
+ val now = System.currentTimeMillis()
+
+ var activeTime = 0L
+ for (tasks <- listener.stageToTasksActive.values; t <- tasks) {
+ activeTime += t.timeRunning(now)
+ }
/** Special table which merges two header cells. */
def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
@@ -48,7 +57,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<th>Submitted</th>
<th>Duration</th>
<th colspan="2">Tasks: Complete/Total</th>
- <th>Shuffle Activity</th>
+ <th>Shuffle Read</th>
+ <th>Shuffle Write</th>
<th>Stored RDD</th>
</thead>
<tbody>
@@ -57,11 +67,33 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</table>
}
+ val summary: NodeSeq =
+ <div>
+ <ul class="unstyled">
+ <li>
+ <strong>CPU time: </strong>
+ {parent.formatDuration(listener.totalTime + activeTime)}
+ </li>
+ {if (listener.totalShuffleRead > 0)
+ <li>
+ <strong>Shuffle read: </strong>
+ {Utils.memoryBytesToString(listener.totalShuffleRead)}
+ </li>
+ }
+ {if (listener.totalShuffleWrite > 0)
+ <li>
+ <strong>Shuffle write: </strong>
+ {Utils.memoryBytesToString(listener.totalShuffleWrite)}
+ </li>
+ }
+ </ul>
+ </div>
val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
val completedStageTable = stageTable(stageRow, completedStages)
val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
- val content = <h2>Active Stages</h2> ++ activeStageTable ++
+ val content = summary ++
+ <h2>Active Stages</h2> ++ activeStageTable ++
<h2>Completed Stages</h2> ++ completedStageTable ++
<h2>Failed Stages</h2> ++ failedStageTable
@@ -75,17 +107,14 @@ private[spark] class IndexPage(parent: JobProgressUI) {
}
}
- def makeProgressBar(completed: Int, total: Int): Seq[Node] = {
- val width=130
- val height=15
- val completeWidth = (completed.toDouble / total) * width
-
- <svg width={width.toString} height={height.toString}>
- <rect width={width.toString} height={height.toString}
- fill="white" stroke="rgb(51,51,51)" stroke-width="1" />
- <rect width={completeWidth.toString} height={height.toString}
- fill="rgb(0,136,204)" stroke="black" stroke-width="1" />
- </svg>
+ def makeProgressBar(started: Int, completed: Int, total: Int): Seq[Node] = {
+ val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
+ val startWidth = "width: %s%%".format((started.toDouble/total)*100)
+
+ <div class="progress" style="height: 15px; margin-bottom: 0px">
+ <div class="bar" style={completeWidth}></div>
+ <div class="bar bar-info" style={startWidth}></div>
+ </div>
}
@@ -94,13 +123,17 @@ private[spark] class IndexPage(parent: JobProgressUI) {
case Some(t) => dateFmt.format(new Date(t))
case None => "Unknown"
}
- val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
- val shuffleInfo = (read, write) match {
- case (true, true) => "Read/Write"
- case (true, false) => "Read"
- case (false, true) => "Write"
- case _ => ""
+
+ val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
+ case 0 => ""
+ case b => Utils.memoryBytesToString(b)
}
+ val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
+ case 0 => ""
+ case b => Utils.memoryBytesToString(b)
+ }
+
+ val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val totalTasks = s.numPartitions
@@ -110,14 +143,15 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<td>{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
- <td class="progress-cell">{makeProgressBar(completedTasks, totalTasks)}</td>
+ <td class="progress-cell">{makeProgressBar(startedTasks, completedTasks, totalTasks)}</td>
<td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
{listener.stageToTasksFailed.getOrElse(s.id, 0) match {
case f if f > 0 => "(%s failed)".format(f)
case _ =>
}}
</td>
- <td>{shuffleInfo}</td>
+ <td>{shuffleRead}</td>
+ <td>{shuffleWrite}</td>
<td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
<a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
{Option(s.rdd.name).getOrElse(s.rdd.id)}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 44dcf82d11..09d24b6302 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -65,6 +65,15 @@ private[spark] class JobProgressListener extends SparkListener {
val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]()
+ // Total metrics reflect metrics only for completed tasks
+ var totalTime = 0L
+ var totalShuffleRead = 0L
+ var totalShuffleWrite = 0L
+
+ val stageToTime = HashMap[Int, Long]()
+ val stageToShuffleRead = HashMap[Int, Long]()
+ val stageToShuffleWrite = HashMap[Int, Long]()
+ val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
@@ -85,6 +94,12 @@ private[spark] class JobProgressListener extends SparkListener {
val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => {
stageToTaskInfos.remove(s.id)
+ stageToTime.remove(s.id)
+ stageToShuffleRead.remove(s.id)
+ stageToShuffleWrite.remove(s.id)
+ stageToTasksActive.remove(s.id)
+ stageToTasksComplete.remove(s.id)
+ stageToTasksFailed.remove(s.id)
})
stages.trimEnd(toRemove)
}
@@ -93,8 +108,20 @@ private[spark] class JobProgressListener extends SparkListener {
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
activeStages += stageSubmitted.stage
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ val sid = taskStart.task.stageId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive += taskStart.taskInfo
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ stageToTaskInfos(sid) = taskList
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
@@ -102,10 +129,29 @@ private[spark] class JobProgressListener extends SparkListener {
(Some(e), e.metrics)
case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- (None, Some(taskEnd.taskMetrics))
+ (None, Option(taskEnd.taskMetrics))
}
+
+ stageToTime.getOrElseUpdate(sid, 0L)
+ val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+ stageToTime(sid) += time
+ totalTime += time
+
+ stageToShuffleRead.getOrElseUpdate(sid, 0L)
+ val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+ s.remoteBytesRead).getOrElse(0L)
+ stageToShuffleRead(sid) += shuffleRead
+ totalShuffleRead += shuffleRead
+
+ stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+ val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+ s.shuffleBytesWritten).getOrElse(0L)
+ stageToShuffleWrite(sid) += shuffleWrite
+ totalShuffleWrite += shuffleWrite
+
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
@@ -123,22 +169,4 @@ private[spark] class JobProgressListener extends SparkListener {
case _ =>
}
}
-
- /** Is this stage's input from a shuffle read. */
- def hasShuffleRead(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
-
- /** Is this stage's output to a shuffle write. */
- def hasShuffleWrite(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
}
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 292966f23a..e327cb3947 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -37,23 +37,51 @@ private[spark] class StagePage(parent: JobProgressUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val stageId = request.getParameter("id").toInt
+ val now = System.currentTimeMillis()
if (!listener.stageToTaskInfos.contains(stageId)) {
val content =
<div>
- <h2>Summary Metrics</h2> No tasks have finished yet
- <h2>Tasks</h2> No tasks have finished yet
+ <h2>Summary Metrics</h2> No tasks have started yet
+ <h2>Tasks</h2> No tasks have started yet
</div>
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
val tasks = listener.stageToTaskInfos(stageId)
- val shuffleRead = listener.hasShuffleRead(stageId)
- val shuffleWrite = listener.hasShuffleWrite(stageId)
+ val shuffleRead = listener.stageToShuffleRead(stageId) > 0
+ val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0
+
+ var activeTime = 0L
+ listener.stageToTasksActive(stageId).foreach { t =>
+ activeTime += t.timeRunning(now)
+ }
+
+ val summary =
+ <div>
+ <ul class="unstyled">
+ <li>
+ <strong>CPU time: </strong>
+ {parent.formatDuration(listener.stageToTime(stageId) + activeTime)}
+ </li>
+ {if (shuffleRead)
+ <li>
+ <strong>Shuffle read: </strong>
+ {Utils.memoryBytesToString(listener.stageToShuffleRead(stageId))}
+ </li>
+ }
+ {if (shuffleWrite)
+ <li>
+ <strong>Shuffle write: </strong>
+ {Utils.memoryBytesToString(listener.stageToShuffleWrite(stageId))}
+ </li>
+ }
+ </ul>
+ </div>
val taskHeaders: Seq[String] =
- Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++
+ Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read") else Nil} ++
{if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
Seq("Details")
@@ -61,7 +89,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val taskTable = listingTable(taskHeaders, taskRow, tasks)
// Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => Option(t._2).isDefined)
+ val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (Option(t._2).isDefined))
val summaryTable: Option[Seq[Node]] =
if (validTasks.size == 0) {
@@ -98,7 +126,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
val content =
- <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ <h2>Tasks</h2> ++ taskTable;
+ summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
+ <h2>Tasks</h2> ++ taskTable;
headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
@@ -108,10 +137,17 @@ private[spark] class StagePage(parent: JobProgressUI) {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
val (info, metrics, exception) = taskData
+
+ val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
+ else metrics.map(m => m.executorRunTime).getOrElse(1)
+ val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
+ else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+
<tr>
<td>{info.taskId}</td>
- <td sorttable_customkey={metrics.map{m => m.executorRunTime.toString}.getOrElse("1")}>
- {metrics.map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}
+ <td>{info.status}</td>
+ <td sorttable_customkey={duration.toString}>
+ {formatDuration}
</td>
<td>{info.taskLocality}</td>
<td>{info.hostPort}</td>
diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
index ed49386f18..a47cac3b96 100644
--- a/core/src/main/scala/spark/util/Vector.scala
+++ b/core/src/main/scala/spark/util/Vector.scala
@@ -73,7 +73,6 @@ class Vector(val elements: Array[Double]) extends Serializable {
def += (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
- var ans = 0.0
var i = 0
while (i < length) {
elements(i) += other(i)
@@ -117,9 +116,7 @@ object Vector {
def apply(elements: Double*) = new Vector(elements.toArray)
def apply(length: Int, initializer: Int => Double): Vector = {
- val elements = new Array[Double](length)
- for (i <- 0 until length)
- elements(i) = initializer(i)
+ val elements: Array[Double] = Array.tabulate(length)(initializer)
return new Vector(elements)
}
diff --git a/core/src/test/resources/test_metrics_config.properties b/core/src/test/resources/test_metrics_config.properties
new file mode 100644
index 0000000000..2b31ddf2eb
--- /dev/null
+++ b/core/src/test/resources/test_metrics_config.properties
@@ -0,0 +1,6 @@
+*.sink.console.period = 10
+*.sink.console.unit = seconds
+*.source.jvm.class = spark.metrics.source.JvmSource
+master.sink.console.period = 20
+master.sink.console.unit = minutes
+
diff --git a/core/src/test/resources/test_metrics_system.properties b/core/src/test/resources/test_metrics_system.properties
new file mode 100644
index 0000000000..d5479f0298
--- /dev/null
+++ b/core/src/test/resources/test_metrics_system.properties
@@ -0,0 +1,7 @@
+*.sink.console.period = 10
+*.sink.console.unit = seconds
+test.sink.console.class = spark.metrics.sink.ConsoleSink
+test.sink.dummy.class = spark.metrics.sink.DummySink
+test.source.dummy.class = spark.metrics.source.DummySource
+test.sink.console.period = 20
+test.sink.console.unit = minutes
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index 6c847b8fef..5b133cdd6e 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -18,9 +18,6 @@
package spark
import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-
-import scala.collection.mutable.ArrayBuffer
import SparkContext._
@@ -40,7 +37,7 @@ object FailureSuiteState {
}
class FailureSuite extends FunSuite with LocalSparkContext {
-
+
// Run a 3-task map job in which task 1 deterministically fails once, and check
// whether the job completes successfully and we ran 4 tasks in total.
test("failure in a single-stage job") {
@@ -66,7 +63,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
test("failure in a two-stage job") {
sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
- case (k, v) =>
+ case (k, v) =>
FailureSuiteState.synchronized {
FailureSuiteState.tasksRun += 1
if (k == 1 && FailureSuiteState.tasksFailed == 0) {
@@ -87,12 +84,40 @@ class FailureSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
- val thrown = intercept[spark.SparkException] {
+ val thrown = intercept[SparkException] {
results.collect()
}
- assert(thrown.getClass === classOf[spark.SparkException])
+ assert(thrown.getClass === classOf[SparkException])
+ assert(thrown.getMessage.contains("NotSerializableException"))
+
+ FailureSuiteState.clear()
+ }
+
+ test("failure because task closure is not serializable") {
+ sc = new SparkContext("local[1,1]", "test")
+ val a = new NonSerializable
+
+ // Non-serializable closure in the final result stage
+ val thrown = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).map(x => a).count()
+ }
+ assert(thrown.getClass === classOf[SparkException])
assert(thrown.getMessage.contains("NotSerializableException"))
+ // Non-serializable closure in an earlier stage
+ val thrown1 = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count()
+ }
+ assert(thrown1.getClass === classOf[SparkException])
+ assert(thrown1.getMessage.contains("NotSerializableException"))
+
+ // Non-serializable closure in foreach function
+ val thrown2 = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).foreach(x => println(a))
+ }
+ assert(thrown2.getClass === classOf[SparkException])
+ assert(thrown2.getMessage.contains("NotSerializableException"))
+
FailureSuiteState.clear()
}
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index 6d9d3129c2..f03d1c822a 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -18,14 +18,11 @@
package spark
import scala.collection.mutable
-import scala.collection.immutable
import org.scalatest.FunSuite
import com.esotericsoftware.kryo._
-import SparkContext._
import spark.test._
-
class KryoSerializerSuite extends FunSuite with SharedSparkContext {
test("basic types") {
val ser = (new KryoSerializer).newInstance()
@@ -54,6 +51,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
check(Array(true, false, true))
check(Array('a', 'b', 'c'))
check(Array[Int]())
+ check(Array(Array("1", "2"), Array("1", "2", "3", "4")))
}
test("pairs") {
@@ -104,7 +102,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("custom registrator") {
- import spark.test._
+ import KryoTest._
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
val ser = (new KryoSerializer).newInstance()
@@ -124,7 +122,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
val hashMap = new java.util.HashMap[String, String]
hashMap.put("foo", "bar")
check(hashMap)
-
+
System.clearProperty("spark.kryo.registrator")
}
@@ -168,9 +166,9 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
}
-package test {
+object KryoTest {
case class CaseClass(i: Int, s: String) {}
-
+
class ClassWithNoArgConstructor {
var x: Int = 0
override def equals(other: Any) = other match {
@@ -194,4 +192,4 @@ package test {
k.register(classOf[java.util.HashMap[_, _]])
}
}
-}
+} \ No newline at end of file
diff --git a/core/src/test/scala/spark/PartitionPruningRDDSuite.scala b/core/src/test/scala/spark/PartitionPruningRDDSuite.scala
new file mode 100644
index 0000000000..88352b639f
--- /dev/null
+++ b/core/src/test/scala/spark/PartitionPruningRDDSuite.scala
@@ -0,0 +1,28 @@
+package spark
+
+import org.scalatest.FunSuite
+import spark.SparkContext._
+import spark.rdd.PartitionPruningRDD
+
+
+class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
+
+ test("Pruned Partitions inherit locality prefs correctly") {
+ class TestPartition(i: Int) extends Partition {
+ def index = i
+ }
+ val rdd = new RDD[Int](sc, Nil) {
+ override protected def getPartitions = {
+ Array[Partition](
+ new TestPartition(1),
+ new TestPartition(2),
+ new TestPartition(3))
+ }
+ def compute(split: Partition, context: TaskContext) = {Iterator()}
+ }
+ val prunedRDD = PartitionPruningRDD.create(rdd, {x => if (x==2) true else false})
+ val p = prunedRDD.partitions(0)
+ assert(p.index == 2)
+ assert(prunedRDD.partitions.length == 1)
+ }
+}
diff --git a/core/src/test/scala/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/spark/io/CompressionCodecSuite.scala
new file mode 100644
index 0000000000..1ba82fe2b9
--- /dev/null
+++ b/core/src/test/scala/spark/io/CompressionCodecSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.io
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import org.scalatest.FunSuite
+
+
+class CompressionCodecSuite extends FunSuite {
+
+ def testCodec(codec: CompressionCodec) {
+ // Write 1000 integers to the output stream, compressed.
+ val outputStream = new ByteArrayOutputStream()
+ val out = codec.compressedOutputStream(outputStream)
+ for (i <- 1 until 1000) {
+ out.write(i % 256)
+ }
+ out.close()
+
+ // Read the 1000 integers back.
+ val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
+ val in = codec.compressedInputStream(inputStream)
+ for (i <- 1 until 1000) {
+ assert(in.read() === i % 256)
+ }
+ in.close()
+ }
+
+ test("default compression codec") {
+ val codec = CompressionCodec.createCodec()
+ assert(codec.getClass === classOf[SnappyCompressionCodec])
+ testCodec(codec)
+ }
+
+ test("lzf compression codec") {
+ val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
+ assert(codec.getClass === classOf[LZFCompressionCodec])
+ testCodec(codec)
+ }
+
+ test("snappy compression codec") {
+ val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
+ assert(codec.getClass === classOf[SnappyCompressionCodec])
+ testCodec(codec)
+ }
+}
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
new file mode 100644
index 0000000000..87cd2ffad2
--- /dev/null
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -0,0 +1,64 @@
+package spark.metrics
+
+import java.util.Properties
+import java.io.{File, FileOutputStream}
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import spark.metrics._
+
+class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
+ var filePath: String = _
+
+ before {
+ filePath = getClass.getClassLoader.getResource("test_metrics_config.properties").getFile()
+ }
+
+ test("MetricsConfig with default properties") {
+ val conf = new MetricsConfig(Option("dummy-file"))
+ conf.initialize()
+
+ assert(conf.properties.size() === 0)
+ assert(conf.properties.getProperty("test-for-dummy") === null)
+
+ val property = conf.getInstance("random")
+ assert(property.size() === 0)
+ }
+
+ test("MetricsConfig with properties set") {
+ val conf = new MetricsConfig(Option(filePath))
+ conf.initialize()
+
+ val masterProp = conf.getInstance("master")
+ assert(masterProp.size() === 3)
+ assert(masterProp.getProperty("sink.console.period") === "20")
+ assert(masterProp.getProperty("sink.console.unit") === "minutes")
+ assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+
+ val workerProp = conf.getInstance("worker")
+ assert(workerProp.size() === 3)
+ assert(workerProp.getProperty("sink.console.period") === "10")
+ assert(workerProp.getProperty("sink.console.unit") === "seconds")
+ assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+ }
+
+ test("MetricsConfig with subProperties") {
+ val conf = new MetricsConfig(Option(filePath))
+ conf.initialize()
+
+ val propCategories = conf.propertyCategories
+ assert(propCategories.size === 2)
+
+ val masterProp = conf.getInstance("master")
+ val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
+ assert(sourceProps.size === 1)
+ assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
+
+ val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
+ assert(sinkProps.size === 1)
+ assert(sinkProps.contains("console"))
+
+ val consoleProps = sinkProps("console")
+ assert(consoleProps.size() === 2)
+ }
+}
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
new file mode 100644
index 0000000000..c189996417
--- /dev/null
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -0,0 +1,39 @@
+package spark.metrics
+
+import java.util.Properties
+import java.io.{File, FileOutputStream}
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import spark.metrics._
+
+class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
+ var filePath: String = _
+
+ before {
+ filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
+ System.setProperty("spark.metrics.conf", filePath)
+ }
+
+ test("MetricsSystem with default config") {
+ val metricsSystem = MetricsSystem.createMetricsSystem("default")
+ val sources = metricsSystem.sources
+ val sinks = metricsSystem.sinks
+
+ assert(sources.length === 0)
+ assert(sinks.length === 0)
+ }
+
+ test("MetricsSystem with sources add") {
+ val metricsSystem = MetricsSystem.createMetricsSystem("test")
+ val sources = metricsSystem.sources
+ val sinks = metricsSystem.sinks
+
+ assert(sources.length === 0)
+ assert(sinks.length === 1)
+
+ val source = new spark.deploy.master.MasterSource(null)
+ metricsSystem.registerSource(source)
+ assert(sources.length === 1)
+ }
+}
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index 8f81d0b6ee..05afcd6567 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -33,7 +33,7 @@ class DummyTaskSetManager(
initNumTasks: Int,
clusterScheduler: ClusterScheduler,
taskSet: TaskSet)
- extends ClusterTaskSetManager(clusterScheduler,taskSet) {
+ extends ClusterTaskSetManager(clusterScheduler, taskSet) {
parent = null
weight = 1
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index f802b66cf1..a8b88d7936 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -23,21 +23,14 @@ import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import spark.LocalSparkContext
-
-import spark.storage.BlockManager
-import spark.storage.BlockManagerId
-import spark.storage.BlockManagerMaster
-import spark.{Dependency, ShuffleDependency, OneToOneDependency}
-import spark.FetchFailedException
import spark.MapOutputTracker
import spark.RDD
import spark.SparkContext
-import spark.SparkException
import spark.Partition
import spark.TaskContext
-import spark.TaskEndReason
-
-import spark.{FetchFailed, Success}
+import spark.{Dependency, ShuffleDependency, OneToOneDependency}
+import spark.{FetchFailed, Success, TaskEndReason}
+import spark.storage.{BlockManagerId, BlockManagerMaster}
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler