author    Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-20 10:11:49 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-20 10:11:49 -0800
commit    73b3feebb9afa1bb3a45629a3bba77271f77489e (patch)
tree      fac8ee34f1640f6ee130b91b1bb705f2fb5e58e4
parent    20adf27480bbbd6b9b1482d7567752aa92389efe (diff)
parent    fd6e51deec83f01be3db41e84255329eedbe15da (diff)
Merge pull request #388 from folone/master
Updated maven build configuration for Scala 2.10
-rw-r--r--  core/pom.xml                                                        | 20
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala                    | 36
-rw-r--r--  core/src/main/scala/spark/Utils.scala                               | 11
-rw-r--r--  core/src/main/scala/spark/broadcast/HttpBroadcast.scala             |  2
-rw-r--r--  core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala  |  3
-rw-r--r--  core/src/main/scala/spark/scheduler/ResultTask.scala                |  8
-rw-r--r--  core/src/test/scala/spark/AccumulatorSuite.scala                    | 33
-rw-r--r--  core/src/test/scala/spark/MapOutputTrackerSuite.scala               | 40
-rw-r--r--  core/src/test/scala/spark/scheduler/TaskContextSuite.scala          | 43
-rw-r--r--  pom.xml                                                             | 57
-rwxr-xr-x  pyspark                                                             |  7
-rwxr-xr-x  run                                                                 |  7
-rw-r--r--  run2.cmd                                                            |  2
13 files changed, 196 insertions(+), 73 deletions(-)
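
The heart of the change is the dependency renaming: libraries built against Scala 2.10 are published with a binary-version suffix in the artifact ID (akka-remote_2.10, not akka-remote), which is what the new _${scala.version} suffixes in the POMs encode. For comparison, a hypothetical sbt rendering of the same convention (not part of this commit):

    // Hypothetical sbt equivalent of the Maven renaming below: Scala
    // libraries are published once per binary version, so the artifact
    // ID must carry that version.
    libraryDependencies += "com.typesafe.akka" % "akka-remote_2.10" % "2.1.0"

    // sbt's %% operator appends the suffix from scalaVersion automatically.
    libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.1.0"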
diff --git a/core/pom.xml b/core/pom.xml
index 862d3ec37a..6316b28a7b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -44,16 +44,16 @@
<artifactId>kryo-serializers</artifactId>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor</artifactId>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-actors</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-remote</artifactId>
+ <artifactId>akka-remote_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-slf4j</artifactId>
+ <artifactId>akka-slf4j_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
@@ -64,15 +64,19 @@
<artifactId>colt</artifactId>
</dependency>
<dependency>
- <groupId>cc.spray</groupId>
+ <groupId>io.spray</groupId>
<artifactId>spray-can</artifactId>
</dependency>
<dependency>
- <groupId>cc.spray</groupId>
- <artifactId>spray-server</artifactId>
+ <groupId>io.spray</groupId>
+ <artifactId>spray-routing</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.spray</groupId>
+ <artifactId>spray-io</artifactId>
</dependency>
<dependency>
- <groupId>cc.spray</groupId>
+ <groupId>io.spray</groupId>
<artifactId>spray-json_${scala.version}</artifactId>
</dependency>
<dependency>
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 08d2956782..b2c80d8eff 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -139,8 +139,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
case e: InterruptedException =>
}
}
- return mapStatuses.get(shuffleId).map(status =>
- (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId))))
+ return MapOutputTracker.convertMapStatuses(shuffleId, reduceId,
+ mapStatuses.get(shuffleId))
} else {
fetching += shuffleId
}
@@ -156,21 +156,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
fetchedStatuses = deserializeStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
- if (fetchedStatuses.contains(null)) {
- throw new FetchFailedException(null, shuffleId, -1, reduceId,
- new Exception("Missing an output location for shuffle " + shuffleId))
- }
} finally {
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
}
- return fetchedStatuses.map(s =>
- (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+ return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
} else {
- return statuses.map(s =>
- (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+ return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
}
}
@@ -258,6 +252,28 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
private[spark] object MapOutputTracker {
private val LOG_BASE = 1.1
+ // Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
+ // any of the statuses is null (indicating a missing location due to a failed mapper),
+ // throw a FetchFailedException.
+ def convertMapStatuses(
+ shuffleId: Int,
+ reduceId: Int,
+ statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
+ if (statuses == null) {
+ throw new FetchFailedException(null, shuffleId, -1, reduceId,
+ new Exception("Missing all output locations for shuffle " + shuffleId))
+ }
+ statuses.map {
+ status =>
+ if (status == null) {
+ throw new FetchFailedException(null, shuffleId, -1, reduceId,
+ new Exception("Missing an output location for shuffle " + shuffleId))
+ } else {
+ (status.address, decompressSize(status.compressedSizes(reduceId)))
+ }
+ }
+ }
+
/**
* Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
* We do this by encoding the log base 1.1 of the size as an integer, which can support
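
The comment above describes the 8-bit encoding, but the implementation itself lies outside this diff. A minimal sketch consistent with the comment and with the compressSize assertions in MapOutputTrackerSuite below (an assumed reconstruction, not the project's code):

    // Sketch: a size is stored as ceil(log base 1.1 of the size) in one byte.
    object SizeCodec {
      private val LOG_BASE = 1.1

      def compressSize(size: Long): Byte = {
        if (size == 0) {
          0
        } else if (size <= 1L) {
          1 // log(1) would round to 0, so reserve code 1 for size 1
        } else {
          // Decoding with & 0xFF treats the byte as unsigned, giving 255 buckets.
          math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
        }
      }

      def decompressSize(compressedSize: Byte): Long = {
        if (compressedSize == 0) 0L
        else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
      }
    }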
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 0e7007459d..aeed5d2f32 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -134,7 +134,7 @@ private object Utils extends Logging {
*/
def fetchFile(url: String, targetDir: File) {
val filename = url.split("/").last
- val tempDir = System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))
+ val tempDir = getLocalDir
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
@@ -205,6 +205,15 @@ private object Utils extends Logging {
}
/**
+ * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
+ * return a single directory, even though the spark.local.dir property might be a list of
+ * multiple paths.
+ */
+ def getLocalDir: String = {
+ System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+ }
+
+ /**
* Shuffle the elements of a collection into a random order, returning the
* result in a new collection. Unlike scala.util.Random.shuffle, this method
* uses a local random number generator, avoiding inter-thread contention.
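
As a usage note on the getLocalDir helper above: when spark.local.dir holds a comma-separated list, only the first entry is returned. A hypothetical check:

    // Hypothetical: spark.local.dir may list several paths, but the new
    // helper always picks the first one.
    System.setProperty("spark.local.dir", "/mnt1/spark,/mnt2/spark")
    assert(Utils.getLocalDir == "/mnt1/spark")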
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 7eb4ddb74f..856a4683a9 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -89,7 +89,7 @@ private object HttpBroadcast extends Logging {
}
private def createServer() {
- broadcastDir = Utils.createTempDir()
+ broadcastDir = Utils.createTempDir(Utils.getLocalDir)
server = new HttpServer(broadcastDir)
server.start()
serverUri = server.uri
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 915f71ba9f..a29bf974d2 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -24,9 +24,6 @@ private[spark] class StandaloneExecutorBackend(
with ExecutorBackend
with Logging {
- val threadPool = new ThreadPoolExecutor(
- 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
var master: ActorRef = null
override def preStart() {
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index e492279b4e..2aad7956b4 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -15,9 +15,11 @@ private[spark] class ResultTask[T, U](
override def run(attemptId: Long): U = {
val context = new TaskContext(stageId, partition, attemptId)
- val result = func(context, rdd.iterator(split, context))
- context.executeOnCompleteCallbacks()
- result
+ try {
+ func(context, rdd.iterator(split, context))
+ } finally {
+ context.executeOnCompleteCallbacks()
+ }
}
override def preferredLocations: Seq[String] = locs
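
The TaskContextSuite added below exercises exactly this guarantee. In miniature, the fix is the standard try/finally cleanup idiom (names here are illustrative, not the diff's):

    // Sketch: the finally block runs the callbacks whether the body succeeds
    // or throws, whereas the old code skipped them on failure.
    def runWithCleanup[U](body: => U)(callbacks: () => Unit): U =
      try body finally callbacks()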
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index d8be99dde7..9f5335978f 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -13,6 +13,20 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
var sc: SparkContext = null
+ implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
+ def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
+ t1 ++= t2
+ t1
+ }
+ def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
+ t1 += t2
+ t1
+ }
+ def zero(t: mutable.Set[A]) : mutable.Set[A] = {
+ new mutable.HashSet[A]()
+ }
+ }
+
after {
if (sc != null) {
sc.stop()
@@ -40,7 +54,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
test ("add value to collection accumulators") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -60,22 +73,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
}
- implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
- def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
- t1 ++= t2
- t1
- }
- def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
- t1 += t2
- t1
- }
- def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
- new mutable.HashSet[Any]()
- }
- }
-
test ("value not readable in tasks") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -123,7 +121,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
test ("localValue readable in tasks") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -135,7 +132,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
acc.value should be ( (0 to maxI).toSet)
sc.stop()
- sc = null
+ sc = null
}
}
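
With setAccum now an implicit def in class scope, the per-test import SetAccum._ lines go away and any element type works. A hypothetical use, assuming SparkContext.accumulable resolves the implicit as in the existing tests:

    // Hypothetical: the class-scoped implicit setAccum[Int] supplies the
    // AccumulableParam; the annotation fixes T to mutable.Set[Int].
    val acc: Accumulable[mutable.Set[Int], Int] =
      sc.accumulable(new mutable.HashSet[Int]())
    sc.parallelize(1 to 10).foreach(x => acc += x)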
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 5b4b198960..d3dd3a8fa4 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -1,12 +1,18 @@
package spark
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
import akka.actor._
import spark.scheduler.MapStatus
import spark.storage.BlockManagerId
+import spark.util.AkkaUtils
-class MapOutputTrackerSuite extends FunSuite {
+class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
+ after {
+ System.clearProperty("spark.master.port")
+ }
+
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 1)
@@ -71,6 +77,36 @@ class MapOutputTrackerSuite extends FunSuite {
// The remaining reduce task might try to grab the output despite the shuffle failure;
// this should cause it to fail, and the scheduler will ignore the failure due to the
// stage already being aborted.
- intercept[Exception] { tracker.getServerStatuses(10, 1) }
+ intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+ }
+
+ test("remote fetch") {
+ System.clearProperty("spark.master.host")
+ val (actorSystem, boundPort) =
+ AkkaUtils.createActorSystem("test", "localhost", 0)
+ System.setProperty("spark.master.port", boundPort.toString)
+ val masterTracker = new MapOutputTracker(actorSystem, true)
+ val slaveTracker = new MapOutputTracker(actorSystem, false)
+ masterTracker.registerShuffle(10, 1)
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ masterTracker.registerMapOutput(10, 0, new MapStatus(
+ new BlockManagerId("hostA", 1000), Array(compressedSize1000)))
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+ Seq((new BlockManagerId("hostA", 1000), size1000)))
+
+ masterTracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+ // failure should be cached
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
}
}
diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
new file mode 100644
index 0000000000..f937877340
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
@@ -0,0 +1,43 @@
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import spark.TaskContext
+import spark.RDD
+import spark.SparkContext
+import spark.Split
+
+class TaskContextSuite extends FunSuite with BeforeAndAfter {
+
+ var sc: SparkContext = _
+
+ after {
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+
+ test("Calls executeOnCompleteCallbacks after failure") {
+ var completed = false
+ sc = new SparkContext("local", "test")
+ val rdd = new RDD[String](sc) {
+ override val splits = Array[Split](StubSplit(0))
+ override val dependencies = List()
+ override def compute(split: Split, context: TaskContext) = {
+ context.addOnCompleteCallback(() => completed = true)
+ sys.error("failed")
+ }
+ }
+ val func = (c: TaskContext, i: Iterator[String]) => i.next
+ val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
+ intercept[RuntimeException] {
+ task.run(0)
+ }
+ assert(completed === true)
+ }
+
+ case class StubSplit(val index: Int) extends Split
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 751189a9d8..756fe8783b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,8 +41,8 @@
<module>core</module>
<module>bagel</module>
<module>examples</module>
- <module>repl</module>
- <module>repl-bin</module>
+ <!--<module>repl</module>
+ <module>repl-bin</module>-->
</modules>
<properties>
@@ -50,20 +50,20 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.5</java.version>
- <scala.version>2.9.2</scala.version>
+ <scala.version>2.10</scala.version>
<mesos.version>0.9.0-incubating</mesos.version>
- <akka.version>2.0.3</akka.version>
- <spray.version>1.0-M2.1</spray.version>
- <spray.json.version>1.1.1</spray.json.version>
+ <akka.version>2.1.0</akka.version>
+ <spray.version>1.1-M7</spray.version>
+ <spray.json.version>1.2.3</spray.json.version>
<slf4j.version>1.6.1</slf4j.version>
<cdh.version>4.1.2</cdh.version>
</properties>
<repositories>
<repository>
- <id>jboss-repo</id>
- <name>JBoss Repository</name>
- <url>http://repository.jboss.org/nexus/content/repositories/releases/</url>
+ <id>typesafe-repo</id>
+ <name>Typesafe Repository</name>
+ <url>http://repo.typesafe.com/typesafe/releases/</url>
<releases>
<enabled>true</enabled>
</releases>
@@ -72,9 +72,9 @@
</snapshots>
</repository>
<repository>
- <id>cloudera-repo</id>
- <name>Cloudera Repository</name>
- <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+ <id>jboss-repo</id>
+ <name>JBoss Repository</name>
+ <url>http://repository.jboss.org/nexus/content/repositories/releases/</url>
<releases>
<enabled>true</enabled>
</releases>
@@ -83,9 +83,9 @@
</snapshots>
</repository>
<repository>
- <id>typesafe-repo</id>
- <name>Typesafe Repository</name>
- <url>http://repo.typesafe.com/typesafe/releases/</url>
+ <id>cloudera-repo</id>
+ <name>Cloudera Repository</name>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
<releases>
<enabled>true</enabled>
</releases>
@@ -189,18 +189,18 @@
<version>0.20</version>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor</artifactId>
- <version>${akka.version}</version>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-actors</artifactId>
+ <version>2.10.0</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-remote</artifactId>
+ <artifactId>akka-remote_${scala.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-slf4j</artifactId>
+ <artifactId>akka-slf4j_${scala.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
@@ -214,17 +214,22 @@
<version>1.2.0</version>
</dependency>
<dependency>
- <groupId>cc.spray</groupId>
+ <groupId>io.spray</groupId>
<artifactId>spray-can</artifactId>
<version>${spray.version}</version>
</dependency>
<dependency>
- <groupId>cc.spray</groupId>
- <artifactId>spray-server</artifactId>
+ <groupId>io.spray</groupId>
+ <artifactId>spray-routing</artifactId>
+ <version>${spray.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.spray</groupId>
+ <artifactId>spray-io</artifactId>
<version>${spray.version}</version>
</dependency>
<dependency>
- <groupId>cc.spray</groupId>
+ <groupId>io.spray</groupId>
<artifactId>spray-json_${scala.version}</artifactId>
<version>${spray.json.version}</version>
</dependency>
@@ -258,13 +263,13 @@
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
- <version>1.8</version>
+ <version>1.9.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.version}</artifactId>
- <version>1.9</version>
+ <version>1.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/pyspark b/pyspark
index 9e89d51ba2..ab7f4f50c0 100755
--- a/pyspark
+++ b/pyspark
@@ -6,6 +6,13 @@ FWDIR="$(cd `dirname $0`; pwd)"
# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"
+# Exit if the user hasn't compiled Spark
+if [ ! -e "$SPARK_HOME/repl/target" ]; then
+ echo "Failed to find Spark classes in $SPARK_HOME/repl/target" >&2
+ echo "You need to compile Spark before running this program" >&2
+ exit 1
+fi
+
# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
diff --git a/run b/run
index ca23455386..eb93db66db 100755
--- a/run
+++ b/run
@@ -65,6 +65,13 @@ EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel"
PYSPARK_DIR="$FWDIR/python"
+# Exit if the user hasn't compiled Spark
+if [ ! -e "$REPL_DIR/target" ]; then
+ echo "Failed to find Spark classes in $REPL_DIR/target" >&2
+ echo "You need to compile Spark before running this program" >&2
+ exit 1
+fi
+
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH"
CLASSPATH+=":$FWDIR/conf"
diff --git a/run2.cmd b/run2.cmd
index 83464b1166..67f1e465e4 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -1,6 +1,6 @@
@echo off
-set SCALA_VERSION=2.9.1
+set SCALA_VERSION=2.9.2
rem Figure out where the Spark framework is installed
set FWDIR=%~dp0