path: root/core
author    Reynold Xin <rxin@cs.berkeley.edu>  2012-12-20 14:53:40 -0800
committer Reynold Xin <rxin@cs.berkeley.edu>  2012-12-20 14:53:40 -0800
commit    eac566a7f4a053fedd677fb067e29d98ce9e6e35 (patch)
tree      67a8e79bab238bc542107613dd5a852455c7767f /core
parent    2a87d816a24c62215d682e3a7af65489c0d6e708 (diff)
parent    5e51b889feac28787bfdb546b04d4b3752f3d8d1 (diff)
Merge branch 'master' of github.com:mesos/spark into dev
Conflicts:
    core/src/main/scala/spark/MapOutputTracker.scala
    core/src/main/scala/spark/PairRDDFunctions.scala
    core/src/main/scala/spark/ParallelCollection.scala
    core/src/main/scala/spark/RDD.scala
    core/src/main/scala/spark/rdd/BlockRDD.scala
    core/src/main/scala/spark/rdd/CartesianRDD.scala
    core/src/main/scala/spark/rdd/CoGroupedRDD.scala
    core/src/main/scala/spark/rdd/CoalescedRDD.scala
    core/src/main/scala/spark/rdd/FilteredRDD.scala
    core/src/main/scala/spark/rdd/FlatMappedRDD.scala
    core/src/main/scala/spark/rdd/GlommedRDD.scala
    core/src/main/scala/spark/rdd/HadoopRDD.scala
    core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
    core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
    core/src/main/scala/spark/rdd/MappedRDD.scala
    core/src/main/scala/spark/rdd/PipedRDD.scala
    core/src/main/scala/spark/rdd/SampledRDD.scala
    core/src/main/scala/spark/rdd/ShuffledRDD.scala
    core/src/main/scala/spark/rdd/UnionRDD.scala
    core/src/main/scala/spark/storage/BlockManager.scala
    core/src/main/scala/spark/storage/BlockManagerId.scala
    core/src/main/scala/spark/storage/BlockManagerMaster.scala
    core/src/main/scala/spark/storage/StorageLevel.scala
    core/src/main/scala/spark/util/MetadataCleaner.scala
    core/src/main/scala/spark/util/TimeStampedHashMap.scala
    core/src/test/scala/spark/storage/BlockManagerSuite.scala
    run
Diffstat (limited to 'core')
-rw-r--r--  core/pom.xml | 270
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala | 29
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala | 56
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala | 15
-rw-r--r--  core/src/main/scala/spark/ParallelCollection.scala | 15
-rw-r--r--  core/src/main/scala/spark/RDD.scala | 125
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 2
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala | 13
-rw-r--r--  core/src/main/scala/spark/TaskContext.scala | 19
-rw-r--r--  core/src/main/scala/spark/Utils.scala | 33
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala | 35
-rw-r--r--  core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala | 2
-rw-r--r--  core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala | 17
-rw-r--r--  core/src/main/scala/spark/deploy/LocalSparkCluster.scala | 8
-rw-r--r--  core/src/main/scala/spark/deploy/client/Client.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/client/ClientListener.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/client/TestClient.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala | 29
-rw-r--r--  core/src/main/scala/spark/deploy/master/WorkerInfo.scala | 13
-rw-r--r--  core/src/main/scala/spark/deploy/master/WorkerState.scala | 7
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 7
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala | 12
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala | 9
-rw-r--r--  core/src/main/scala/spark/executor/ExecutorExitCode.scala | 43
-rw-r--r--  core/src/main/scala/spark/rdd/BlockRDD.scala | 23
-rw-r--r--  core/src/main/scala/spark/rdd/CartesianRDD.scala | 9
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 33
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala | 10
-rw-r--r--  core/src/main/scala/spark/rdd/FilteredRDD.scala | 12
-rw-r--r--  core/src/main/scala/spark/rdd/FlatMappedRDD.scala | 11
-rw-r--r--  core/src/main/scala/spark/rdd/GlommedRDD.scala | 10
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala | 20
-rw-r--r--  core/src/main/scala/spark/rdd/MapPartitionsRDD.scala | 9
-rw-r--r--  core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala | 13
-rw-r--r--  core/src/main/scala/spark/rdd/MappedRDD.scala | 9
-rw-r--r--  core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 34
-rw-r--r--  core/src/main/scala/spark/rdd/PipedRDD.scala | 13
-rw-r--r--  core/src/main/scala/spark/rdd/SampledRDD.scala | 15
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala | 10
-rw-r--r--  core/src/main/scala/spark/rdd/UnionRDD.scala | 15
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedRDD.scala | 55
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala | 23
-rw-r--r--  core/src/main/scala/spark/scheduler/ResultTask.scala | 6
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 17
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 9
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala | 21
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 17
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 10
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 16
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala | 202
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerId.scala | 14
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala | 505
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala | 401
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMessages.scala | 102
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala | 16
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala | 7
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala | 9
-rw-r--r--  core/src/main/scala/spark/storage/MemoryStore.scala | 5
-rw-r--r--  core/src/main/scala/spark/storage/StorageLevel.scala | 28
-rw-r--r--  core/src/main/scala/spark/storage/ThreadingTest.scala | 15
-rw-r--r--  core/src/main/scala/spark/util/IdGenerator.scala | 14
-rw-r--r--  core/src/main/scala/spark/util/MetadataCleaner.scala | 11
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashMap.scala | 2
-rw-r--r--  core/src/main/twirl/spark/deploy/master/worker_row.scala.html | 3
-rw-r--r--  core/src/main/twirl/spark/deploy/master/worker_table.scala.html | 1
-rw-r--r--  core/src/test/scala/spark/CheckpointSuite.scala | 2
-rw-r--r--  core/src/test/scala/spark/JavaAPISuite.java | 29
-rw-r--r--  core/src/test/scala/spark/MapOutputTrackerSuite.scala | 55
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala | 12
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala | 164
71 files changed, 1940 insertions, 850 deletions
diff --git a/core/pom.xml b/core/pom.xml
new file mode 100644
index 0000000000..ae52c20657
--- /dev/null
+++ b/core/pom.xml
@@ -0,0 +1,270 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.spark-project</groupId>
+ <artifactId>parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Core</name>
+ <url>http://spark-project.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>compress-lzf</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>asm</groupId>
+ <artifactId>asm-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>de.javakaffee</groupId>
+ <artifactId>kryo-serializers</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-remote</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>colt</groupId>
+ <artifactId>colt</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>cc.spray</groupId>
+ <artifactId>spray-can</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>cc.spray</groupId>
+ <artifactId>spray-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.tomdz.twirl</groupId>
+ <artifactId>twirl-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.scala-incubator.io</groupId>
+ <artifactId>scala-io-file_${scala.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mesos</groupId>
+ <artifactId>mesos</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>test</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <exportAntProperties>true</exportAntProperties>
+ <tasks>
+ <property name="spark.classpath" refid="maven.test.classpath"/>
+ <property environment="env"/>
+ <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
+ <condition>
+ <not>
+ <or>
+ <isset property="env.SCALA_HOME"/>
+ <isset property="env.SCALA_LIBRARY_PATH"/>
+ </or>
+ </not>
+ </condition>
+ </fail>
+ </tasks>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <environmentVariables>
+ <SPARK_HOME>${basedir}/..</SPARK_HOME>
+ <SPARK_TESTING>1</SPARK_TESTING>
+ <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
+ </environmentVariables>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.tomdz.twirl</groupId>
+ <artifactId>twirl-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ <source>src/hadoop1/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-scala-test-sources</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop1</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ <source>src/hadoop2/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-scala-test-sources</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
\ No newline at end of file
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index cb54e12257..05428aae1f 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -1,5 +1,9 @@
package spark
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
import akka.actor._
import akka.dispatch._
import akka.pattern.ask
@@ -8,10 +12,6 @@ import akka.util.Duration
import akka.util.Timeout
import akka.util.duration._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-
import spark.storage.BlockManager
import spark.storage.StorageLevel
import util.{TimeStampedHashSet, MetadataCleaner, TimeStampedHashMap}
@@ -44,7 +44,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging {
private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L)
private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L)
private def getCacheAvailable(host: String): Long = getCacheCapacity(host) - getCacheUsage(host)
-
+
def receive = {
case SlaveCacheStarted(host: String, size: Long) =>
slaveCapacity.put(host, size)
@@ -96,14 +96,14 @@ private[spark] class CacheTrackerActor extends Actor with Logging {
private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: BlockManager)
extends Logging {
-
+
// Tracker actor on the master, or remote reference to it on workers
val ip: String = System.getProperty("spark.master.host", "localhost")
val port: Int = System.getProperty("spark.master.port", "7077").toInt
val actorName: String = "CacheTracker"
val timeout = 10.seconds
-
+
var trackerActor: ActorRef = if (isMaster) {
val actor = actorSystem.actorOf(Props[CacheTrackerActor], name = actorName)
logInfo("Registered CacheTrackerActor actor")
@@ -140,7 +140,7 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
throw new SparkException("Error reply received from CacheTracker")
}
}
-
+
// Registers an RDD (on master only)
def registerRDD(rddId: Int, numPartitions: Int) {
registeredRddIds.synchronized {
@@ -151,7 +151,7 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
}
}
}
-
+
// For BlockManager.scala only
def cacheLost(host: String) {
communicate(MemoryCacheLost(host))
@@ -163,19 +163,20 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
def getCacheStatus(): Seq[(String, Long, Long)] = {
askTracker(GetCacheStatus).asInstanceOf[Seq[(String, Long, Long)]]
}
-
+
// For BlockManager.scala only
def notifyFromBlockManager(t: AddedToCache) {
communicate(t)
}
-
+
// Get a snapshot of the currently known locations
def getLocationsSnapshot(): HashMap[Int, Array[List[String]]] = {
askTracker(GetCacheLocations).asInstanceOf[HashMap[Int, Array[List[String]]]]
}
-
+
// Gets or computes an RDD split
- def getOrCompute[T](rdd: RDD[T], split: Split, storageLevel: StorageLevel): Iterator[T] = {
+ def getOrCompute[T](rdd: RDD[T], split: Split, context: TaskContext, storageLevel: StorageLevel)
+ : Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
logInfo("Cache key is " + key)
blockManager.get(key) match {
@@ -217,7 +218,7 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
// TODO: also register a listener for when it unloads
logInfo("Computing partition " + split)
val elements = new ArrayBuffer[Any]
- elements ++= rdd.compute(split)
+ elements ++= rdd.compute(split, context)
try {
// Try to put this block in the blockManager
blockManager.put(key, elements, storageLevel, true)
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 20ff5431af..5ebdba0fc8 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -2,6 +2,10 @@ package spark
import java.io._
import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
import akka.actor._
import akka.dispatch._
@@ -11,17 +15,14 @@ import akka.util.Duration
import akka.util.Timeout
import akka.util.duration._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-
-import scheduler.MapStatus
+import spark.scheduler.MapStatus
import spark.storage.BlockManagerId
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import util.{MetadataCleaner, TimeStampedHashMap}
+import spark.util.{MetadataCleaner, TimeStampedHashMap}
+
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
- extends MapOutputTrackerMessage
+ extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
@@ -91,14 +92,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
}
mapStatuses.put(shuffleId, new Array[MapStatus](numMaps))
}
-
+
def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
var array = mapStatuses(shuffleId)
array.synchronized {
array(mapId) = status
}
}
-
+
def registerMapOutputs(
shuffleId: Int,
statuses: Array[MapStatus],
@@ -113,7 +114,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
var array = mapStatuses(shuffleId)
if (array != null) {
array.synchronized {
- if (array(mapId).address == bmAddress) {
+ if (array(mapId) != null && array(mapId).address == bmAddress) {
array(mapId) = null
}
}
@@ -122,10 +123,10 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
}
}
-
+
// Remembers which map output locations are currently being fetched on a worker
val fetching = new HashSet[Int]
-
+
// Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
val statuses = mapStatuses.get(shuffleId).orNull
@@ -150,14 +151,23 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
val host = System.getProperty("spark.hostname", Utils.localHostName)
- val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
- val fetchedStatuses = deserializeStatuses(fetchedBytes)
-
- logInfo("Got the output locations")
- mapStatuses.put(shuffleId, fetchedStatuses)
- fetching.synchronized {
- fetching -= shuffleId
- fetching.notifyAll()
+ // This try-finally prevents hangs due to timeouts:
+ var fetchedStatuses: Array[MapStatus] = null
+ try {
+ val fetchedBytes =
+ askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
+ fetchedStatuses = deserializeStatuses(fetchedBytes)
+ logInfo("Got the output locations")
+ mapStatuses.put(shuffleId, fetchedStatuses)
+ if (fetchedStatuses.contains(null)) {
+ throw new FetchFailedException(null, shuffleId, -1, reduceId,
+ new Exception("Missing an output location for shuffle " + shuffleId))
+ }
+ } finally {
+ fetching.synchronized {
+ fetching -= shuffleId
+ fetching.notifyAll()
+ }
}
return fetchedStatuses.map(s =>
(s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
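A minimal, self-contained sketch of the fetch-deduplication pattern used in the hunk above (the method name fetchOnce is hypothetical): the first caller for a shuffle ID performs the fetch while concurrent callers wait on the fetching set's monitor, and the try/finally introduced by this patch guarantees waiters are woken even if the fetch throws or times out.

    import scala.collection.mutable.HashSet

    val fetching = new HashSet[Int]

    def fetchOnce(shuffleId: Int)(doFetch: () => Unit) {
      val weFetch = fetching.synchronized {
        if (fetching.contains(shuffleId)) {
          // Lost the race: block until the winning thread calls notifyAll.
          while (fetching.contains(shuffleId)) fetching.wait()
          false
        } else {
          fetching += shuffleId  // Won the race to do the fetch.
          true
        }
      }
      if (weFetch) {
        try {
          doFetch()
        } finally {
          // Runs even on timeout or failure, so waiters can never hang forever.
          fetching.synchronized {
            fetching -= shuffleId
            fetching.notifyAll()
          }
        }
      }
    }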
@@ -263,8 +273,10 @@ private[spark] object MapOutputTracker {
* sizes up to 35 GB with at most 10% error.
*/
def compressSize(size: Long): Byte = {
- if (size <= 1L) {
+ if (size == 0) {
0
+ } else if (size <= 1L) {
+ 1
} else {
math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
}
@@ -275,7 +287,7 @@ private[spark] object MapOutputTracker {
*/
def decompressSize(compressedSize: Byte): Long = {
if (compressedSize == 0) {
- 1
+ 0
} else {
math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
}
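For reference, a runnable sketch of the size codec changed above, assuming LOG_BASE = 1.1 (the value is not shown in this hunk, but it is consistent with the documented "sizes up to 35 GB with at most 10% error", since 1.1^255 is roughly 35 GB). The fix makes zero round-trip exactly: an empty map output now encodes to 0 and decodes back to 0, while outputs of one byte or less encode to 1.

    val LOG_BASE = 1.1  // assumed; defined elsewhere in MapOutputTracker

    def compressSize(size: Long): Byte = {
      if (size == 0) 0
      else if (size <= 1L) 1
      else math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
    }

    def decompressSize(compressedSize: Byte): Long = {
      if (compressedSize == 0) 0
      else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
    }

    // e.g. compressSize(1000000) == 145, and decompressSize(145) is about
    // 1.0e6 -- within the 10% error bound.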
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 1f82bd3ab8..ec6b209932 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -36,11 +36,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
with Serializable {
/**
- * Generic function to combine the elements for each key using a custom set of aggregation
+ * Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
* Note that V and C can be different -- for example, one might group an RDD of type
* (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
- *
+ *
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
@@ -119,7 +119,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
/** Count the number of elements for each key, and return the result to the master as a Map. */
def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
- /**
+ /**
* (Experimental) Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
@@ -225,7 +225,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
}
- /**
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the default
* parallelism level.
*/
@@ -630,7 +630,8 @@ class MappedValuesRDD[K, V, U](prev: WeakReference[RDD[(K, V)]], f: V => U)
override def splits = firstParent[(K, V)].splits
override val partitioner = firstParent[(K, V)].partitioner
- override def compute(split: Split) = firstParent[(K, V)].iterator(split).map{case (k, v) => (k, f(v))}
+ override def compute(split: Split, context: TaskContext) =
+ firstParent[(K, V)].iterator(split, context).map{ case (k, v) => (k, f(v)) }
}
private[spark]
@@ -639,8 +640,8 @@ class FlatMappedValuesRDD[K, V, U](prev: WeakReference[RDD[(K, V)]], f: V => Tra
override def splits = firstParent[(K, V)].splits
override val partitioner = firstParent[(K, V)].partitioner
- override def compute(split: Split) = {
- firstParent[(K, V)].iterator(split).flatMap { case (k, v) => f(v).map(x => (k, x)) }
+ override def compute(split: Split, context: TaskContext) = {
+ firstParent[(K, V)].iterator(split, context).flatMap { case (k, v) => f(v).map(x => (k, x)) }
}
}
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 4bd9e1bd54..68416a78d0 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -9,8 +9,8 @@ private[spark] class ParallelCollectionSplit[T: ClassManifest](
val slice: Int,
values: Seq[T])
extends Split with Serializable {
-
- def iterator(): Iterator[T] = values.iterator
+
+ def iterator: Iterator[T] = values.iterator
override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt
@@ -40,8 +40,10 @@ private[spark] class ParallelCollection[T: ClassManifest](
override def splits = splits_.asInstanceOf[Array[Split]]
- override def compute(s: Split) = s.asInstanceOf[ParallelCollectionSplit[T]].iterator
-
+
+ override def compute(s: Split, context: TaskContext) =
+ s.asInstanceOf[ParallelCollectionSplit[T]].iterator
+
override def preferredLocations(s: Split): Seq[String] = {
locationPrefs.get(splits_.indexOf(s)) match {
case Some(s) => s
@@ -50,10 +52,11 @@ private[spark] class ParallelCollection[T: ClassManifest](
}
}
+
private object ParallelCollection {
/**
* Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
- * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
+ * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
* it efficient to run Spark over RDDs representing large sets of numbers.
*/
def slice[T: ClassManifest](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
@@ -63,7 +66,7 @@ private object ParallelCollection {
seq match {
case r: Range.Inclusive => {
val sign = if (r.step < 0) {
- -1
+ -1
} else {
1
}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index fbfcfbd704..cf3ed06773 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -2,15 +2,14 @@ package spark
import java.io.{ObjectOutputStream, IOException, EOFException, ObjectInputStream}
import java.net.URL
-import java.util.concurrent.atomic.AtomicLong
-import java.util.Random
-import java.util.Date
+import java.util.{Date, Random}
import java.util.{HashMap => JHashMap}
+import java.util.concurrent.atomic.AtomicLong
-import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
-import scala.collection.mutable.HashMap
import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.BytesWritable
@@ -42,12 +41,13 @@ import spark.rdd.MapPartitionsWithSplitRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
import spark.rdd.UnionRDD
+import spark.rdd.ZippedRDD
import spark.storage.StorageLevel
import SparkContext._
/**
- * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
+ * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[spark.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such
@@ -75,7 +75,7 @@ import SparkContext._
abstract class RDD[T: ClassManifest](
@transient var sc: SparkContext,
var dependencies_ : List[Dependency[_]]
- ) extends Serializable {
+ ) extends Serializable with Logging {
def this(@transient oneParent: RDD[_]) =
@@ -87,14 +87,14 @@ abstract class RDD[T: ClassManifest](
def splits: Array[Split]
/** Function for computing a given partition. */
- def compute(split: Split): Iterator[T]
+ def compute(split: Split, context: TaskContext): Iterator[T]
/** How this RDD depends on any parent RDDs. */
def dependencies: List[Dependency[_]] = dependencies_
/** Record user function generating this RDD. */
private[spark] val origin = Utils.getSparkCallSite
-
+
/** Optionally overridden by subclasses to specify how they are partitioned. */
val partitioner: Option[Partitioner] = None
@@ -106,38 +106,52 @@ abstract class RDD[T: ClassManifest](
Nil
}
}
-
+
/** The [[spark.SparkContext]] that this RDD was created on. */
def context = sc
private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
-
+
/** A unique ID for this RDD (within its SparkContext). */
val id = sc.newRddId()
-
+
// Variables relating to persistence
private var storageLevel: StorageLevel = StorageLevel.NONE
/** Returns the first parent RDD */
- protected[spark] def firstParent[U: ClassManifest] = {
- dependencies.head.rdd.asInstanceOf[RDD[U]]
- }
+ protected[spark] def firstParent[U: ClassManifest] = dependencies.head.rdd.asInstanceOf[RDD[U]]
/** Returns the `i` th parent RDD */
protected[spark] def parent[U: ClassManifest](i: Int) = dependencies(i).rdd.asInstanceOf[RDD[U]]
- // Variables relating to checkpointing
- protected val isCheckpointable = true // override to set this to false to avoid checkpointing an RDD
+ //////////////////////////////////////////////////////////////////////////////
+ // Checkpointing related variables
+ //////////////////////////////////////////////////////////////////////////////
+
+ // override to set this to false to avoid checkpointing an RDD
+ protected val isCheckpointable = true
+
+ // set to true when an RDD is marked for checkpointing
+ protected var shouldCheckpoint = false
+
+ // set to true when checkpointing is in progress
+ protected var isCheckpointInProgress = false
+
+ // set to true after checkpointing is completed
+ protected[spark] var isCheckpointed = false
- protected var shouldCheckpoint = false // set to true when an RDD is marked for checkpointing
- protected var isCheckpointInProgress = false // set to true when checkpointing is in progress
- protected[spark] var isCheckpointed = false // set to true after checkpointing is completed
+ // set to the checkpoint file after checkpointing is completed
+ protected[spark] var checkpointFile: String = null
- protected[spark] var checkpointFile: String = null // set to the checkpoint file after checkpointing is completed
- protected var checkpointRDD: RDD[T] = null // set to the HadoopRDD of the checkpoint file
- protected var checkpointRDDSplits: Seq[Split] = null // set to the splits of the Hadoop RDD
+ // set to the HadoopRDD of the checkpoint file
+ protected var checkpointRDD: RDD[T] = null
- // Methods available on all RDDs:
+ // set to the splits of the Hadoop RDD
+ protected var checkpointRDDSplits: Seq[Split] = null
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Methods available on all RDDs
+ //////////////////////////////////////////////////////////////////////////////
/**
* Set this RDD's storage level to persist its values across operations after the first time
@@ -155,7 +169,7 @@ abstract class RDD[T: ClassManifest](
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
-
+
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): RDD[T] = persist()
@@ -194,10 +208,10 @@ abstract class RDD[T: ClassManifest](
}
/**
- * Performs the checkpointing of this RDD by saving this . It is called by the DAGScheduler after a job
- * using this RDD has completed (therefore the RDD has been materialized and
- * potentially stored in memory). In case this RDD is not marked for checkpointing,
- * doCheckpoint() is called recursively on the parent RDDs.
+ * Performs the checkpointing of this RDD by saving this . It is called by the DAGScheduler after
+ * a job using this RDD has completed (therefore the RDD has been materialized and potentially
+ * stored in memory). In case this RDD is not marked for checkpointing, doCheckpoint() is called
+ * recursively on the parent RDDs.
*/
private[spark] def doCheckpoint() {
val startCheckpoint = synchronized {
@@ -220,7 +234,8 @@ abstract class RDD[T: ClassManifest](
rdd.shouldCheckpoint = false
rdd.isCheckpointInProgress = false
rdd.isCheckpointed = true
- println("Done checkpointing RDD " + rdd.id + ", " + rdd + ", created RDD " + rdd.checkpointRDD.id + ", " + rdd.checkpointRDD)
+ logInfo("Done checkpointing RDD " + rdd.id + ", " + rdd + ", created RDD " +
+ rdd.checkpointRDD.id + ", " + rdd.checkpointRDD)
}
} else {
// Recursively call doCheckpoint() to perform checkpointing on parent RDD if they are marked
@@ -229,11 +244,11 @@ abstract class RDD[T: ClassManifest](
}
/**
- * Changes the dependencies of this RDD from its original parents to the new [[spark.rdd.HadoopRDD]]
- * (`newRDD`) created from the checkpoint file. This method must ensure that all references
- * to the original parent RDDs must be removed to enable the parent RDDs to be garbage
- * collected. Subclasses of RDD may override this method for implementing their own changing
- * logic. See [[spark.rdd.UnionRDD]] and [[spark.rdd.ShuffledRDD]] to get a better idea.
+ * Changes the dependencies of this RDD from its original parents to the new
+ * [[spark.rdd.HadoopRDD]] (`newRDD`) created from the checkpoint file. This method must ensure
+ * that all references to the original parent RDDs must be removed to enable the parent RDDs to
+ * be garbage collected. Subclasses of RDD may override this method for implementing their own
+ * changing logic. See [[spark.rdd.UnionRDD]] and [[spark.rdd.ShuffledRDD]] to get a better idea.
*/
protected def changeDependencies(newRDD: RDD[_]) {
dependencies_ = List(new OneToOneDependency(newRDD))
@@ -244,19 +259,19 @@ abstract class RDD[T: ClassManifest](
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
- final def iterator(split: Split): Iterator[T] = {
+ final def iterator(split: Split, context: TaskContext): Iterator[T] = {
if (isCheckpointed) {
// ASSUMPTION: Checkpoint Hadoop RDD will have same number of splits as original
- checkpointRDD.iterator(checkpointRDDSplits(split.index))
+ checkpointRDD.iterator(checkpointRDDSplits(split.index), context)
} else if (storageLevel != StorageLevel.NONE) {
- SparkEnv.get.cacheTracker.getOrCompute[T](this, split, storageLevel)
+ SparkEnv.get.cacheTracker.getOrCompute[T](this, split, context, storageLevel)
} else {
- compute(split)
+ compute(split, context)
}
}
-
+
// Transformations (return a new RDD)
-
+
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
@@ -292,13 +307,13 @@ abstract class RDD[T: ClassManifest](
var multiplier = 3.0
var initialCount = count()
var maxSelected = 0
-
+
if (initialCount > Integer.MAX_VALUE - 1) {
maxSelected = Integer.MAX_VALUE - 1
} else {
maxSelected = initialCount.toInt
}
-
+
if (num > initialCount) {
total = maxSelected
fraction = math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
@@ -308,14 +323,14 @@ abstract class RDD[T: ClassManifest](
fraction = math.min(multiplier * (num + 1) / initialCount, 1.0)
total = num
}
-
+
val rand = new Random(seed)
var samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
-
+
while (samples.length < total) {
samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
}
-
+
Utils.randomizeInPlace(samples, rand).take(total)
}
@@ -383,8 +398,18 @@ abstract class RDD[T: ClassManifest](
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- def mapPartitionsWithSplit[U: ClassManifest](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] =
- new MapPartitionsWithSplitRDD(this, sc.clean(f))
+ def mapPartitionsWithSplit[U: ClassManifest](
+ f: (Int, Iterator[T]) => Iterator[U],
+ preservesPartitioning: Boolean = false): RDD[U] =
+ new MapPartitionsWithSplitRDD(this, sc.clean(f), preservesPartitioning)
+
+ /**
+ * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
+ * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
+ * partitions* and the *same number of elements in each partition* (e.g. one was made through
+ * a map on the other).
+ */
+ def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
// Actions (launch a job to return a value to the user program)
@@ -435,7 +460,7 @@ abstract class RDD[T: ClassManifest](
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
- * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
+ * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
* modify t1 and return it as its result value to avoid object allocation; however, it should not
* modify t2.
*/
@@ -536,7 +561,7 @@ abstract class RDD[T: ClassManifest](
val evaluator = new GroupedCountEvaluator[T](splits.size, confidence)
sc.runApproximateJob(this, countPartition, evaluator, timeout)
}
-
+
/**
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
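A brief usage sketch for the new zip method (assumes a live SparkContext named sc): the call is valid here because ys is derived from xs by a map, so the two RDDs have the same number of partitions and the same number of elements in each partition.

    val xs = sc.parallelize(1 to 4, 2)
    val ys = xs.map(_ * 10)
    xs.zip(ys).collect()  // Array((1,10), (2,20), (3,30), (4,40))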
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 3ccdbfe10e..70257193cf 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -87,7 +87,7 @@ class SparkContext(
// Set Spark master host and port system properties
if (System.getProperty("spark.master.host") == null) {
- System.setProperty("spark.master.host", Utils.localIpAddress())
+ System.setProperty("spark.master.host", Utils.localIpAddress)
}
if (System.getProperty("spark.master.port") == null) {
System.setProperty("spark.master.port", "0")
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 9f2b0c42c7..41441720a7 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -86,10 +86,13 @@ object SparkEnv extends Logging {
}
val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
-
- val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
- val blockManager = new BlockManager(blockManagerMaster, serializer)
-
+
+ val masterIp: String = System.getProperty("spark.master.host", "localhost")
+ val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+ val blockManagerMaster = new BlockManagerMaster(
+ actorSystem, isMaster, isLocal, masterIp, masterPort)
+ val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
+
val connectionManager = blockManager.connectionManager
val broadcastManager = new BroadcastManager(isMaster)
@@ -104,7 +107,7 @@ object SparkEnv extends Logging {
val shuffleFetcher = instantiateClass[ShuffleFetcher](
"spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher")
-
+
val httpFileServer = new HttpFileServer()
httpFileServer.initialize()
System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
diff --git a/core/src/main/scala/spark/TaskContext.scala b/core/src/main/scala/spark/TaskContext.scala
index c14377d17b..d2746b26b3 100644
--- a/core/src/main/scala/spark/TaskContext.scala
+++ b/core/src/main/scala/spark/TaskContext.scala
@@ -1,3 +1,20 @@
package spark
-class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Long) extends Serializable
+import scala.collection.mutable.ArrayBuffer
+
+
+class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Long) extends Serializable {
+
+ @transient
+ val onCompleteCallbacks = new ArrayBuffer[() => Unit]
+
+ // Add a callback function to be executed on task completion. An example use
+ // is for HadoopRDD to register a callback to close the input stream.
+ def addOnCompleteCallback(f: () => Unit) {
+ onCompleteCallbacks += f
+ }
+
+ def executeOnCompleteCallbacks() {
+ onCompleteCallbacks.foreach{_()}
+ }
+}
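A hypothetical compute implementation showing the intended use of the new hook (the file path and reader wiring here are illustrative; per the comment above, HadoopRDD registers a callback like this to close its input stream):

    import java.io.{BufferedReader, FileReader}
    import spark.{Split, TaskContext}

    def compute(split: Split, context: TaskContext): Iterator[String] = {
      val reader = new BufferedReader(new FileReader("/tmp/part-" + split.index))
      // Invoked by executeOnCompleteCallbacks() when the task finishes,
      // whether or not the iterator was fully consumed.
      context.addOnCompleteCallback(() => reader.close())
      Iterator.continually(reader.readLine()).takeWhile(_ != null)
    }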
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 06fa559fb6..63e17ee4e3 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -1,12 +1,13 @@
package spark
import java.io._
-import java.net.{InetAddress, URL, URI}
+import java.net.{NetworkInterface, InetAddress, URL, URI}
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
import scala.io.Source
/**
@@ -199,12 +200,34 @@ private object Utils extends Logging {
/**
* Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
*/
- def localIpAddress(): String = {
+ lazy val localIpAddress: String = findLocalIpAddress()
+
+ private def findLocalIpAddress(): String = {
val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
- if (defaultIpOverride != null)
+ if (defaultIpOverride != null) {
defaultIpOverride
- else
- InetAddress.getLocalHost.getHostAddress
+ } else {
+ val address = InetAddress.getLocalHost
+ if (address.isLoopbackAddress) {
+ // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+ // a better address using the local network interfaces
+ for (ni <- NetworkInterface.getNetworkInterfaces) {
+ for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress && !addr.isLoopbackAddress) {
+ // We've found an address that looks reasonable!
+ logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
+ " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
+ " instead (on interface " + ni.getName + ")")
+ logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
+ return addr.getHostAddress
+ }
+ }
+ logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
+ " a loopback address: " + address.getHostAddress + ", but we couldn't find any" +
+ " external IP address!")
+ logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
+ }
+ address.getHostAddress
+ }
}
private var customHostname: Option[String] = None
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 13fcee1004..81d3a94466 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -1,16 +1,15 @@
package spark.api.java
-import spark.{SparkContext, Split, RDD}
+import java.util.{List => JList}
+import scala.Tuple2
+import scala.collection.JavaConversions._
+
+import spark.{SparkContext, Split, RDD, TaskContext}
import spark.api.java.JavaPairRDD._
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
import spark.partial.{PartialResult, BoundedDouble}
import spark.storage.StorageLevel
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-import java.{util, lang}
-import scala.Tuple2
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This
@@ -24,7 +23,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/** The [[spark.SparkContext]] that this RDD was created on. */
def context: SparkContext = rdd.context
-
+
/** A unique ID for this RDD (within its SparkContext). */
def id: Int = rdd.id
@@ -36,7 +35,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
- def iterator(split: Split): java.util.Iterator[T] = asJavaIterator(rdd.iterator(split))
+ def iterator(split: Split, taskContext: TaskContext): java.util.Iterator[T] =
+ asJavaIterator(rdd.iterator(split, taskContext))
// Transformations (return a new RDD)
@@ -99,7 +99,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaRDD.fromRDD(rdd.mapPartitions(fn)(f.elementType()))(f.elementType())
}
-
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
@@ -172,8 +171,18 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env))
+ /**
+ * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
+ * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
+ * partitions* and the *same number of elements in each partition* (e.g. one was made through
+ * a map on the other).
+ */
+ def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
+ JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest)
+ }
+
// Actions (launch a job to return a value to the user program)
-
+
/**
* Applies a function f to all elements of this RDD.
*/
@@ -190,7 +199,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
val arr: java.util.Collection[T] = rdd.collect().toSeq
new java.util.ArrayList(arr)
}
-
+
/**
* Reduces the elements of this RDD using the specified associative binary operator.
*/
@@ -198,7 +207,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
- * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
+ * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
* modify t1 and return it as its result value to avoid object allocation; however, it should not
* modify t2.
*/
@@ -241,7 +250,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): java.util.Map[T, java.lang.Long] =
- mapAsJavaMap(rdd.countByValue().map((x => (x._1, new lang.Long(x._2)))))
+ mapAsJavaMap(rdd.countByValue().map((x => (x._1, new java.lang.Long(x._2)))))
/**
* (Experimental) Approximate version of countByValue().
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index ef27bbb502..386f505f2a 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -48,7 +48,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
// Used only in Workers
@transient var ttGuide: TalkToGuide = null
- @transient var hostAddress = Utils.localIpAddress()
+ @transient var hostAddress = Utils.localIpAddress
@transient var listenPort = -1
@transient var guidePort = -1
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index fa676e9064..f573512835 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -36,7 +36,7 @@ extends Broadcast[T](id) with Logging with Serializable {
@transient var serveMR: ServeMultipleRequests = null
@transient var guideMR: GuideMultipleRequests = null
- @transient var hostAddress = Utils.localIpAddress()
+ @transient var hostAddress = Utils.localIpAddress
@transient var listenPort = -1
@transient var guidePort = -1
@@ -138,7 +138,7 @@ extends Broadcast[T](id) with Logging with Serializable {
serveMR = null
- hostAddress = Utils.localIpAddress()
+ hostAddress = Utils.localIpAddress
listenPort = -1
stopBroadcast = false
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 7a1089c816..457122745b 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -11,8 +11,15 @@ private[spark] sealed trait DeployMessage extends Serializable
// Worker to Master
-private[spark]
-case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
+private[spark]
+case class RegisterWorker(
+ id: String,
+ host: String,
+ port: Int,
+ cores: Int,
+ memory: Int,
+ webUiPort: Int,
+ publicAddress: String)
extends DeployMessage
private[spark]
@@ -20,7 +27,8 @@ case class ExecutorStateChanged(
jobId: String,
execId: Int,
state: ExecutorState,
- message: Option[String])
+ message: Option[String],
+ exitStatus: Option[Int])
extends DeployMessage
// Master to Worker
@@ -51,7 +59,8 @@ private[spark]
case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
private[spark]
-case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
+case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
+ exitStatus: Option[Int])
private[spark]
case class JobKilled(message: String)
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 8b2a71add5..4211d80596 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -35,11 +35,15 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
/* Start the Slaves */
for (slaveNum <- 1 to numSlaves) {
+ /* We can pretend to test distributed stuff by giving the slaves distinct hostnames.
+ All of 127/8 should be a loopback, we use 127.100.*.* in hopes that it is
+ sufficiently distinctive. */
+ val slaveIpAddress = "127.100.0." + (slaveNum % 256)
val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
+ AkkaUtils.createActorSystem("sparkWorker" + slaveNum, slaveIpAddress, 0)
slaveActorSystems += actorSystem
val actor = actorSystem.actorOf(
- Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
+ Props(new Worker(slaveIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
name = "Worker")
slaveActors += actor
}
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index c57a1d33e9..90fe9508cd 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -66,12 +66,12 @@ private[spark] class Client(
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
listener.executorAdded(fullId, workerId, host, cores, memory)
- case ExecutorUpdated(id, state, message) =>
+ case ExecutorUpdated(id, state, message, exitStatus) =>
val fullId = jobId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
- listener.executorRemoved(fullId, message.getOrElse(""))
+ listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
}
case Terminated(actor_) if actor_ == master =>
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
index a8fa982085..da6abcc9c2 100644
--- a/core/src/main/scala/spark/deploy/client/ClientListener.scala
+++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala
@@ -14,5 +14,5 @@ private[spark] trait ClientListener {
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit
- def executorRemoved(id: String, message: String): Unit
+ def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit
}
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
index bf0e7428ba..57a7e123b7 100644
--- a/core/src/main/scala/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/spark/deploy/client/TestClient.scala
@@ -18,12 +18,12 @@ private[spark] object TestClient {
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {}
- def executorRemoved(id: String, message: String) {}
+ def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
}
def main(args: Array[String]) {
val url = args(0)
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress(), 0)
+ val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
val desc = new JobDescription(
"TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()))
val listener = new TestListener
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 7e5cd6b171..6ecebe626a 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val waitingJobs = new ArrayBuffer[JobInfo]
val completedJobs = new ArrayBuffer[JobInfo]
+ val masterPublicAddress = {
+ val envVar = System.getenv("SPARK_PUBLIC_DNS")
+ if (envVar != null) envVar else ip
+ }
+
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each job
// among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
@@ -55,15 +60,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
override def receive = {
- case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort) => {
+ case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- addWorker(id, host, workerPort, cores, memory, worker_webUiPort)
+ addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker("http://" + ip + ":" + webUiPort)
+ sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort)
schedule()
}
}
@@ -78,12 +83,12 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
schedule()
}
- case ExecutorStateChanged(jobId, execId, state, message) => {
+ case ExecutorStateChanged(jobId, execId, state, message, exitStatus) => {
val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId))
execOption match {
case Some(exec) => {
exec.state = state
- exec.job.actor ! ExecutorUpdated(execId, state, message)
+ exec.job.actor ! ExecutorUpdated(execId, state, message, exitStatus)
if (ExecutorState.isFinished(state)) {
val jobInfo = idToJob(jobId)
// Remove this executor from the worker and job
@@ -151,7 +156,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
if (spreadOutJobs) {
// Try to spread out each job among all the nodes, until it has all its cores
for (job <- waitingJobs if job.coresLeft > 0) {
- val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse
+ val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+ .filter(canUse(job, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -196,8 +202,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
- def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int): WorkerInfo = {
- val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort)
+ def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
+ publicAddress: String): WorkerInfo = {
+ // There may be one or more refs to dead workers on this same node (w/ different ID's), remove them.
+ workers.filter(w => (w.host == host) && (w.state == WorkerState.DEAD)).foreach(workers -= _)
+ val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
@@ -207,12 +216,12 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
def removeWorker(worker: WorkerInfo) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
- workers -= worker
+ worker.setState(WorkerState.DEAD)
idToWorker -= worker.id
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) {
- exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None)
+ exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
exec.job.executors -= exec.id
}
}
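
Taken together, the addWorker and removeWorker changes implement a mark-dead-then-purge scheme: a lost worker is kept around in the DEAD state (presumably so it stays visible, e.g. in the web UI) and is only dropped once a replacement registers from the same host. A compressed sketch, with Worker standing in for WorkerInfo:

    import scala.collection.mutable.HashSet

    object DeadWorkerSketch {
      object State extends Enumeration { val ALIVE, DEAD = Value }
      class Worker(val id: String, val host: String) { var state = State.ALIVE }

      val workers = new HashSet[Worker]

      // Losing a worker only flips its state; the entry stays in `workers`.
      def removeWorker(w: Worker) { w.state = State.DEAD }

      // Registering a new worker purges stale DEAD entries for the same host.
      def addWorker(id: String, host: String): Worker = {
        workers.filter(w => w.host == host && w.state == State.DEAD).foreach(workers -= _)
        val w = new Worker(id, host)
        workers += w
        w
      }
    }
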
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 706b1453aa..5a7f5fef8a 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -10,10 +10,11 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val actor: ActorRef,
- val webUiPort: Int) {
+ val webUiPort: Int,
+ val publicAddress: String) {
var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
-
+ var state: WorkerState.Value = WorkerState.ALIVE
var coresUsed = 0
var memoryUsed = 0
@@ -37,8 +38,12 @@ private[spark] class WorkerInfo(
def hasExecutor(job: JobInfo): Boolean = {
executors.values.exists(_.job == job)
}
-
+
def webUiAddress : String = {
- "http://" + this.host + ":" + this.webUiPort
+ "http://" + this.publicAddress + ":" + this.webUiPort
+ }
+
+ def setState(state: WorkerState.Value) = {
+ this.state = state
}
}
diff --git a/core/src/main/scala/spark/deploy/master/WorkerState.scala b/core/src/main/scala/spark/deploy/master/WorkerState.scala
new file mode 100644
index 0000000000..0bf35014c8
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/WorkerState.scala
@@ -0,0 +1,7 @@
+package spark.deploy.master
+
+private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
+ type WorkerState = Value
+
+ val ALIVE, DEAD, DECOMMISSIONED = Value
+}
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 07ae7bca78..beceb55ecd 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -60,7 +60,7 @@ private[spark] class ExecutorRunner(
process.destroy()
process.waitFor()
}
- worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None)
+ worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
}
@@ -134,7 +134,8 @@ private[spark] class ExecutorRunner(
// times on the same machine.
val exitCode = process.waitFor()
val message = "Command exited with code " + exitCode
- worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
+ worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message),
+ Some(exitCode))
} catch {
case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted")
@@ -145,7 +146,7 @@ private[spark] class ExecutorRunner(
process.destroy()
}
val message = e.getClass + ": " + e.getMessage
- worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
+ worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message), None)
}
}
}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 67d41dda29..7c9e588ea2 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -36,6 +36,10 @@ private[spark] class Worker(
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
+ val publicAddress = {
+ val envVar = System.getenv("SPARK_PUBLIC_DNS")
+ if (envVar != null) envVar else ip
+ }
var coresUsed = 0
var memoryUsed = 0
@@ -79,7 +83,7 @@ private[spark] class Worker(
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
- master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort)
+ master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
@@ -123,10 +127,10 @@ private[spark] class Worker(
manager.start()
coresUsed += cores_
memoryUsed += memory_
- master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None)
+ master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None, None)
- case ExecutorStateChanged(jobId, execId, state, message) =>
- master ! ExecutorStateChanged(jobId, execId, state, message)
+ case ExecutorStateChanged(jobId, execId, state, message, exitStatus) =>
+ master ! ExecutorStateChanged(jobId, execId, state, message, exitStatus)
val fullId = jobId + "/" + execId
if (ExecutorState.isFinished(state)) {
val executor = executors(fullId)
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index cb29a6b8b4..2552958d27 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -50,9 +50,14 @@ private[spark] class Executor extends Logging {
override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)
- System.exit(1)
+ if (exception.isInstanceOf[OutOfMemoryError]) {
+ System.exit(ExecutorExitCode.OOM)
+ } else {
+ System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+ }
} catch {
- case t: Throwable => System.exit(2)
+ case oom: OutOfMemoryError => System.exit(ExecutorExitCode.OOM)
+ case t: Throwable => System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}
}
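
A handler like the one above runs as the JVM-wide default. A sketch of how such a handler can be installed using the standard Thread.setDefaultUncaughtExceptionHandler API; the literal codes 50 and 52 correspond to the ExecutorExitCode constants defined in the next file:

    object HandlerSketch {
      def install() {
        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          def uncaughtException(thread: Thread, exception: Throwable) {
            // Map the failure to a well-known exit code so the master can explain it.
            val code = if (exception.isInstanceOf[OutOfMemoryError]) 52 else 50
            System.exit(code)
          }
        })
      }
    }
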
diff --git a/core/src/main/scala/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
new file mode 100644
index 0000000000..fd76029cb3
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
@@ -0,0 +1,43 @@
+package spark.executor
+
+/**
+ * These are exit codes that executors should use to provide the master with information about
+ * executor failures, assuming that the cluster management framework can capture the exit codes (but
+ * perhaps not log files). The exit code constants here are chosen to be unlikely to conflict
+ * with "natural" exit statuses that may be caused by the JVM or user code. In particular,
+ * exit codes 128+ arise on some Unix-likes as a result of signals, and it appears that the
+ * OpenJDK JVM may use exit code 1 in some of its own "last chance" code.
+ */
+private[spark]
+object ExecutorExitCode {
+ /** The default uncaught exception handler was reached. */
+ val UNCAUGHT_EXCEPTION = 50
+
+ /** The default uncaught exception handler was called and an exception was encountered while
+ logging the exception. */
+ val UNCAUGHT_EXCEPTION_TWICE = 51
+
+ /** The default uncaught exception handler was reached, and the uncaught exception was an
+ OutOfMemoryError. */
+ val OOM = 52
+
+ /** DiskStore failed to create a local temporary directory after many attempts. */
+ val DISK_STORE_FAILED_TO_CREATE_DIR = 53
+
+ def explainExitCode(exitCode: Int): String = {
+ exitCode match {
+ case UNCAUGHT_EXCEPTION => "Uncaught exception"
+ case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
+ case OOM => "OutOfMemoryError"
+ case DISK_STORE_FAILED_TO_CREATE_DIR =>
+ "Failed to create local directory (bad spark.local.dir?)"
+ case _ =>
+ "Unknown executor exit code (" + exitCode + ")" + (
+ if (exitCode > 128)
+ " (died from signal " + (exitCode - 128) + "?)"
+ else
+ ""
+ )
+ }
+ }
+}
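
A usage sketch for the decoder. ExecutorExitCode is private[spark], so this assumes compilation somewhere under the spark package; the demo object itself is hypothetical:

    package spark.demo

    import spark.executor.ExecutorExitCode

    object ExitCodeDemo {
      def main(args: Array[String]) {
        for (code <- Seq(50, 52, 53, 137)) {
          println(code + " -> " + ExecutorExitCode.explainExitCode(code))
        }
        // 137 = 128 + 9, so the last line reports a probable death from signal 9.
      }
    }
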
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index f4c3f99011..61bc5c90ba 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -2,11 +2,8 @@ package spark.rdd
import scala.collection.mutable.HashMap
-import spark.Dependency
-import spark.RDD
-import spark.SparkContext
-import spark.SparkEnv
-import spark.Split
+import spark.{RDD, SparkContext, SparkEnv, Split, TaskContext}
+
private[spark] class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
val index = idx
@@ -19,24 +16,24 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
@transient
val splits_ = (0 until blockIds.size).map(i => {
new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
- }).toArray
-
- @transient
+ }).toArray
+
+ @transient
lazy val locations_ = {
- val blockManager = SparkEnv.get.blockManager
+ val blockManager = SparkEnv.get.blockManager
/*val locations = blockIds.map(id => blockManager.getLocations(id))*/
- val locations = blockManager.getLocations(blockIds)
+ val locations = blockManager.getLocations(blockIds)
HashMap(blockIds.zip(locations):_*)
}
override def splits = splits_
- override def compute(split: Split): Iterator[T] = {
- val blockManager = SparkEnv.get.blockManager
+ override def compute(split: Split, context: TaskContext): Iterator[T] = {
+ val blockManager = SparkEnv.get.blockManager
val blockId = split.asInstanceOf[BlockRDDSplit].blockId
blockManager.get(blockId) match {
case Some(block) => block.asInstanceOf[Iterator[T]]
- case None =>
+ case None =>
throw new Exception("Could not compute split, block " + blockId + " not found")
}
}
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 458ad38d55..bc11b60e05 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -1,8 +1,10 @@
package spark.rdd
-import spark._
import java.lang.ref.WeakReference
+import spark.{OneToOneDependency, NarrowDependency, RDD, SparkContext, Split, TaskContext}
+
+
private[spark]
class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
override val index: Int = idx
@@ -40,9 +42,10 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
}
}
- override def compute(split: Split) = {
+ override def compute(split: Split, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianSplit]
- for (x <- rdd1.iterator(currSplit.s1); y <- rdd2.iterator(currSplit.s2)) yield (x, y)
+ for (x <- rdd1.iterator(currSplit.s1, context);
+ y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}
var deps_ = List(
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 94ef1b56e8..ef8673909b 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -1,21 +1,18 @@
package spark.rdd
+import java.io.{ObjectOutputStream, IOException}
+
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import spark.Aggregator
-import spark.Dependency
-import spark.Logging
-import spark.OneToOneDependency
-import spark.Partitioner
-import spark.RDD
-import spark.ShuffleDependency
-import spark.SparkEnv
-import spark.Split
-import java.io.{ObjectOutputStream, IOException}
+import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Split, TaskContext}
+import spark.{Dependency, OneToOneDependency, ShuffleDependency}
+
private[spark] sealed trait CoGroupSplitDep extends Serializable
-private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], splitIndex: Int, var split: Split = null)
+
+private[spark]
+case class NarrowCoGroupSplitDep(rdd: RDD[_], splitIndex: Int, var split: Split = null)
extends CoGroupSplitDep {
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
@@ -26,9 +23,10 @@ private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], splitIndex: Int, va
}
}
}
+
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-private[spark]
+private[spark]
class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
@@ -41,9 +39,10 @@ private[spark] class CoGroupAggregator
{ (b1, b2) => b1 ++ b2 })
with Serializable
+
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging {
-
+
val aggr = new CoGroupAggregator
@transient
@@ -81,10 +80,10 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
override def splits = splits_
-
+
override val partitioner = Some(part)
-
- override def compute(s: Split): Iterator[(K, Seq[Seq[_]])] = {
+
+ override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
val split = s.asInstanceOf[CoGroupSplit]
val numRdds = split.deps.size
val map = new HashMap[K, Seq[ArrayBuffer[Any]]]
@@ -94,7 +93,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => {
// Read them from the parent
- for ((k, v) <- rdd.iterator(itsSplit)) {
+ for ((k, v) <- rdd.iterator(itsSplit, context)) {
getSeq(k.asInstanceOf[K])(depNum) += v
}
}
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 5b5f72ddeb..c5e2300d26 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -1,8 +1,10 @@
package spark.rdd
-import spark._
import java.lang.ref.WeakReference
+import spark.{Dependency, OneToOneDependency, NarrowDependency, RDD, Split, TaskContext}
+
+
private class CoalescedRDDSplit(val index: Int, val parents: Array[Split]) extends Split
/**
@@ -33,9 +35,9 @@ class CoalescedRDD[T: ClassManifest](
override def splits = splits_
- override def compute(split: Split): Iterator[T] = {
- split.asInstanceOf[CoalescedRDDSplit].parents.iterator.flatMap {
- parentSplit => firstParent[T].iterator(parentSplit)
+ override def compute(split: Split, context: TaskContext): Iterator[T] = {
+ split.asInstanceOf[CoalescedRDDSplit].parents.iterator.flatMap { parentSplit =>
+ firstParent[T].iterator(parentSplit, context)
}
}
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
index 1370cf6faf..70c4be7903 100644
--- a/core/src/main/scala/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -1,16 +1,14 @@
package spark.rdd
-import spark.OneToOneDependency
-import spark.RDD
-import spark.Split
import java.lang.ref.WeakReference
+import spark.{OneToOneDependency, RDD, Split, TaskContext}
+
private[spark]
-class FilteredRDD[T: ClassManifest](
- prev: WeakReference[RDD[T]],
- f: T => Boolean)
+class FilteredRDD[T: ClassManifest](prev: WeakReference[RDD[T]], f: T => Boolean)
extends RDD[T](prev.get) {
override def splits = firstParent[T].splits
- override def compute(split: Split) = firstParent[T].iterator(split).filter(f)
+ override def compute(split: Split, context: TaskContext) =
+ firstParent[T].iterator(split, context).filter(f)
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
index 6b2cc67568..1ebbb4c9bd 100644
--- a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
@@ -1,16 +1,17 @@
package spark.rdd
-import spark.OneToOneDependency
-import spark.RDD
-import spark.Split
import java.lang.ref.WeakReference
+import spark.{RDD, Split, TaskContext}
+
+
private[spark]
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
prev: WeakReference[RDD[T]],
f: T => TraversableOnce[U])
extends RDD[U](prev.get) {
-
+
override def splits = firstParent[T].splits
- override def compute(split: Split) = firstParent[T].iterator(split).flatMap(f)
+ override def compute(split: Split, context: TaskContext) =
+ firstParent[T].iterator(split, context).flatMap(f)
}
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
index 0f0b6ab0ff..43661ae3f8 100644
--- a/core/src/main/scala/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala
@@ -1,13 +1,13 @@
package spark.rdd
-import spark.OneToOneDependency
-import spark.RDD
-import spark.Split
import java.lang.ref.WeakReference
+import spark.{RDD, Split, TaskContext}
+
private[spark]
class GlommedRDD[T: ClassManifest](prev: WeakReference[RDD[T]])
extends RDD[Array[T]](prev.get) {
override def splits = firstParent[T].splits
- override def compute(split: Split) = Array(firstParent[T].iterator(split).toArray).iterator
-} \ No newline at end of file
+ override def compute(split: Split, context: TaskContext) =
+ Array(firstParent[T].iterator(split, context).toArray).iterator
+}
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 19ed56d9c0..7b5f8ac3e9 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,19 +15,16 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import spark.Dependency
-import spark.RDD
-import spark.SerializableWritable
-import spark.SparkContext
-import spark.Split
+import spark.{Dependency, RDD, SerializableWritable, SparkContext, Split, TaskContext}
-/**
+
+/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
private[spark] class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
extends Split
with Serializable {
-
+
val inputSplit = new SerializableWritable[InputSplit](s)
override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
@@ -47,10 +44,10 @@ class HadoopRDD[K, V](
valueClass: Class[V],
minSplits: Int)
extends RDD[(K, V)](sc, Nil) {
-
+
// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
val confBroadcast = sc.broadcast(new SerializableWritable(conf))
-
+
@transient
val splits_ : Array[Split] = {
val inputFormat = createInputFormat(conf)
@@ -69,7 +66,7 @@ class HadoopRDD[K, V](
override def splits = splits_
- override def compute(theSplit: Split) = new Iterator[(K, V)] {
+ override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopSplit]
var reader: RecordReader[K, V] = null
@@ -77,6 +74,9 @@ class HadoopRDD[K, V](
val fmt = createInputFormat(conf)
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
+ // Register an on-task-completion callback to close the input stream.
+ context.addOnCompleteCallback(() => reader.close())
+
val key: K = reader.createKey()
val value: V = reader.createValue()
var gotNext = false
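
TaskContext itself is not shown in this section; a minimal stand-in consistent with how it is used here, where callbacks registered during compute() (such as the reader.close() above) run once the task finishes:

    import scala.collection.mutable.ArrayBuffer

    class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Long) {
      @transient val onCompleteCallbacks = new ArrayBuffer[() => Unit]

      // Register a callback to run on task completion (e.g. to close an input stream).
      def addOnCompleteCallback(f: () => Unit) {
        onCompleteCallbacks += f
      }

      // Invoked by the task runner after the task body finishes.
      def executeOnCompleteCallbacks() {
        onCompleteCallbacks.foreach(_())
      }
    }
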
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
index b04f56cfcc..991f4be73f 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -1,10 +1,14 @@
package spark.rdd
+
import spark.OneToOneDependency
import spark.RDD
import spark.Split
import java.lang.ref.WeakReference
+import spark.{RDD, Split, TaskContext}
+
+
private[spark]
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
prev: WeakReference[RDD[T]],
@@ -13,7 +17,8 @@ class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
extends RDD[U](prev.get) {
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
-
+
override def splits = firstParent[T].splits
- override def compute(split: Split) = f(firstParent[T].iterator(split))
+ override def compute(split: Split, context: TaskContext) =
+ f(firstParent[T].iterator(split, context))
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
index 7a4b6ffb03..e2e7753cde 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -1,10 +1,10 @@
package spark.rdd
-import spark.OneToOneDependency
-import spark.RDD
-import spark.Split
import java.lang.ref.WeakReference
+import spark.{RDD, Split, TaskContext}
+
+
/**
* A variant of the MapPartitionsRDD that passes the split index into the
* closure. This can be used to generate or collect partition specific
@@ -13,9 +13,12 @@ import java.lang.ref.WeakReference
private[spark]
class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
prev: WeakReference[RDD[T]],
- f: (Int, Iterator[T]) => Iterator[U])
+ f: (Int, Iterator[T]) => Iterator[U],
+ preservesPartitioning: Boolean)
extends RDD[U](prev.get) {
+ override val partitioner = if (preservesPartitioning) prev.get.partitioner else None
override def splits = firstParent[T].splits
- override def compute(split: Split) = f(split.index, firstParent[T].iterator(split))
+ override def compute(split: Split, context: TaskContext) =
+ f(split.index, firstParent[T].iterator(split, context))
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
index 8fa1872e0a..986cf35291 100644
--- a/core/src/main/scala/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -1,10 +1,10 @@
package spark.rdd
-import spark.OneToOneDependency
-import spark.RDD
-import spark.Split
import java.lang.ref.WeakReference
+import spark.{RDD, Split, TaskContext}
+
+
private[spark]
class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: WeakReference[RDD[T]],
@@ -12,5 +12,6 @@ class MappedRDD[U: ClassManifest, T: ClassManifest](
extends RDD[U](prev.get) {
override def splits = firstParent[T].splits
- override def compute(split: Split) = firstParent[T].iterator(split).map(f)
+ override def compute(split: Split, context: TaskContext) =
+ firstParent[T].iterator(split, context).map(f)
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 2875abb2db..c7cc8d4685 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -1,22 +1,19 @@
package spark.rdd
+import java.text.SimpleDateFormat
+import java.util.Date
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
-import java.util.Date
-import java.text.SimpleDateFormat
+import spark.{Dependency, RDD, SerializableWritable, SparkContext, Split, TaskContext}
-import spark.Dependency
-import spark.RDD
-import spark.SerializableWritable
-import spark.SparkContext
-import spark.Split
-private[spark]
+private[spark]
class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
extends Split {
-
+
val serializableHadoopSplit = new SerializableWritable(rawSplit)
override def hashCode(): Int = (41 * (41 + rddId) + index)
@@ -30,7 +27,7 @@ class NewHadoopRDD[K, V](
@transient conf: Configuration)
extends RDD[(K, V)](sc, Nil)
with HadoopMapReduceUtil {
-
+
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
val confBroadcast = sc.broadcast(new SerializableWritable(conf))
// private val serializableConf = new SerializableWritable(conf)
@@ -57,15 +54,19 @@ class NewHadoopRDD[K, V](
override def splits = splits_
- override def compute(theSplit: Split) = new Iterator[(K, V)] {
+ override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopSplit]
val conf = confBroadcast.value.value
val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
- val context = newTaskAttemptContext(conf, attemptId)
+ val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
- val reader = format.createRecordReader(split.serializableHadoopSplit.value, context)
- reader.initialize(split.serializableHadoopSplit.value, context)
-
+ val reader = format.createRecordReader(
+ split.serializableHadoopSplit.value, hadoopAttemptContext)
+ reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+
+ // Register an on-task-completion callback to close the input stream.
+ context.addOnCompleteCallback(() => reader.close())
+
var havePair = false
var finished = false
@@ -73,9 +74,6 @@ class NewHadoopRDD[K, V](
if (!finished && !havePair) {
finished = !reader.nextKeyValue
havePair = !finished
- if (finished) {
- reader.close()
- }
}
!finished
}
diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index d9293a9d1a..076c6a64a0 100644
--- a/core/src/main/scala/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -1,6 +1,7 @@
package spark.rdd
import java.io.PrintWriter
+import java.lang.ref.WeakReference
import java.util.StringTokenizer
import scala.collection.Map
@@ -8,11 +9,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
-import spark.OneToOneDependency
-import spark.RDD
-import spark.SparkEnv
-import spark.Split
-import java.lang.ref.WeakReference
+import spark.{RDD, SparkEnv, Split, TaskContext}
/**
@@ -33,12 +30,12 @@ class PipedRDD[T: ClassManifest](
override def splits = firstParent[T].splits
- override def compute(split: Split): Iterator[String] = {
+ override def compute(split: Split, context: TaskContext): Iterator[String] = {
val pb = new ProcessBuilder(command)
// Add the environmental variables to the process.
val currentEnvVars = pb.environment()
envVars.foreach { case (variable, value) => currentEnvVars.put(variable, value) }
-
+
val proc = pb.start()
val env = SparkEnv.get
@@ -56,7 +53,7 @@ class PipedRDD[T: ClassManifest](
override def run() {
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
- for (elem <- firstParent[T].iterator(split)) {
+ for (elem <- firstParent[T].iterator(split, context)) {
out.println(elem)
}
out.close()
diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala
index f273f257f8..0dc83c127f 100644
--- a/core/src/main/scala/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/spark/rdd/SampledRDD.scala
@@ -1,13 +1,12 @@
package spark.rdd
+import java.lang.ref.WeakReference
import java.util.Random
+
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
-import spark.RDD
-import spark.OneToOneDependency
-import spark.Split
-import java.lang.ref.WeakReference
+import spark.{RDD, Split, TaskContext}
private[spark]
class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
@@ -16,7 +15,7 @@ class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Seriali
class SampledRDD[T: ClassManifest](
prev: WeakReference[RDD[T]],
- withReplacement: Boolean,
+ withReplacement: Boolean,
frac: Double,
seed: Int)
extends RDD[T](prev.get) {
@@ -32,13 +31,13 @@ class SampledRDD[T: ClassManifest](
override def preferredLocations(split: Split) =
firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
- override def compute(splitIn: Split) = {
+ override def compute(splitIn: Split, context: TaskContext) = {
val split = splitIn.asInstanceOf[SampledRDDSplit]
if (withReplacement) {
// For large datasets, the expected number of occurrences of each element in a sample with
// replacement is Poisson(frac). We use that to get a count for each element.
val poisson = new Poisson(frac, new DRand(split.seed))
- firstParent[T].iterator(split.prev).flatMap { element =>
+ firstParent[T].iterator(split.prev, context).flatMap { element =>
val count = poisson.nextInt()
if (count == 0) {
Iterator.empty // Avoid object allocation when we return 0 items, which is quite often
@@ -48,7 +47,7 @@ class SampledRDD[T: ClassManifest](
}
} else { // Sampling without replacement
val rand = new Random(split.seed)
- firstParent[T].iterator(split.prev).filter(x => (rand.nextDouble <= frac))
+ firstParent[T].iterator(split.prev, context).filter(x => (rand.nextDouble <= frac))
}
}
}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 31774585f4..7d592ffc5e 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -1,12 +1,10 @@
package spark.rdd
-import spark.Partitioner
-import spark.RDD
-import spark.ShuffleDependency
-import spark.SparkEnv
-import spark.Split
import java.lang.ref.WeakReference
+import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Split, TaskContext}
+
+
private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx
override def hashCode(): Int = idx
@@ -31,7 +29,7 @@ class ShuffledRDD[K, V](
override def splits = splits_
- override def compute(split: Split): Iterator[(K, V)] = {
+ override def compute(split: Split, context: TaskContext): Iterator[(K, V)] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index)
}
diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index 643a174160..21965d3f5d 100644
--- a/core/src/main/scala/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -1,18 +1,18 @@
package spark.rdd
+import java.lang.ref.WeakReference
import scala.collection.mutable.ArrayBuffer
+import spark.{Dependency, OneToOneDependency, RangeDependency, RDD, SparkContext, Split, TaskContext}
-import spark._
-import java.lang.ref.WeakReference
private[spark] class UnionSplit[T: ClassManifest](
- idx: Int,
+ idx: Int,
rdd: RDD[T],
split: Split)
extends Split
with Serializable {
-
- def iterator() = rdd.iterator(split)
+
+ def iterator(context: TaskContext) = rdd.iterator(split, context)
def preferredLocations() = rdd.preferredLocations(split)
override val index: Int = idx
}
@@ -20,7 +20,7 @@ private[spark] class UnionSplit[T: ClassManifest](
class UnionRDD[T: ClassManifest](
sc: SparkContext,
@transient var rdds: Seq[RDD[T]])
- extends RDD[T](sc, Nil) { // Nil, so the dependencies_ var does not refer to parent RDDs
+ extends RDD[T](sc, Nil) { // Nil, so the dependencies_ var does not refer to parent RDDs
@transient
var splits_ : Array[Split] = {
@@ -47,7 +47,8 @@ class UnionRDD[T: ClassManifest](
override def dependencies = deps_
- override def compute(s: Split): Iterator[T] = s.asInstanceOf[UnionSplit[T]].iterator()
+ override def compute(s: Split, context: TaskContext): Iterator[T] =
+ s.asInstanceOf[UnionSplit[T]].iterator(context)
override def preferredLocations(s: Split): Seq[String] = {
if (isCheckpointed) {
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
new file mode 100644
index 0000000000..33b64e2d24
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -0,0 +1,55 @@
+package spark.rdd
+
+import spark.{OneToOneDependency, RDD, SparkContext, Split, TaskContext}
+
+
+private[spark] class ZippedSplit[T: ClassManifest, U: ClassManifest](
+ idx: Int,
+ rdd1: RDD[T],
+ rdd2: RDD[U],
+ split1: Split,
+ split2: Split)
+ extends Split
+ with Serializable {
+
+ def iterator(context: TaskContext): Iterator[(T, U)] =
+ rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context))
+
+ def preferredLocations(): Seq[String] =
+ rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2))
+
+ override val index: Int = idx
+}
+
+class ZippedRDD[T: ClassManifest, U: ClassManifest](
+ sc: SparkContext,
+ @transient rdd1: RDD[T],
+ @transient rdd2: RDD[U])
+ extends RDD[(T, U)](sc, Nil)
+ with Serializable {
+
+ // TODO: FIX THIS.
+
+ @transient
+ val splits_ : Array[Split] = {
+ if (rdd1.splits.size != rdd2.splits.size) {
+ throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
+ }
+ val array = new Array[Split](rdd1.splits.size)
+ for (i <- 0 until rdd1.splits.size) {
+ array(i) = new ZippedSplit(i, rdd1, rdd2, rdd1.splits(i), rdd2.splits(i))
+ }
+ array
+ }
+
+ override def splits = splits_
+
+ @transient
+ override val dependencies = List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))
+
+ override def compute(s: Split, context: TaskContext): Iterator[(T, U)] =
+ s.asInstanceOf[ZippedSplit[T, U]].iterator(context)
+
+ override def preferredLocations(s: Split): Seq[String] =
+ s.asInstanceOf[ZippedSplit[T, U]].preferredLocations()
+}
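
A usage sketch for the new ZippedRDD, e.g. from the Spark shell. Both inputs must have the same number of partitions, and the splits must pair up element-for-element for the per-partition zip to line up:

    val sc = new spark.SparkContext("local", "zip-demo")
    val a = sc.parallelize(1 to 4, 2)
    val b = sc.parallelize(Seq("a", "b", "c", "d"), 2)
    val zipped = new spark.rdd.ZippedRDD(sc, a, b)
    println(zipped.collect().mkString(", ")) // (1,a), (2,b), (3,c), (4,d)
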
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 4b2570fa2b..9387ba19a3 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -17,8 +17,8 @@ import spark.storage.BlockManagerId
import util.{MetadataCleaner, TimeStampedHashMap}
/**
- * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
- * each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal
+ * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
+ * each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal
* schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
* and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
*/
@@ -74,7 +74,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val deadHosts = new HashSet[String] // TODO: The code currently assumes these can't come back;
// that's not going to be a realistic assumption in general
-
+
val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
val running = new HashSet[Stage] // Stages we are running right now
val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
@@ -97,7 +97,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
cacheLocs(rdd.id)
}
-
+
def updateCacheLocs() {
cacheLocs = cacheTracker.getLocationsSnapshot()
}
@@ -329,7 +329,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val rdd = job.finalStage.rdd
val split = rdd.splits(job.partitions(0))
val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0)
- val result = job.func(taskContext, rdd.iterator(split))
+ val result = job.func(taskContext, rdd.iterator(split, taskContext))
+ taskContext.executeOnCompleteCallbacks()
job.listener.taskSucceeded(0, result)
} catch {
case e: Exception =>
@@ -356,7 +357,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
}
}
-
+
def submitMissingTasks(stage: Stage) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
@@ -398,7 +399,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val task = event.task
val stage = idToStage(task.stageId)
event.reason match {
- case Success =>
+ case Success =>
logInfo("Completed " + task)
if (event.accumUpdates != null) {
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
@@ -482,8 +483,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
") for resubmission due to a fetch failure")
// Mark the map whose fetch failed as broken in the map stage
val mapStage = shuffleToMapStage(shuffleId)
- mapStage.removeOutputLoc(mapId, bmAddress)
- mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+ if (mapId != -1) {
+ mapStage.removeOutputLoc(mapId, bmAddress)
+ mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+ }
logInfo("The failed fetch was from " + mapStage + " (" + mapStage.origin +
"); marking it for resubmission")
failed += mapStage
@@ -520,7 +523,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
updateCacheLocs()
}
}
-
+
/**
* Aborts all jobs depending on a particular Stage. This is called in response to a task set
* being cancelled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 2ebd4075a2..e492279b4e 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -10,12 +10,14 @@ private[spark] class ResultTask[T, U](
@transient locs: Seq[String],
val outputId: Int)
extends Task[U](stageId) {
-
+
val split = rdd.splits(partition)
override def run(attemptId: Long): U = {
val context = new TaskContext(stageId, partition, attemptId)
- func(context, rdd.iterator(split))
+ val result = func(context, rdd.iterator(split, context))
+ context.executeOnCompleteCallbacks()
+ result
}
override def preferredLocations: Seq[String] = locs
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 683f5ebec3..7fdc178d4b 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -73,19 +73,19 @@ private[spark] object ShuffleMapTask {
private[spark] class ShuffleMapTask(
stageId: Int,
- var rdd: RDD[_],
+ var rdd: RDD[_],
var dep: ShuffleDependency[_,_],
- var partition: Int,
+ var partition: Int,
@transient var locs: Seq[String])
extends Task[MapStatus](stageId)
with Externalizable
with Logging {
def this() = this(0, null, null, 0, null)
-
+
var split = if (rdd == null) {
- null
- } else {
+ null
+ } else {
rdd.splits(partition)
}
@@ -116,9 +116,11 @@ private[spark] class ShuffleMapTask(
val numOutputSplits = dep.partitioner.numPartitions
val partitioner = dep.partitioner
+ val taskContext = new TaskContext(stageId, partition, attemptId)
+
// Partition the map output.
val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
- for (elem <- rdd.iterator(split)) {
+ for (elem <- rdd.iterator(split, taskContext)) {
val pair = elem.asInstanceOf[(Any, Any)]
val bucketId = partitioner.getPartition(pair._1)
buckets(bucketId) += pair
@@ -136,6 +138,9 @@ private[spark] class ShuffleMapTask(
compressedSizes(i) = MapOutputTracker.compressSize(size)
}
+ // Execute the callbacks on task completion.
+ taskContext.executeOnCompleteCallbacks()
+
return new MapStatus(blockManager.blockManagerId, compressedSizes)
}
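
For intuition, the bucketing step above routes every record to an output split through the shuffle's Partitioner. A self-contained sketch where simple non-negative modulo hashing stands in for the job's partitioner:

    import scala.collection.mutable.ArrayBuffer

    object BucketSketch {
      def main(args: Array[String]) {
        val numOutputSplits = 3
        def getPartition(key: Any): Int = {
          val mod = key.hashCode % numOutputSplits
          if (mod < 0) mod + numOutputSplits else mod // keep the bucket index non-negative
        }
        val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
        for (pair <- Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4))) {
          buckets(getPartition(pair._1)) += pair
        }
        buckets.zipWithIndex.foreach { case (b, i) => println(i + ": " + b.mkString(" ")) }
      }
    }
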
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index f5e852d203..20f6e65020 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,15 +249,22 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
- def slaveLost(slaveId: String) {
+ def slaveLost(slaveId: String, reason: ExecutorLossReason) {
var failedHost: Option[String] = None
synchronized {
val host = slaveIdToHost(slaveId)
if (hostsAlive.contains(host)) {
+ logError("Lost an executor on " + host + ": " + reason)
slaveIdsWithExecutors -= slaveId
hostsAlive -= host
activeTaskSetsQueue.foreach(_.hostLost(host))
failedHost = Some(host)
+ } else {
+ // We may get multiple slaveLost() calls with different loss reasons. For example, one
+ // may be triggered by a dropped connection from the slave while another may be a report
+ // of executor termination from Mesos. We produce log messages for both so we eventually
+ // report the termination reason.
+ logError("Lost an executor on " + host + " (already removed): " + reason)
}
}
if (failedHost != None) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
new file mode 100644
index 0000000000..bba7de6a65
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
@@ -0,0 +1,21 @@
+package spark.scheduler.cluster
+
+import spark.executor.ExecutorExitCode
+
+/**
+ * Represents an explanation for an executor or whole slave failing or exiting.
+ */
+private[spark]
+class ExecutorLossReason(val message: String) {
+ override def toString: String = message
+}
+
+private[spark]
+case class ExecutorExited(val exitCode: Int)
+ extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+}
+
+private[spark]
+case class SlaveLost(_message: String = "Slave lost")
+ extends ExecutorLossReason(_message) {
+}
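
A sketch of consuming the new loss reasons on the scheduler side. The classes are private[spark], so this assumes compilation under the spark package; the demo object is hypothetical:

    package spark.demo

    import spark.scheduler.cluster.{ExecutorExited, ExecutorLossReason, SlaveLost}

    object LossReasonDemo {
      def describe(reason: ExecutorLossReason): String = reason match {
        case ExecutorExited(code) => "executor exited with code " + code + ": " + reason.message
        case SlaveLost(msg)       => "slave lost: " + msg
      }

      def main(args: Array[String]) {
        println(describe(ExecutorExited(53))) // explained via ExecutorExitCode
        println(describe(SlaveLost()))        // falls back to "Slave lost"
      }
    }
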
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7aba7324ab..e2301347e5 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,6 +19,7 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+ val executorIdToSlaveId = new HashMap[String, String]
// Memory used by each executor (in megabytes)
val executorMemory = {
@@ -65,9 +66,23 @@ private[spark] class SparkDeploySchedulerBackend(
}
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
+ executorIdToSlaveId += id -> workerId
logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
id, host, cores, Utils.memoryMegabytesToString(memory)))
}
- def executorRemoved(id: String, message: String) {}
+ def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {
+ val reason: ExecutorLossReason = exitStatus match {
+ case Some(code) => ExecutorExited(code)
+ case None => SlaveLost(message)
+ }
+ logInfo("Executor %s removed: %s".format(id, message))
+ executorIdToSlaveId.get(id) match {
+ case Some(slaveId) =>
+ executorIdToSlaveId.remove(id)
+ scheduler.slaveLost(slaveId, reason)
+ case None =>
+ logInfo("No slave ID known for executor %s".format(id))
+ }
+ }
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index d2cce0dc05..eeaae23dc8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -69,13 +69,13 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
context.stop(self)
case Terminated(actor) =>
- actorToSlaveId.get(actor).foreach(removeSlave)
+ actorToSlaveId.get(actor).foreach(removeSlave(_, "Akka actor terminated"))
case RemoteClientDisconnected(transport, address) =>
- addressToSlaveId.get(address).foreach(removeSlave)
+ addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client disconnected"))
case RemoteClientShutdown(transport, address) =>
- addressToSlaveId.get(address).foreach(removeSlave)
+ addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client shutdown"))
}
// Make fake resource offers on all slaves
@@ -99,7 +99,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
// Remove a disconnected slave from the cluster
- def removeSlave(slaveId: String) {
+ def removeSlave(slaveId: String, reason: String) {
logInfo("Slave " + slaveId + " disconnected, so removing it")
val numCores = freeCores(slaveId)
actorToSlaveId -= slaveActor(slaveId)
@@ -109,7 +109,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
freeCores -= slaveId
slaveHost -= slaveId
totalCoreCount.addAndGet(-numCores)
- scheduler.slaveLost(slaveId)
+ scheduler.slaveLost(slaveId, SlaveLost(reason))
}
}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 814443fa52..8c7a1dfbc0 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -267,17 +267,23 @@ private[spark] class MesosSchedulerBackend(
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+ private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
logInfo("Mesos slave lost: " + slaveId.getValue)
synchronized {
slaveIdsWithExecutors -= slaveId.getValue
}
- scheduler.slaveLost(slaveId.getValue)
+ scheduler.slaveLost(slaveId.getValue, reason)
+ }
+
+ override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+ recordSlaveLost(d, slaveId, SlaveLost())
}
- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
- logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
- slaveLost(d, s)
+ override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
+ slaveId: SlaveID, status: Int) {
+ logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
+ slaveId.getValue))
+ recordSlaveLost(d, slaveId, ExecutorExited(status))
}
// TODO: query Mesos for number of cores
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 1e36578e1a..7a8ac10cdd 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,22 +1,26 @@
package spark.storage
+import java.io.{InputStream, OutputStream}
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.JavaConversions._
+
+import akka.actor.{ActorSystem, Cancellable, Props}
import akka.dispatch.{Await, Future}
import akka.util.Duration
+import akka.util.duration._
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import java.io.{InputStream, OutputStream, Externalizable, ObjectInput, ObjectOutput}
-import java.nio.{MappedByteBuffer, ByteBuffer}
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
+import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
-import spark.util.{MetadataCleaner, TimeStampedHashMap, ByteBufferInputStream}
+import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import sun.nio.ch.DirectBuffer
@@ -25,7 +29,11 @@ case class BlockException(blockId: String, message: String, ex: Exception = null
extends Exception(message)
private[spark]
-class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
+class BlockManager(
+ actorSystem: ActorSystem,
+ val master: BlockManagerMaster,
+ val serializer: Serializer,
+ maxMemory: Long)
extends Logging {
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@@ -51,7 +59,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- private val blockInfo = new TimeStampedHashMap[String, BlockInfo]()
+ private val blockInfo = new TimeStampedHashMap[String, BlockInfo]
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
@@ -78,16 +86,31 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
+ val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
+
val host = System.getProperty("spark.hostname", Utils.localHostName())
+ val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
+ name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
+
+ @volatile private var shuttingDown = false
+
+ private def heartBeat() {
+ if (!master.sendHeartBeat(blockManagerId)) {
+ reregister()
+ }
+ }
+
+ var heartBeatTask: Cancellable = null
+
val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
- def this(master: BlockManagerMaster, serializer: Serializer) = {
- this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+ def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = {
+ this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
}
/**
@@ -95,47 +118,95 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* BlockManagerWorker actor.
*/
private def initialize() {
- master.registerBlockManager(blockManagerId, maxMemory)
+ master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
BlockManagerWorker.startBlockManagerWorker(this)
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
+ heartBeat()
+ }
+ }
}
/**
- * Get storage level of local block. If no info exists for the block, then returns null.
+ * Report all blocks to the master again. This may be necessary if we are dropped
+ * by the master and come back, or if we become capable of recovering blocks on disk after
+ * an executor crash.
+ *
+ * This function deliberately fails silently if the master returns false (indicating that
+ * the slave needs to reregister). The error condition will be detected again by the next
+ * heart beat attempt or new block registration, at which point another attempt to
+ * reregister all blocks will be made.
*/
- def getLevel(blockId: String): StorageLevel = {
- blockInfo.get(blockId).map(_.level).orNull
+ private def reportAllBlocks() {
+ logInfo("Reporting " + blockInfo.size + " blocks to the master.")
+ for ((blockId, info) <- blockInfo) {
+ if (!tryToReportBlockStatus(blockId, info)) {
+ logError("Failed to report " + blockId + " to master; giving up.")
+ return
+ }
+ }
}
/**
- * Tell the master about the current storage status of a block. This will send a heartbeat
+ * Reregister with the master and report all blocks to it. This will be called by the heart beat
+ * thread if our heart beat to the master indicates that we were not registered.
+ */
+ def reregister() {
+ // TODO: We might need to rate limit reregistering.
+ logInfo("BlockManager reregistering with master")
+ master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
+ reportAllBlocks()
+ }
+
+ /**
+ * Get storage level of local block. If no info exists for the block, then returns null.
+ */
+ def getLevel(blockId: String): StorageLevel = blockInfo.get(blockId).map(_.level).orNull
+
+ /**
+ * Tell the master about the current storage status of a block. This will send a block update
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
- def reportBlockStatus(blockId: String) {
- val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
- case None =>
- (StorageLevel.NONE, 0L, 0L)
- case Some(info) =>
- info.synchronized {
- info.level match {
- case null =>
- (StorageLevel.NONE, 0L, 0L)
- case level =>
- val inMem = level.useMemory && memoryStore.contains(blockId)
- val onDisk = level.useDisk && diskStore.contains(blockId)
- (
- new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
- if (inMem) memoryStore.getSize(blockId) else 0L,
- if (onDisk) diskStore.getSize(blockId) else 0L
- )
- }
- }
+ def reportBlockStatus(blockId: String, info: BlockInfo) {
+ val needReregister = !tryToReportBlockStatus(blockId, info)
+ if (needReregister) {
+ logInfo("Got told to reregister updating block " + blockId)
+ // Reregistering will report our new block for free.
+ reregister()
}
- master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
logDebug("Told master about block " + blockId)
}
/**
+ * Actually send an UpdateBlockInfo message. Returns the master's response,
+ * which will be true if the block was successfully recorded and false if
+ * the slave needs to re-register.
+ */
+ private def tryToReportBlockStatus(blockId: String, info: BlockInfo): Boolean = {
+ val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
+ info.level match {
+ case null =>
+ (StorageLevel.NONE, 0L, 0L, false)
+ case level =>
+ val inMem = level.useMemory && memoryStore.contains(blockId)
+ val onDisk = level.useDisk && diskStore.contains(blockId)
+ val storageLevel = new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+ val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
+ val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
+ (storageLevel, memSize, diskSize, info.tellMaster)
+ }
+ }
+
+ if (tellMaster) {
+ master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
+ } else {
+ true
+ }
+ }
+
+
+ /**
* Get locations of the block.
*/
def getLocations(blockId: String): Seq[String] = {
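
The heart beat and re-registration paths above form a small protocol: the master answers false to heart beats from block managers it no longer knows, and the slave reacts by registering again and re-reporting every block. A sketch with the master reduced to a set of known IDs:

    import scala.collection.mutable.HashSet

    object HeartBeatSketch {
      val registered = new HashSet[String] // master side: known block managers

      def sendHeartBeat(id: String): Boolean = registered.contains(id)
      def registerBlockManager(id: String) { registered += id }

      // Slave side: a failed heart beat triggers re-registration.
      def heartBeat(id: String, reportAllBlocks: () => Unit) {
        if (!sendHeartBeat(id)) {
          registerBlockManager(id)
          reportAllBlocks()
        }
      }

      def main(args: Array[String]) {
        heartBeat("bm-1", () => println("re-reporting all blocks")) // master forgot us
        heartBeat("bm-1", () => println("should not run"))          // heart beat succeeds
      }
    }
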
@@ -302,7 +373,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
} else {
logDebug("Block " + blockId + " not registered locally")
}
-
return None
}
@@ -572,7 +642,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// and tell the master about it.
myInfo.markReady(size)
if (tellMaster) {
- reportBlockStatus(blockId)
+ reportBlockStatus(blockId, myInfo)
}
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -659,7 +729,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// and tell the master about it.
myInfo.markReady(bytes.limit)
if (tellMaster) {
- reportBlockStatus(blockId)
+ reportBlockStatus(blockId, myInfo)
}
}
@@ -741,7 +811,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
val info = blockInfo.get(blockId).orNull
- if (info != null) {
+ if (info != null) {
info.synchronized {
val level = info.level
if (level.useDisk && !diskStore.contains(blockId)) {
@@ -753,9 +823,12 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
diskStore.putBytes(blockId, bytes, level)
}
}
- memoryStore.remove(blockId)
+ val blockWasRemoved = memoryStore.remove(blockId)
+ if (!blockWasRemoved) {
+ logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
+ }
if (info.tellMaster) {
- reportBlockStatus(blockId)
+ reportBlockStatus(blockId, info)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
@@ -767,10 +840,34 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
+ /**
+ * Remove a block from both memory and disk.
+ */
+ def removeBlock(blockId: String) {
+ logInfo("Removing block " + blockId)
+ val info = blockInfo.get(blockId).orNull
+ if (info != null) info.synchronized {
+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+ val removedFromMemory = memoryStore.remove(blockId)
+ val removedFromDisk = diskStore.remove(blockId)
+ if (!removedFromMemory && !removedFromDisk) {
+ logWarning("Block " + blockId + " could not be removed as it was not found in either " +
+ "the disk or memory store")
+ }
+ blockInfo.remove(blockId)
+ if (info.tellMaster) {
+ reportBlockStatus(blockId, info)
+ }
+ } else {
+ // The block has already been removed; do nothing.
+ logWarning("Asked to remove block " + blockId + ", which does not exist")
+ }
+ }
+
def dropOldBlocks(cleanupTime: Long) {
logInfo("Dropping blocks older than " + cleanupTime)
val iterator = blockInfo.internalMap.entrySet().iterator()
- while(iterator.hasNext) {
+ while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
if (time < cleanupTime) {
@@ -785,7 +882,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
iterator.remove()
logInfo("Dropped block " + id)
}
- reportBlockStatus(id)
+ reportBlockStatus(id, info)
}
}
}
@@ -835,7 +932,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def stop() {
+ if (heartBeatTask != null) {
+ heartBeatTask.cancel()
+ }
connectionManager.stop()
+ master.actorSystem.stop(slaveActor)
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
@@ -845,11 +946,20 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
private[spark]
object BlockManager extends Logging {
+
+ val ID_GENERATOR = new IdGenerator
+
def getMaxMemoryFromSystemProperties: Long = {
val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
+ def getHeartBeatFrequencyFromSystemProperties: Long =
+ System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong
+
+ def getDisableHeartBeatsForTesting: Boolean =
+ System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
* might cause errors if one attempts to read from the unmapped buffer, but it's better than
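
A minimal standalone sketch of the pattern tryToReportBlockStatus introduces above:
snapshot the block's state inside info.synchronized, then do the (potentially slow)
master round-trip outside the lock. All names below are illustrative stand-ins, not
part of this patch.

    object ReportSketch {
      class Info(var level: Option[String])            // stand-in for BlockInfo
      def sendToMaster(level: String): Boolean = true  // stand-in for the Akka ask

      def tryToReport(info: Info): Boolean = {
        val snapshot = info.synchronized { info.level } // copy state under the lock
        snapshot match {
          case Some(level) => sendToMaster(level)       // the RPC runs lock-free
          case None => true                             // nothing stored, nothing to tell
        }
      }

      def main(args: Array[String]) {
        println(tryToReport(new Info(Some("MEMORY_ONLY")))) // true
      }
    }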
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
index 4933cc6606..488679f049 100644
--- a/core/src/main/scala/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -1,8 +1,9 @@
package spark.storage
-import java.io.{IOException, ObjectOutput, ObjectInput, Externalizable}
+import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap
+
private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
def this() = this(null, 0) // For deserialization only
@@ -19,10 +20,7 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
@throws(classOf[IOException])
- private def readResolve(): Object = {
- BlockManagerId.getCachedBlockManagerId(this)
- }
-
+ private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
override def toString = "BlockManagerId(" + ip + ", " + port + ")"
@@ -34,7 +32,9 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
}
-object BlockManagerId {
+
+private[spark] object BlockManagerId {
+
val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
@@ -45,4 +45,4 @@ object BlockManagerId {
id
}
}
-} \ No newline at end of file
+}
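
The readResolve/cache pair above is an interning idiom: every deserialized
BlockManagerId is swapped for a canonical cached instance, so equal ids share one
object. The same idiom standalone (illustrative types; the real cache holds
BlockManagerId keys):

    import java.util.concurrent.ConcurrentHashMap

    object InternSketch {
      val cache = new ConcurrentHashMap[String, String]()

      // Return the canonical instance for id, inserting it on first sight.
      def intern(id: String): String = {
        val prior = cache.putIfAbsent(id, id)
        if (prior == null) id else prior
      }

      def main(args: Array[String]) {
        val a = intern(new String("host:7077"))
        val b = intern(new String("host:7077"))
        println(a eq b) // true: both calls yield the one cached object
      }
    }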
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index af15663621..a3d8671834 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -1,384 +1,140 @@
package spark.storage
-import java.io._
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.ArrayBuffer
import scala.util.Random
-import akka.actor._
-import akka.dispatch._
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.dispatch.Await
import akka.pattern.ask
-import akka.remote._
import akka.util.{Duration, Timeout}
import akka.util.duration._
import spark.{Logging, SparkException, Utils}
-private[spark]
-sealed trait ToBlockManagerMaster
-
-private[spark]
-case class RegisterBlockManager(
- blockManagerId: BlockManagerId,
- maxMemSize: Long)
- extends ToBlockManagerMaster
-
-private[spark]
-class UpdateBlockInfo(
- var blockManagerId: BlockManagerId,
- var blockId: String,
- var storageLevel: StorageLevel,
- var memSize: Long,
- var diskSize: Long)
- extends ToBlockManagerMaster
- with Externalizable {
-
- def this() = this(null, null, null, 0, 0) // For deserialization only
-
- override def writeExternal(out: ObjectOutput) {
- blockManagerId.writeExternal(out)
- out.writeUTF(blockId)
- storageLevel.writeExternal(out)
- out.writeInt(memSize.toInt)
- out.writeInt(diskSize.toInt)
- }
-
- override def readExternal(in: ObjectInput) {
- blockManagerId = new BlockManagerId()
- blockManagerId.readExternal(in)
- blockId = in.readUTF()
- storageLevel = new StorageLevel()
- storageLevel.readExternal(in)
- memSize = in.readInt()
- diskSize = in.readInt()
- }
-}
-
-private[spark]
-object UpdateBlockInfo {
- def apply(blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long): UpdateBlockInfo = {
- new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
- }
-
- // For pattern-matching
- def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
- Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
- }
-}
-
-private[spark]
-case class GetLocations(blockId: String) extends ToBlockManagerMaster
-
-private[spark]
-case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-
-private[spark]
-case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-
-private[spark]
-case class RemoveHost(host: String) extends ToBlockManagerMaster
-
-private[spark]
-case object StopBlockManagerMaster extends ToBlockManagerMaster
-
-private[spark]
-case object GetMemoryStatus extends ToBlockManagerMaster
-
-
-private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
-
- class BlockManagerInfo(
- val blockManagerId: BlockManagerId,
- timeMs: Long,
- val maxMem: Long) {
- private var _lastSeenMs = timeMs
- private var _remainingMem = maxMem
- private val _blocks = new JHashMap[String, StorageLevel]
-
- logInfo("Registering block manager %s:%d with %s RAM".format(
- blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
-
- def updateLastSeenMs() {
- _lastSeenMs = System.currentTimeMillis() / 1000
- }
-
- def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
- : Unit = synchronized {
-
- updateLastSeenMs()
-
- if (_blocks.containsKey(blockId)) {
- // The block exists on the slave already.
- val originalLevel: StorageLevel = _blocks.get(blockId)
-
- if (originalLevel.useMemory) {
- _remainingMem += memSize
- }
- }
-
- if (storageLevel.isValid) {
- // isValid means it is either stored in-memory or on-disk.
- _blocks.put(blockId, storageLevel)
- if (storageLevel.useMemory) {
- _remainingMem -= memSize
- logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
- }
- if (storageLevel.useDisk) {
- logInfo("Added %s on disk on %s:%d (size: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
- }
- } else if (_blocks.containsKey(blockId)) {
- // If isValid is not true, drop the block.
- val originalLevel: StorageLevel = _blocks.get(blockId)
- _blocks.remove(blockId)
- if (originalLevel.useMemory) {
- _remainingMem += memSize
- logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
- }
- if (originalLevel.useDisk) {
- logInfo("Removed %s on %s:%d on disk (size: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
- }
- }
- }
-
- def remainingMem: Long = _remainingMem
+private[spark] class BlockManagerMaster(
+ val actorSystem: ActorSystem,
+ isMaster: Boolean,
+ isLocal: Boolean,
+ masterIp: String,
+ masterPort: Int)
+ extends Logging {
- def lastSeenMs: Long = _lastSeenMs
+ val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
+ val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
- override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
+ val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
+ val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
+ val DEFAULT_MANAGER_IP: String = Utils.localHostName()
- def clear() {
- _blocks.clear()
+ val timeout = 10.seconds
+ var masterActor: ActorRef = {
+ if (isMaster) {
+ val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
+ name = MASTER_AKKA_ACTOR_NAME)
+ logInfo("Registered BlockManagerMaster Actor")
+ masterActor
+ } else {
+ val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
+ logInfo("Connecting to BlockManagerMaster: " + url)
+ actorSystem.actorFor(url)
}
}
- private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
- private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
-
- initLogging()
-
- def removeHost(host: String) {
- logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
- logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
- val ip = host.split(":")(0)
- val port = host.split(":")(1)
- blockManagerInfo.remove(new BlockManagerId(ip, port.toInt))
- logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
- sender ! true
- }
-
- def receive = {
- case RegisterBlockManager(blockManagerId, maxMemSize) =>
- register(blockManagerId, maxMemSize)
-
- case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
-
- case GetLocations(blockId) =>
- getLocations(blockId)
-
- case GetLocationsMultipleBlockIds(blockIds) =>
- getLocationsMultipleBlockIds(blockIds)
-
- case GetPeers(blockManagerId, size) =>
- getPeersDeterministic(blockManagerId, size)
- /*getPeers(blockManagerId, size)*/
-
- case GetMemoryStatus =>
- getMemoryStatus
-
- case RemoveHost(host) =>
- removeHost(host)
- sender ! true
-
- case StopBlockManagerMaster =>
- logInfo("Stopping BlockManagerMaster")
- sender ! true
- context.stop(self)
-
- case other =>
- logInfo("Got unknown message: " + other)
+ /** Remove a dead host from the master actor. This is only called on the master side. */
+ def notifyADeadHost(host: String) {
+ tell(RemoveHost(host))
+ logInfo("Removed " + host + " successfully in notifyADeadHost")
}
- // Return a map from the block manager id to max memory and remaining memory.
- private def getMemoryStatus() {
- val res = blockManagerInfo.map { case(blockManagerId, info) =>
- (blockManagerId, (info.maxMem, info.remainingMem))
- }.toMap
- sender ! res
+ /**
+ * Send the master actor a heart beat from the slave. Returns true if everything works out,
+ * false if the master does not know about the given block manager, which means the block
+ * manager should re-register.
+ */
+ def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
+ askMasterWithRetry[Boolean](HeartBeat(blockManagerId))
}
- private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockManagerId + " "
- logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
- logInfo("Got Register Msg from master node, don't register it")
- } else {
- blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
- blockManagerId, System.currentTimeMillis() / 1000, maxMemSize))
- }
- logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
- sender ! true
+ /** Register the BlockManager's id with the master. */
+ def registerBlockManager(
+ blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ logInfo("Trying to register BlockManager")
+ tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
+ logInfo("Registered BlockManager")
}
- private def updateBlockInfo(
+ def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long) {
-
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockManagerId + " " + blockId + " "
-
- if (!blockManagerInfo.contains(blockManagerId)) {
- // Can happen if this is from a locally cached partition on the master
- sender ! true
- return
- }
-
- if (blockId == null) {
- blockManagerInfo(blockManagerId).updateLastSeenMs()
- logDebug("Got in updateBlockInfo 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
- sender ! true
- }
-
- blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
-
- var locations: HashSet[BlockManagerId] = null
- if (blockInfo.containsKey(blockId)) {
- locations = blockInfo.get(blockId)._2
- } else {
- locations = new HashSet[BlockManagerId]
- blockInfo.put(blockId, (storageLevel.replication, locations))
- }
-
- if (storageLevel.isValid) {
- locations += blockManagerId
- } else {
- locations.remove(blockManagerId)
- }
-
- if (locations.size == 0) {
- blockInfo.remove(blockId)
- }
- sender ! true
+ diskSize: Long): Boolean = {
+ val res = askMasterWithRetry[Boolean](
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
+ logInfo("Updated info of block " + blockId)
+ res
}
- private def getLocations(blockId: String) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockId + " "
- logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
- if (blockInfo.containsKey(blockId)) {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockInfo.get(blockId)._2)
- logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
- + Utils.getUsedTimeMs(startTimeMs))
- sender ! res.toSeq
- } else {
- logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs))
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- sender ! res
- }
+ /** Get locations of the blockId from the master */
+ def getLocations(blockId: String): Seq[BlockManagerId] = {
+ askMasterWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}
- private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
- def getLocations(blockId: String): Seq[BlockManagerId] = {
- val tmp = blockId
- logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp)
- if (blockInfo.containsKey(blockId)) {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockInfo.get(blockId)._2)
- logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq)
- return res.toSeq
- } else {
- logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp)
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- return res.toSeq
- }
- }
-
- logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
- var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
- for (blockId <- blockIds) {
- res.append(getLocations(blockId))
- }
- logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq)
- sender ! res.toSeq
+ /** Get locations of multiple blockIds from the master */
+ def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
+ askMasterWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}
- private def getPeers(blockManagerId: BlockManagerId, size: Int) {
- var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(peers)
- res -= blockManagerId
- val rand = new Random(System.currentTimeMillis())
- while (res.length > size) {
- res.remove(rand.nextInt(res.length))
+ /** Get ids of other nodes in the cluster from the master */
+ def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
+ val result = askMasterWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
+ if (result.length != numPeers) {
+ throw new SparkException(
+ "Error getting peers, only got " + result.size + " instead of " + numPeers)
}
- sender ! res.toSeq
+ result
}
- private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
- var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ /**
+ * Remove a block from the slaves that have it. This can only be used to remove
+ * blocks that the master knows about.
+ */
+ def removeBlock(blockId: String) {
+ askMasterWithRetry(RemoveBlock(blockId))
+ }
- val peersWithIndices = peers.zipWithIndex
- val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1)
- if (selfIndex == -1) {
- throw new Exception("Self index for " + blockManagerId + " not found")
- }
+ /**
+ * Return the memory status for each block manager, in the form of a map from
+ * the block manager's id to two long values. The first value is the maximum
+ * amount of memory allocated for the block manager, while the second is the
+ * amount of remaining memory.
+ */
+ def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
+ askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
+ }
- // Note that this logic will select the same node multiple times if there aren't enough peers
- var index = selfIndex
- while (res.size < size) {
- index += 1
- if (index == selfIndex) {
- throw new Exception("More peer expected than available")
- }
- res += peers(index % peers.size)
+ /** Stop the master actor, called only on the Spark master node */
+ def stop() {
+ if (masterActor != null) {
+ tell(StopBlockManagerMaster)
+ masterActor = null
+ logInfo("BlockManagerMaster stopped")
}
- sender ! res.toSeq
}
-}
-private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean)
- extends Logging {
-
- val actorName = "BlockMasterManager"
- val timeout = 10.seconds
- val maxAttempts = 5
-
- var masterActor = if (isMaster) {
- val actor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)), name = actorName)
- logInfo("Registered BlockManagerMaster Actor")
- actor
- } else {
- val host = System.getProperty("spark.master.host", "localhost")
- val port = System.getProperty("spark.master.port", "7077").toInt
- val url = "akka://spark@%s:%s/user/%s".format(host, port, actorName)
- val actor = actorSystem.actorFor(url)
- logInfo("Connecting to BlockManagerMaster: " + url)
- actor
+ /** Send a one-way message to the master actor, to which we expect it to reply with true. */
+ private def tell(message: Any) {
+ if (!askMasterWithRetry[Boolean](message)) {
+ throw new SparkException("BlockManagerMasterActor returned false, expected true.")
+ }
}
/**
* Send a message to the master actor and get its result within a default timeout, or
* throw a SparkException if this fails.
*/
- private def ask[T](message: Any): T = {
+ private def askMasterWithRetry[T](message: Any): T = {
// TODO: Consider removing multiple attempts
if (masterActor == null) {
throw new SparkException("Error sending message to BlockManager as masterActor is null " +
@@ -386,9 +142,9 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
var attempts = 0
var lastException: Exception = null
- while (attempts < maxAttempts) {
+ while (attempts < AKKA_RETRY_ATTEMPTS) {
attempts += 1
- try {
+ try {
val future = masterActor.ask(message)(timeout)
val result = Await.result(future, timeout)
if (result == null) {
@@ -396,85 +152,16 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
return result.asInstanceOf[T]
} catch {
- case ie: InterruptedException =>
- throw ie
+ case ie: InterruptedException => throw ie
case e: Exception =>
lastException = e
- logWarning(
- "Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
+ logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
}
- Thread.sleep(100)
+ Thread.sleep(AKKA_RETRY_INTERVAL_MS)
}
+
throw new SparkException(
"Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
}
- /**
- * Send a one-way message to the master actor, to which we expect it to reply with true
- */
- private def tell(message: Any) {
- if (!ask[Boolean](message)) {
- throw new SparkException("Telling master a message returned false")
- }
- }
-
- /**
- * Register the BlockManager's id with the master
- */
- def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long) {
- logInfo("Trying to register BlockManager")
- tell(RegisterBlockManager(blockManagerId, maxMemSize))
- logInfo("Registered BlockManager")
- }
-
- def updateBlockInfo(
- blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long
- ) {
- tell(UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
- logInfo("Updated info of block " + blockId)
- }
-
- /** Get locations of the blockId from the master */
- def getLocations(blockId: String): Seq[BlockManagerId] = {
- ask[Seq[BlockManagerId]](GetLocations(blockId))
- }
-
- /** Get locations of multiple blockIds from the master */
- def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
- ask[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
- }
-
- /** Get ids of other nodes in the cluster from the master */
- def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
- val result = ask[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
- if (result.length != numPeers) {
- throw new SparkException(
- "Error getting peers, only got " + result.size + " instead of " + numPeers)
- }
- result
- }
-
- /** Notify the master of a dead node */
- def notifyADeadHost(host: String) {
- tell(RemoveHost(host + ":10902"))
- logInfo("Told BlockManagerMaster to remove dead host " + host)
- }
-
- /** Get the memory status form the master */
- def getMemoryStatus(): Map[BlockManagerId, (Long, Long)] = {
- ask[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
- }
-
- /** Stop the master actor, called only on the Spark master node */
- def stop() {
- if (masterActor != null) {
- tell(StopBlockManagerMaster)
- masterActor = null
- logInfo("BlockManagerMaster stopped")
- }
- }
}
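
The loop in askMasterWithRetry boils down to a reusable idiom: retry a request up
to N times, propagate interrupts immediately, and surface the last failure once the
attempts are exhausted. A self-contained sketch of that idiom (names illustrative):

    object RetrySketch {
      def askWithRetry[T](attempts: Int, intervalMs: Long)(ask: () => T): T = {
        var lastException: Exception = null
        var i = 0
        while (i < attempts) {
          i += 1
          try {
            return ask()
          } catch {
            case ie: InterruptedException => throw ie // never swallow interrupts
            case e: Exception => lastException = e    // remember it and retry
          }
          Thread.sleep(intervalMs)
        }
        throw new RuntimeException("failed after " + attempts + " attempts", lastException)
      }

      def main(args: Array[String]) {
        var calls = 0
        val result = askWithRetry(3, 10) { () =>
          calls += 1
          if (calls < 3) sys.error("transient") else "ok"
        }
        println(result + " after " + calls + " calls") // ok after 3 calls
      }
    }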
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
new file mode 100644
index 0000000000..f4d026da33
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -0,0 +1,401 @@
+package spark.storage
+
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+import akka.actor.{Actor, ActorRef, Cancellable}
+import akka.util.{Duration, Timeout}
+import akka.util.duration._
+
+import spark.{Logging, Utils}
+
+/**
+ * BlockManagerMasterActor is an actor on the master node to track statuses of
+ * all slaves' block managers.
+ */
+private[spark]
+class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
+
+ // Mapping from block manager id to the block manager's information.
+ private val blockManagerInfo =
+ new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
+
+ // Mapping from host name to block manager id. We allow multiple block managers
+ // on the same host name (ip).
+ private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]]
+
+ // Mapping from block id to the set of block managers that have the block.
+ private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
+
+ initLogging()
+
+ val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
+ "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
+
+ val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
+ "5000").toLong
+
+ var timeoutCheckingTask: Cancellable = null
+
+ override def preStart() {
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ timeoutCheckingTask = context.system.scheduler.schedule(
+ 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+ }
+ super.preStart()
+ }
+
+ def receive = {
+ case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
+ register(blockManagerId, maxMemSize, slaveActor)
+
+ case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
+
+ case GetLocations(blockId) =>
+ getLocations(blockId)
+
+ case GetLocationsMultipleBlockIds(blockIds) =>
+ getLocationsMultipleBlockIds(blockIds)
+
+ case GetPeers(blockManagerId, size) =>
+ getPeersDeterministic(blockManagerId, size)
+ /*getPeers(blockManagerId, size)*/
+
+ case GetMemoryStatus =>
+ getMemoryStatus
+
+ case RemoveBlock(blockId) =>
+ removeBlock(blockId)
+
+ case RemoveHost(host) =>
+ removeHost(host)
+ sender ! true
+
+ case StopBlockManagerMaster =>
+ logInfo("Stopping BlockManagerMaster")
+ sender ! true
+ if (timeoutCheckingTask != null) {
+ timeoutCheckingTask.cancel
+ }
+ context.stop(self)
+
+ case ExpireDeadHosts =>
+ expireDeadHosts()
+
+ case HeartBeat(blockManagerId) =>
+ heartBeat(blockManagerId)
+
+ case other =>
+ logInfo("Got unknown message: " + other)
+ }
+
+ def removeBlockManager(blockManagerId: BlockManagerId) {
+ val info = blockManagerInfo(blockManagerId)
+
+ // Remove the block manager from blockManagerIdByHost. If the list of block
+ // managers belonging to the IP is empty, remove the entry from the hash map.
+ blockManagerIdByHost.get(blockManagerId.ip).foreach { managers: ArrayBuffer[BlockManagerId] =>
+ managers -= blockManagerId
+ if (managers.size == 0) blockManagerIdByHost.remove(blockManagerId.ip)
+ }
+
+ // Remove it from blockManagerInfo and remove all the blocks.
+ blockManagerInfo.remove(blockManagerId)
+ var iterator = info.blocks.keySet.iterator
+ while (iterator.hasNext) {
+ val blockId = iterator.next
+ val locations = blockLocations.get(blockId)._2
+ locations -= blockManagerId
+ if (locations.size == 0) {
+ blockLocations.remove(blockId)
+ }
+ }
+ }
+
+ def expireDeadHosts() {
+ logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
+ val now = System.currentTimeMillis()
+ val minSeenTime = now - slaveTimeout
+ val toRemove = new HashSet[BlockManagerId]
+ for (info <- blockManagerInfo.values) {
+ if (info.lastSeenMs < minSeenTime) {
+ logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+ toRemove += info.blockManagerId
+ }
+ }
+ toRemove.foreach(removeBlockManager)
+ }
+
+ def removeHost(host: String) {
+ logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
+ logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
+ blockManagerIdByHost.get(host).foreach(_.foreach(removeBlockManager))
+ logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
+ sender ! true
+ }
+
+ def heartBeat(blockManagerId: BlockManagerId) {
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ sender ! true
+ } else {
+ sender ! false
+ }
+ } else {
+ blockManagerInfo(blockManagerId).updateLastSeenMs()
+ sender ! true
+ }
+ }
+
+ // Remove a block from the slaves that have it. This can only be used to remove
+ // blocks that the master knows about.
+ private def removeBlock(blockId: String) {
+ val block = blockLocations.get(blockId)
+ if (block != null) {
+ block._2.foreach { blockManagerId: BlockManagerId =>
+ val blockManager = blockManagerInfo.get(blockManagerId)
+ if (blockManager.isDefined) {
+ // Remove the block from the slave's BlockManager.
+ // Doesn't actually wait for a confirmation and the message might get lost.
+ // If message loss becomes frequent, we should add retry logic here.
+ blockManager.get.slaveActor ! RemoveBlock(blockId)
+ }
+ }
+ }
+ sender ! true
+ }
+
+ // Return a map from the block manager id to max memory and remaining memory.
+ private def getMemoryStatus() {
+ val res = blockManagerInfo.map { case(blockManagerId, info) =>
+ (blockManagerId, (info.maxMem, info.remainingMem))
+ }.toMap
+ sender ! res
+ }
+
+ private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ val startTimeMs = System.currentTimeMillis()
+ val tmp = " " + blockManagerId + " "
+
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ logInfo("Got Register Msg from master node, don't register it")
+ } else {
+ blockManagerIdByHost.get(blockManagerId.ip) match {
+ case Some(managers) =>
+ // A block manager of the same host name already exists.
+ logInfo("Got another registration for host " + blockManagerId)
+ managers += blockManagerId
+ case None =>
+ blockManagerIdByHost += (blockManagerId.ip -> ArrayBuffer(blockManagerId))
+ }
+
+ blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
+ blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
+ }
+ sender ! true
+ }
+
+ private def updateBlockInfo(
+ blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long) {
+
+ val startTimeMs = System.currentTimeMillis()
+ val tmp = " " + blockManagerId + " " + blockId + " "
+
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ // We intentionally do not register the master (except in local mode),
+ // so we should not indicate failure.
+ sender ! true
+ } else {
+ sender ! false
+ }
+ return
+ }
+
+ if (blockId == null) {
+ blockManagerInfo(blockManagerId).updateLastSeenMs()
+ sender ! true
+ return
+ }
+
+ blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
+
+ var locations: HashSet[BlockManagerId] = null
+ if (blockLocations.containsKey(blockId)) {
+ locations = blockLocations.get(blockId)._2
+ } else {
+ locations = new HashSet[BlockManagerId]
+ blockLocations.put(blockId, (storageLevel.replication, locations))
+ }
+
+ if (storageLevel.isValid) {
+ locations.add(blockManagerId)
+ } else {
+ locations.remove(blockManagerId)
+ }
+
+ // Remove the block from master tracking if it has been removed on all slaves.
+ if (locations.size == 0) {
+ blockLocations.remove(blockId)
+ }
+ sender ! true
+ }
+
+ private def getLocations(blockId: String) {
+ val startTimeMs = System.currentTimeMillis()
+ val tmp = " " + blockId + " "
+ if (blockLocations.containsKey(blockId)) {
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ res.appendAll(blockLocations.get(blockId)._2)
+ sender ! res.toSeq
+ } else {
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ sender ! res
+ }
+ }
+
+ private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
+ def getLocations(blockId: String): Seq[BlockManagerId] = {
+ val tmp = blockId
+ if (blockLocations.containsKey(blockId)) {
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ res.appendAll(blockLocations.get(blockId)._2)
+ return res.toSeq
+ } else {
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ return res.toSeq
+ }
+ }
+
+ var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
+ for (blockId <- blockIds) {
+ res.append(getLocations(blockId))
+ }
+ sender ! res.toSeq
+ }
+
+ private def getPeers(blockManagerId: BlockManagerId, size: Int) {
+ var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ res.appendAll(peers)
+ res -= blockManagerId
+ val rand = new Random(System.currentTimeMillis())
+ while (res.length > size) {
+ res.remove(rand.nextInt(res.length))
+ }
+ sender ! res.toSeq
+ }
+
+ private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
+ var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+
+ val selfIndex = peers.indexOf(blockManagerId)
+ if (selfIndex == -1) {
+ throw new Exception("Self index for " + blockManagerId + " not found")
+ }
+
+ // Note that this logic will select the same node multiple times if there aren't enough peers
+ var index = selfIndex
+ while (res.size < size) {
+ index += 1
+ if (index == selfIndex) {
+ throw new Exception("More peer expected than available")
+ }
+ res += peers(index % peers.size)
+ }
+ sender ! res.toSeq
+ }
+}
+
+
+private[spark]
+object BlockManagerMasterActor {
+
+ case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+
+ class BlockManagerInfo(
+ val blockManagerId: BlockManagerId,
+ timeMs: Long,
+ val maxMem: Long,
+ val slaveActor: ActorRef)
+ extends Logging {
+
+ private var _lastSeenMs: Long = timeMs
+ private var _remainingMem: Long = maxMem
+
+ // Mapping from block id to its status.
+ private val _blocks = new JHashMap[String, BlockStatus]
+
+ logInfo("Registering block manager %s:%d with %s RAM".format(
+ blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
+
+ def updateLastSeenMs() {
+ _lastSeenMs = System.currentTimeMillis()
+ }
+
+ def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+ : Unit = synchronized {
+
+ updateLastSeenMs()
+
+ if (_blocks.containsKey(blockId)) {
+ // The block exists on the slave already.
+ val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
+
+ if (originalLevel.useMemory) {
+ _remainingMem += memSize
+ }
+ }
+
+ if (storageLevel.isValid) {
+ // isValid means it is either stored in-memory or on-disk.
+ _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
+ if (storageLevel.useMemory) {
+ _remainingMem -= memSize
+ logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+ Utils.memoryBytesToString(_remainingMem)))
+ }
+ if (storageLevel.useDisk) {
+ logInfo("Added %s on disk on %s:%d (size: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
+ }
+ } else if (_blocks.containsKey(blockId)) {
+ // If isValid is not true, drop the block.
+ val blockStatus: BlockStatus = _blocks.get(blockId)
+ _blocks.remove(blockId)
+ if (blockStatus.storageLevel.useMemory) {
+ _remainingMem += blockStatus.memSize
+ logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(blockStatus.memSize),
+ Utils.memoryBytesToString(_remainingMem)))
+ }
+ if (blockStatus.storageLevel.useDisk) {
+ logInfo("Removed %s on %s:%d on disk (size: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(blockStatus.diskSize)))
+ }
+ }
+ }
+
+ def remainingMem: Long = _remainingMem
+
+ def lastSeenMs: Long = _lastSeenMs
+
+ def blocks: JHashMap[String, BlockStatus] = _blocks
+
+ override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
+
+ def clear() {
+ _blocks.clear()
+ }
+ }
+}
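
getPeersDeterministic above walks the peer array round-robin starting just past the
caller's own index, so a fixed topology always yields the same replication targets
(and repeats peers when more are requested than exist). The same logic standalone:

    object PeerSketch {
      def pickPeers(peers: Array[String], self: String, size: Int): Seq[String] = {
        val res = new scala.collection.mutable.ArrayBuffer[String]
        val selfIndex = peers.indexOf(self)
        require(selfIndex != -1, "self index for " + self + " not found")
        var index = selfIndex
        while (res.size < size) {
          index += 1
          res += peers(index % peers.length) // wraps; repeats once size >= peers.length
        }
        res
      }

      def main(args: Array[String]) {
        println(pickPeers(Array("a", "b", "c", "d"), "b", 2)) // ArrayBuffer(c, d)
      }
    }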
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
new file mode 100644
index 0000000000..d73a9b790f
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -0,0 +1,102 @@
+package spark.storage
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import akka.actor.ActorRef
+
+
+//////////////////////////////////////////////////////////////////////////////////
+// Messages from the master to slaves.
+//////////////////////////////////////////////////////////////////////////////////
+private[spark]
+sealed trait ToBlockManagerSlave
+
+// Remove a block from the slaves that have it. This can only be used to remove
+// blocks that the master knows about.
+private[spark]
+case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
+
+
+//////////////////////////////////////////////////////////////////////////////////
+// Messages from slaves to the master.
+//////////////////////////////////////////////////////////////////////////////////
+private[spark]
+sealed trait ToBlockManagerMaster
+
+private[spark]
+case class RegisterBlockManager(
+ blockManagerId: BlockManagerId,
+ maxMemSize: Long,
+ sender: ActorRef)
+ extends ToBlockManagerMaster
+
+private[spark]
+case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+private[spark]
+class UpdateBlockInfo(
+ var blockManagerId: BlockManagerId,
+ var blockId: String,
+ var storageLevel: StorageLevel,
+ var memSize: Long,
+ var diskSize: Long)
+ extends ToBlockManagerMaster
+ with Externalizable {
+
+ def this() = this(null, null, null, 0, 0) // For deserialization only
+
+ override def writeExternal(out: ObjectOutput) {
+ blockManagerId.writeExternal(out)
+ out.writeUTF(blockId)
+ storageLevel.writeExternal(out)
+ out.writeInt(memSize.toInt)
+ out.writeInt(diskSize.toInt)
+ }
+
+ override def readExternal(in: ObjectInput) {
+ blockManagerId = new BlockManagerId()
+ blockManagerId.readExternal(in)
+ blockId = in.readUTF()
+ storageLevel = new StorageLevel()
+ storageLevel.readExternal(in)
+ memSize = in.readInt()
+ diskSize = in.readInt()
+ }
+}
+
+private[spark]
+object UpdateBlockInfo {
+ def apply(blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long): UpdateBlockInfo = {
+ new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ }
+
+ // For pattern-matching
+ def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
+ }
+}
+
+private[spark]
+case class GetLocations(blockId: String) extends ToBlockManagerMaster
+
+private[spark]
+case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+
+private[spark]
+case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+
+private[spark]
+case class RemoveHost(host: String) extends ToBlockManagerMaster
+
+private[spark]
+case object StopBlockManagerMaster extends ToBlockManagerMaster
+
+private[spark]
+case object GetMemoryStatus extends ToBlockManagerMaster
+
+private[spark]
+case object ExpireDeadHosts extends ToBlockManagerMaster
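
UpdateBlockInfo above is a plain class rather than a case class because
Externalizable needs mutable fields and a no-arg constructor; the hand-written
apply/unapply in its companion restores case-class-style construction and pattern
matching. The same trick in miniature (hypothetical names):

    object ExtractorSketch {
      class Msg(var id: String, var size: Long) // mutable, e.g. for custom serialization

      object Msg {
        def apply(id: String, size: Long): Msg = new Msg(id, size)
        def unapply(m: Msg): Option[(String, Long)] = Some((m.id, m.size))
      }

      def main(args: Array[String]) {
        Msg("rdd_0_0", 1024L) match {
          case Msg(id, size) => println(id + " -> " + size) // rdd_0_0 -> 1024
        }
      }
    }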
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
new file mode 100644
index 0000000000..f570cdc52d
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -0,0 +1,16 @@
+package spark.storage
+
+import akka.actor.Actor
+
+import spark.{Logging, SparkException, Utils}
+
+
+/**
+ * An actor to take commands from the master and execute operations. For example,
+ * this is used to remove blocks from the slave's BlockManager.
+ */
+class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
+ override def receive = {
+ case RemoveBlock(blockId) => blockManager.removeBlock(blockId)
+ }
+}
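
The slave actor handles a single fire-and-forget command. A runnable analog against
the same Akka 2.0-era API used in this tree (all names illustrative):

    import akka.actor.{Actor, ActorSystem, Props}

    case class Remove(id: String)

    class SlaveSketch extends Actor {
      def receive = {
        case Remove(id) => println("removing " + id) // stand-in for blockManager.removeBlock(id)
      }
    }

    object SlaveSketchApp {
      def main(args: Array[String]) {
        val system = ActorSystem("sketch")
        val slave = system.actorOf(Props(new SlaveSketch))
        slave ! Remove("rdd_0_0") // one-way tell: no reply and no delivery guarantee
        Thread.sleep(100)
        system.shutdown()
      }
    }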
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 096bf8bdd9..8188d3595e 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -31,7 +31,12 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def getValues(blockId: String): Option[Iterator[Any]]
- def remove(blockId: String)
+ /**
+ * Remove a block, if it exists.
+ * @param blockId the block to remove.
+ * @return True if the block was found and removed, False otherwise.
+ */
+ def remove(blockId: String): Boolean
def contains(blockId: String): Boolean
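
With the Boolean-returning remove, the store stays quiet and each caller decides
whether a missed removal deserves a warning. A contract sketch over a toy store
(hypothetical names):

    object RemoveContractSketch {
      val entries = scala.collection.mutable.Map[String, Array[Byte]]()

      // Mirrors the new contract: true if found and removed, false otherwise.
      def remove(blockId: String): Boolean = entries.remove(blockId).isDefined

      def main(args: Array[String]) {
        entries("rdd_0_0") = Array[Byte](1, 2, 3)
        println(remove("rdd_0_0")) // true
        println(remove("rdd_0_0")) // false: the caller chooses whether to warn
      }
    }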
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 8ba64e4b76..7e5b820cbb 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -10,6 +10,8 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import scala.collection.mutable.ArrayBuffer
+import spark.executor.ExecutorExitCode
+
import spark.Utils
/**
@@ -90,10 +92,13 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes))
}
- override def remove(blockId: String) {
+ override def remove(blockId: String): Boolean = {
val file = getFile(blockId)
if (file.exists()) {
file.delete()
+ true
+ } else {
+ false
}
}
@@ -162,7 +167,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
if (!foundLocalDir) {
logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
" attempts to create local dir in " + rootDir)
- System.exit(1)
+ System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}
logInfo("Created local directory at " + localDir)
localDir
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index a222b2f3df..ae88ff0bb1 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -89,7 +89,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
- override def remove(blockId: String) {
+ override def remove(blockId: String): Boolean = {
entries.synchronized {
val entry = entries.get(blockId)
if (entry != null) {
@@ -97,8 +97,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
currentMemory -= entry.size
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
+ true
} else {
- logWarning("Block " + blockId + " could not be removed as it does not exist")
+ false
}
}
}
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index eb88eb2759..e3544e5aae 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -1,9 +1,6 @@
package spark.storage
-import java.io.{IOException, Externalizable, ObjectInput, ObjectOutput}
-import collection.mutable
-import util.Random
-import collection.mutable.ArrayBuffer
+import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
/**
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
@@ -13,13 +10,14 @@ import collection.mutable.ArrayBuffer
* commonly useful storage levels.
*/
class StorageLevel(
- var useDisk: Boolean,
+ var useDisk: Boolean,
var useMemory: Boolean,
var deserialized: Boolean,
var replication: Int = 1)
extends Externalizable {
// TODO: Also add fields for caching priority, dataset ID, and flushing.
+
assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
def this(flags: Int, replication: Int) {
@@ -31,20 +29,16 @@ class StorageLevel(
override def clone(): StorageLevel = new StorageLevel(
this.useDisk, this.useMemory, this.deserialized, this.replication)
- override def hashCode(): Int = {
- toInt * 41 + replication
- }
-
override def equals(other: Any): Boolean = other match {
case s: StorageLevel =>
- s.useDisk == useDisk &&
+ s.useDisk == useDisk &&
s.useMemory == useMemory &&
s.deserialized == deserialized &&
- s.replication == replication
+ s.replication == replication
case _ =>
false
}
-
+
def isValid = ((useMemory || useDisk) && (replication > 0))
def toInt: Int = {
@@ -75,14 +69,15 @@ class StorageLevel(
}
@throws(classOf[IOException])
- private def readResolve(): Object = {
- StorageLevel.getCachedStorageLevel(this)
- }
+ private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
override def toString: String =
"StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
+
+ override def hashCode(): Int = toInt * 41 + replication
}
+
object StorageLevel {
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
@@ -96,9 +91,10 @@ object StorageLevel {
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
+ private[spark]
val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
- def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
+ private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
if (storageLevelCache.containsKey(level)) {
storageLevelCache.get(level)
} else {
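
The relocated hashCode derives from toInt, whose body is not shown in this hunk;
assuming it packs the three boolean flags into an integer bit set (one plausible
reading), equal levels hash alike, which is exactly what storageLevelCache needs.
A sketch under that assumption:

    object LevelHashSketch {
      // Assumed packing: one bit per flag, mirroring what a toInt-style method could do.
      def toInt(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean): Int = {
        var ret = 0
        if (useDisk) ret |= 4
        if (useMemory) ret |= 2
        if (deserialized) ret |= 1
        ret
      }

      def hash(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int): Int =
        toInt(useDisk, useMemory, deserialized) * 41 + replication

      def main(args: Array[String]) {
        // Structurally equal levels hash the same, so the cache can intern them.
        println(hash(false, true, true, 1) == hash(false, true, true, 1)) // true
      }
    }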
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index e4a5b8ffdf..689f07b969 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -58,8 +58,10 @@ private[spark] object ThreadingTest {
val startTime = System.currentTimeMillis()
manager.get(blockId) match {
case Some(retrievedBlock) =>
- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
- println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+ assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+ "Block " + blockId + " did not match")
+ println("Got block " + blockId + " in " +
+ (System.currentTimeMillis - startTime) + " ms")
case None =>
assert(false, "Block " + blockId + " could not be retrieved")
}
@@ -73,8 +75,10 @@ private[spark] object ThreadingTest {
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
- val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
- val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
+ val masterIp: String = System.getProperty("spark.master.host", "localhost")
+ val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+ val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort)
+ val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
@@ -86,6 +90,7 @@ private[spark] object ThreadingTest {
actorSystem.shutdown()
actorSystem.awaitTermination()
println("Everything stopped.")
- println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
+ println(
+ "It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
}
}
diff --git a/core/src/main/scala/spark/util/IdGenerator.scala b/core/src/main/scala/spark/util/IdGenerator.scala
new file mode 100644
index 0000000000..b6e309fe1a
--- /dev/null
+++ b/core/src/main/scala/spark/util/IdGenerator.scala
@@ -0,0 +1,14 @@
+package spark.util
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * A util used to get a unique generation ID. This is a wrapper around Java's
+ * AtomicInteger. An example usage is in BlockManager, where each BlockManager
+ * instance would start an Akka actor and we use this utility to assign the Akka
+ * actors unique names.
+ */
+private[spark] class IdGenerator {
+ private val id = new AtomicInteger
+ def next: Int = id.incrementAndGet
+}
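
Usage is a one-liner: each call to next hands out a fresh, thread-safe integer,
e.g. to suffix per-instance actor names as the comment above describes. (The sketch
sits in spark.util because the class is private[spark].)

    package spark.util

    object IdGeneratorSketch {
      def main(args: Array[String]) {
        val gen = new IdGenerator
        println("BlockManagerActor" + gen.next) // BlockManagerActor1
        println("BlockManagerActor" + gen.next) // BlockManagerActor2
      }
    }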
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index 2541b26255..139e21d09e 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -4,24 +4,29 @@ import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
import java.util.{TimerTask, Timer}
import spark.Logging
+
class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
+
val delaySeconds = MetadataCleaner.getDelaySeconds
val periodSeconds = math.max(10, delaySeconds / 10)
val timer = new Timer(name + " cleanup timer", true)
+
val task = new TimerTask {
def run() {
try {
if (delaySeconds > 0) {
cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
logInfo("Ran metadata cleaner for " + name)
- }
+ }
} catch {
case e: Exception => logError("Error running cleanup task for " + name, e)
}
}
}
+
if (periodSeconds > 0) {
- logInfo("Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
+ logInfo(
+ "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
+ "period of " + periodSeconds + " secs")
timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
}
@@ -31,7 +36,9 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
}
}
+
object MetadataCleaner {
def getDelaySeconds = (System.getProperty("spark.cleaner.delay", "-100").toDouble * 60).toInt
def setDelaySeconds(delay: Long) { System.setProperty("spark.cleaner.delay", delay.toString) }
}
+
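
Note the units in the companion above: the property value is multiplied by 60 on
the way out, so it is effectively read as minutes even though both method names say
seconds; setDelaySeconds(1) therefore makes getDelaySeconds return 60. A quick check
of the same arithmetic:

    object CleanerDelaySketch {
      def main(args: Array[String]) {
        System.setProperty("spark.cleaner.delay", "1")
        val delaySeconds = (System.getProperty("spark.cleaner.delay", "-100").toDouble * 60).toInt
        println(delaySeconds)                    // 60
        println(math.max(10, delaySeconds / 10)) // 10: the cleanup period used above
      }
    }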
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
index 52f03784db..7e785182ea 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -1,8 +1,8 @@
package spark.util
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions
import scala.collection.mutable.Map
-import java.util.concurrent.ConcurrentHashMap
/**
* This is a custom implementation of scala.collection.mutable.Map which stores the insertion
diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
index 3dcba3a545..be69e9bf02 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
@@ -4,9 +4,10 @@
<tr>
<td>
- <a href="http://@worker.host:@worker.webUiPort">@worker.id</href>
+ <a href="@worker.webUiAddress">@worker.id</href>
</td>
<td>@{worker.host}:@{worker.port}</td>
+ <td>@worker.state</td>
<td>@worker.cores (@worker.coresUsed Used)</td>
<td>@{Utils.memoryMegabytesToString(worker.memory)}
(@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
index fad1af41dc..b249411a62 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
@@ -5,6 +5,7 @@
<tr>
<th>ID</th>
<th>Address</th>
+ <th>State</th>
<th>Cores</th>
<th>Memory</th>
</tr>
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 41d84cb01c..302a95db66 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -67,7 +67,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
testCheckpointing(_.glom())
testCheckpointing(_.mapPartitions(_.map(_.toString)))
testCheckpointing(r => new MapPartitionsWithSplitRDD(r,
- (i: Int, iter: Iterator[Int]) => iter.map(_.toString) ))
+ (i: Int, iter: Iterator[Int]) => iter.map(_.toString), false))
testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString), 1000)
testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x), 1000)
testCheckpointing(_.pipe(Seq("cat")))
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 6bd9836a93..46a0b68f89 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -1,5 +1,12 @@
package spark;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
@@ -12,8 +19,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import scala.Tuple2;
-
import spark.api.java.JavaDoubleRDD;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
@@ -24,10 +29,6 @@ import spark.partial.PartialResult;
import spark.storage.StorageLevel;
import spark.util.StatCounter;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -383,7 +384,8 @@ public class JavaAPISuite implements Serializable {
@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
- Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0)).next().intValue());
+ TaskContext context = new TaskContext(0, 0, 0);
+ Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
}
@Test
@@ -555,4 +557,17 @@ public class JavaAPISuite implements Serializable {
}
}).collect().toString());
}
+
+ @Test
+ public void zip() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+ @Override
+ public Double call(Integer x) {
+ return 1.0 * x;
+ }
+ });
+ JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
+ zipped.count();
+ }
}
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 4e9717d871..5b4b198960 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -2,10 +2,14 @@ package spark
import org.scalatest.FunSuite
+import akka.actor._
+import spark.scheduler.MapStatus
+import spark.storage.BlockManagerId
+
class MapOutputTrackerSuite extends FunSuite {
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
- assert(MapOutputTracker.compressSize(1L) === 0)
+ assert(MapOutputTracker.compressSize(1L) === 1)
assert(MapOutputTracker.compressSize(2L) === 8)
assert(MapOutputTracker.compressSize(10L) === 25)
assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145)
@@ -15,11 +19,58 @@ class MapOutputTrackerSuite extends FunSuite {
}
test("decompressSize") {
- assert(MapOutputTracker.decompressSize(0) === 1)
+ assert(MapOutputTracker.decompressSize(0) === 0)
for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size))
assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
"size " + size + " decompressed to " + size2 + ", which is out of range")
}
}
+
+ test("master start and stop") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.stop()
+ }
+
+ test("master register and fetch") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.registerShuffle(10, 2)
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val compressedSize10000 = MapOutputTracker.compressSize(10000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
+ tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+ Array(compressedSize1000, compressedSize10000)))
+ tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+ Array(compressedSize10000, compressedSize1000)))
+ val statuses = tracker.getServerStatuses(10, 0)
+ assert(statuses.toSeq === Seq((new BlockManagerId("hostA", 1000), size1000),
+ (new BlockManagerId("hostB", 1000), size10000)))
+ tracker.stop()
+ }
+
+ test("master register and unregister and fetch") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.registerShuffle(10, 2)
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val compressedSize10000 = MapOutputTracker.compressSize(10000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
+ tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+ Array(compressedSize1000, compressedSize1000, compressedSize1000)))
+ tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+ Array(compressedSize10000, compressedSize1000, compressedSize1000)))
+
+ // As if we had two simultaneous fetch failures
+ tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+ tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+
+ // The remaining reduce task might try to grab the output despite the shuffle failure;
+ // this should cause it to fail, and the scheduler will ignore the failure due to the
+ // stage already being aborted.
+ intercept[Exception] { tracker.getServerStatuses(10, 1) }
+ tracker.stop()
+ }
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 8ac7c8451a..4614901d78 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -127,4 +127,16 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
}
+
+ test("zipped RDDs") {
+ sc = new SparkContext("local", "test")
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val zipped = nums.zip(nums.map(_ + 1.0))
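+ // Pairs form within matching partitions; zipping RDDs whose partitioning
+ // differs (2 vs. 1 partitions below) should fail with IllegalArgumentException.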
+ assert(zipped.glom().map(_.toList).collect().toList ===
+ List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0))))
+
+ intercept[IllegalArgumentException] {
+ nums.zip(sc.parallelize(1 to 4, 1)).collect()
+ }
+ }
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index a2d5e39859..8f86e3170e 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -7,6 +7,10 @@ import akka.actor._
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.matchers.ShouldMatchers._
+import org.scalatest.time.SpanSugar._
import spark.KryoSerializer
import spark.SizeEstimator
@@ -14,23 +18,25 @@ import spark.util.ByteBufferInputStream
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
var store: BlockManager = null
+ var store2: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
var oldArch: String = null
var oldOops: String = null
-
- // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ var oldHeartBeat: String = null
+
+ // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val serializer = new KryoSerializer
before {
-
actorSystem = ActorSystem("test")
- master = new BlockManagerMaster(actorSystem, true, true)
+ master = new BlockManagerMaster(actorSystem, true, true, "localhost", 7077)
- // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+ // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+ oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
}
@@ -38,6 +44,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
after {
if (store != null) {
store.stop()
+ store = null
+ }
+ if (store2 != null) {
+ store2.stop()
+ store2 = null
}
actorSystem.shutdown()
actorSystem.awaitTermination()
@@ -84,7 +95,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 1 manager interaction") {
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -94,16 +105,16 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
- // Checking whether blocks are in memory
+ // Checking whether blocks are in memory
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
- assert(master.getLocations("a1").size === 1, "master was not told about a1")
- assert(master.getLocations("a2").size === 1, "master was not told about a2")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
+ assert(master.getLocations("a2").size > 0, "master was not told about a2")
assert(master.getLocations("a3").size === 0, "master was told about a3")
-
+
// Drop a1 and a2 from memory; this should be reported back to the master
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
@@ -114,23 +125,106 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 2 managers interaction") {
- store = new BlockManager(master, serializer, 2000)
- val otherStore = new BlockManager(master, new KryoSerializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ store2 = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
- assert(peers.head === otherStore.blockManagerId, "peer returned by master is not the other manager")
+ assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
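+ // The _2 storage levels request two replicas, so the master should report both
+ // managers as locations for each block.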
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
- otherStore.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
+ store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
}
+ test("removing block") {
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+
+ // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+ store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false)
+
+ // Checking whether blocks are in memory and memory size
+ val memStatus = master.getMemoryStatus.head._2
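+ // getMemoryStatus maps each block manager to (max memory, remaining memory).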
+ assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+ assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should be <= 1200")
+ assert(store.getSingle("a1-to-remove") != None, "a1 was not in store")
+ assert(store.getSingle("a2-to-remove") != None, "a2 was not in store")
+ assert(store.getSingle("a3-to-remove") != None, "a3 was not in store")
+
+ // Checking whether master knows about the blocks or not
+ assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
+ assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2")
+ assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3")
+
+ // Remove a1, a2, and a3. This should be a no-op for a3, which the master doesn't know about.
+ master.removeBlock("a1-to-remove")
+ master.removeBlock("a2-to-remove")
+ master.removeBlock("a3-to-remove")
+
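+ // removeBlock is asynchronous, so poll with eventually() rather than asserting immediately.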
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a1-to-remove") should be (None)
+ master.getLocations("a1-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a2-to-remove") should be (None)
+ master.getLocations("a2-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a3-to-remove") should not be (None)
+ master.getLocations("a3-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ val memStatus = master.getMemoryStatus.head._2
+ memStatus._1 should equal (2000L)
+ memStatus._2 should equal (2000L)
+ }
+ }
+
+ test("reregistration on heart beat") {
+ val heartBeat = PrivateMethod[Unit]('heartBeat)
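+ // heartBeat is private on BlockManager, so trigger it reflectively via ScalaTest's PrivateMethod.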
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(store.getSingle("a1") != None, "a1 was not in store")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
+
+ master.notifyADeadHost(store.blockManagerId.ip)
+ assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
+
+ store invokePrivate heartBeat()
+ assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+ }
+
+ test("reregistration on block update") {
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
+
+ master.notifyADeadHost(store.blockManagerId.ip)
+ assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
+
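+ // Any block update from a manager the master considers dead should trigger
+ // reregistration, restoring a1's location as well.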
+ store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+ assert(master.getLocations("a2").size > 0, "master was not told about a2")
+ }
+
test("in-memory LRU storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -147,9 +241,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") === None, "a3 was in store")
}
-
+
test("in-memory LRU storage with serialization") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -168,7 +262,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -187,7 +281,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -210,7 +304,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -223,7 +317,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -238,7 +332,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -253,7 +347,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -268,7 +362,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -283,7 +377,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -308,7 +402,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -332,7 +426,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -378,7 +472,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- store = new BlockManager(master, serializer, 500)
+ store = new BlockManager(actorSystem, master, serializer, 500)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -389,49 +483,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block compression") {
try {
System.setProperty("spark.shuffle.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.shuffle.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
store.stop()