 bin/start-master.sh | 11
 bin/start-slave.sh | 15
 bin/start-slaves.sh | 18
 core/src/main/scala/spark/MapOutputTracker.scala | 35
 core/src/main/scala/spark/RDD.scala | 75
 core/src/main/scala/spark/SparkContext.scala | 2
 core/src/main/scala/spark/Utils.scala | 33
 core/src/main/scala/spark/api/java/JavaRDDLike.scala | 10
 core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala | 2
 core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 4
 core/src/main/scala/spark/deploy/DeployMessage.scala | 11
 core/src/main/scala/spark/deploy/client/TestClient.scala | 2
 core/src/main/scala/spark/deploy/master/Master.scala | 16
 core/src/main/scala/spark/deploy/master/WorkerInfo.scala | 7
 core/src/main/scala/spark/deploy/worker/Worker.scala | 6
 core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala | 4
 core/src/main/scala/spark/rdd/ZippedRDD.scala | 54
 core/src/main/scala/spark/scheduler/DAGScheduler.scala | 6
 core/src/main/twirl/spark/deploy/master/worker_row.scala.html | 2
 core/src/test/scala/spark/JavaAPISuite.java | 15
 core/src/test/scala/spark/MapOutputTrackerSuite.scala | 55
 core/src/test/scala/spark/RDDSuite.scala | 12
 docs/quick-start.md | 4
23 files changed, 309 insertions, 90 deletions
diff --git a/bin/start-master.sh b/bin/start-master.sh
index 6403c944a4..ad19d48331 100755
--- a/bin/start-master.sh
+++ b/bin/start-master.sh
@@ -7,4 +7,13 @@ bin=`cd "$bin"; pwd`
. "$bin/spark-config.sh"
-"$bin"/spark-daemon.sh start spark.deploy.master.Master \ No newline at end of file
+# Set SPARK_PUBLIC_DNS so the master reports the correct webUI address to the slaves
+if [ "$SPARK_PUBLIC_DNS" = "" ]; then
+ # If we appear to be running on EC2, use the public address by default:
+ if [[ `hostname` == *ec2.internal ]]; then
+ echo "RUNNING ON EC2"
+ export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
+ fi
+fi
+
+"$bin"/spark-daemon.sh start spark.deploy.master.Master
diff --git a/bin/start-slave.sh b/bin/start-slave.sh
new file mode 100755
index 0000000000..10cce9c17b
--- /dev/null
+++ b/bin/start-slave.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# Set SPARK_PUBLIC_DNS so slaves can be linked in the master's web UI
+if [ "$SPARK_PUBLIC_DNS" = "" ]; then
+ # If we appear to be running on EC2, use the public address by default:
+ if [[ `hostname` == *ec2.internal ]]; then
+ echo "RUNNING ON EC2"
+ export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
+ fi
+fi
+
+"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1
diff --git a/bin/start-slaves.sh b/bin/start-slaves.sh
index 74b70a24be..390247ca4a 100755
--- a/bin/start-slaves.sh
+++ b/bin/start-slaves.sh
@@ -15,20 +15,10 @@ if [ "$SPARK_MASTER_PORT" = "" ]; then
fi
if [ "$SPARK_MASTER_IP" = "" ]; then
- hostname=`hostname`
- hostouput=`host "$hostname"`
-
- if [[ "$hostouput" == *"not found"* ]]; then
- echo $hostouput
- echo "Fail to identiy the IP for the master."
- echo "Set SPARK_MASTER_IP explicitly in configuration instead."
- exit 1
- fi
- ip=`host "$hostname" | cut -d " " -f 4`
-else
- ip=$SPARK_MASTER_IP
+ SPARK_MASTER_IP=`hostname`
fi
-echo "Master IP: $ip"
+echo "Master IP: $SPARK_MASTER_IP"
-"$bin"/spark-daemons.sh start spark.deploy.worker.Worker spark://$ip:$SPARK_MASTER_PORT \ No newline at end of file
+# Launch the slaves
+exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 45441aa5e5..50c4183c0e 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -2,6 +2,10 @@ package spark
import java.io._
import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
import akka.actor._
import akka.dispatch._
@@ -11,16 +15,13 @@ import akka.util.Duration
import akka.util.Timeout
import akka.util.duration._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-
-import scheduler.MapStatus
+import spark.scheduler.MapStatus
import spark.storage.BlockManagerId
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
- extends MapOutputTrackerMessage
+ extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
@@ -88,14 +89,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
}
mapStatuses.put(shuffleId, new Array[MapStatus](numMaps))
}
-
+
def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
var array = mapStatuses.get(shuffleId)
array.synchronized {
array(mapId) = status
}
}
-
+
def registerMapOutputs(
shuffleId: Int,
statuses: Array[MapStatus],
@@ -110,7 +111,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
var array = mapStatuses.get(shuffleId)
if (array != null) {
array.synchronized {
- if (array(mapId).address == bmAddress) {
+ if (array(mapId) != null && array(mapId).address == bmAddress) {
array(mapId) = null
}
}
@@ -119,10 +120,10 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
}
}
-
+
// Remembers which map output locations are currently being fetched on a worker
val fetching = new HashSet[Int]
-
+
// Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
val statuses = mapStatuses.get(shuffleId)
@@ -149,13 +150,17 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
val host = System.getProperty("spark.hostname", Utils.localHostName)
val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
val fetchedStatuses = deserializeStatuses(fetchedBytes)
-
+
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
+ if (fetchedStatuses.contains(null)) {
+ throw new FetchFailedException(null, shuffleId, -1, reduceId,
+ new Exception("Missing an output location for shuffle " + shuffleId))
+ }
return fetchedStatuses.map(s =>
(s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
} else {
@@ -254,8 +259,10 @@ private[spark] object MapOutputTracker {
* sizes up to 35 GB with at most 10% error.
*/
def compressSize(size: Long): Byte = {
- if (size <= 1L) {
+ if (size == 0) {
0
+ } else if (size <= 1L) {
+ 1
} else {
math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
}
@@ -266,7 +273,7 @@ private[spark] object MapOutputTracker {
*/
def decompressSize(compressedSize: Byte): Long = {
if (compressedSize == 0) {
- 1
+ 0
} else {
math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
}
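
The compressSize/decompressSize change above distinguishes a genuinely empty map output (code 0) from a tiny non-empty one (code 1), so a zero-byte output no longer decompresses to 1 byte. Below is a minimal sketch of the round trip, not part of the patch; it assumes it is run from inside the spark package (as MapOutputTrackerSuite is), since both methods are private[spark].

    // Hypothetical illustration of the one-byte size encoding (LOG_BASE = 1.1)
    val sizes = Seq(0L, 1L, 1000L, 1000000L)
    val codes = sizes.map(MapOutputTracker.compressSize)
    // codes is roughly Seq(0, 1, 73, -111); the last value is 145 as an unsigned byte
    val decoded = codes.map(MapOutputTracker.decompressSize)
    // decoded(0) == 0 (truly empty), decoded(1) == 1, and the larger sizes come back
    // within about 10% of the originals, since there are only 255 logarithmic buckets
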
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 338dff4061..6270e018b3 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,17 +1,17 @@
package spark
import java.io.EOFException
-import java.net.URL
import java.io.ObjectInputStream
-import java.util.concurrent.atomic.AtomicLong
+import java.net.URL
import java.util.Random
import java.util.Date
import java.util.{HashMap => JHashMap}
+import java.util.concurrent.atomic.AtomicLong
-import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
-import scala.collection.mutable.HashMap
import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
@@ -42,12 +42,13 @@ import spark.rdd.MapPartitionsWithSplitRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
import spark.rdd.UnionRDD
+import spark.rdd.ZippedRDD
import spark.storage.StorageLevel
import SparkContext._
/**
- * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
+ * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[spark.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such
@@ -86,28 +87,28 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
@transient val dependencies: List[Dependency[_]]
// Methods available on all RDDs:
-
+
/** Record user function generating this RDD. */
private[spark] val origin = Utils.getSparkCallSite
-
+
/** Optionally overridden by subclasses to specify how they are partitioned. */
val partitioner: Option[Partitioner] = None
/** Optionally overridden by subclasses to specify placement preferences. */
def preferredLocations(split: Split): Seq[String] = Nil
-
+
/** The [[spark.SparkContext]] that this RDD was created on. */
def context = sc
private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
-
+
/** A unique ID for this RDD (within its SparkContext). */
val id = sc.newRddId()
-
+
// Variables relating to persistence
private var storageLevel: StorageLevel = StorageLevel.NONE
-
- /**
+
+ /**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@@ -123,32 +124,32 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
-
+
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): RDD[T] = persist()
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel = storageLevel
-
+
private[spark] def checkpoint(level: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): RDD[T] = {
if (!level.useDisk && level.replication < 2) {
throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")")
- }
-
+ }
+
// This is a hack. Ideally this should re-use the code used by the CacheTracker
// to generate the key.
def getSplitKey(split: Split) = "rdd_%d_%d".format(this.id, split.index)
-
+
persist(level)
sc.runJob(this, (iter: Iterator[T]) => {} )
-
+
val p = this.partitioner
-
+
new BlockRDD[T](sc, splits.map(getSplitKey).toArray) {
- override val partitioner = p
+ override val partitioner = p
}
}
-
+
/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
@@ -161,9 +162,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
compute(split)
}
}
-
+
// Transformations (return a new RDD)
-
+
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
@@ -199,13 +200,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
var multiplier = 3.0
var initialCount = count()
var maxSelected = 0
-
+
if (initialCount > Integer.MAX_VALUE - 1) {
maxSelected = Integer.MAX_VALUE - 1
} else {
maxSelected = initialCount.toInt
}
-
+
if (num > initialCount) {
total = maxSelected
fraction = math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
@@ -215,14 +216,14 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
fraction = math.min(multiplier * (num + 1) / initialCount, 1.0)
total = num
}
-
+
val rand = new Random(seed)
var samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
-
+
while (samples.length < total) {
samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
}
-
+
Utils.randomizeInPlace(samples, rand).take(total)
}
@@ -290,8 +291,18 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- def mapPartitionsWithSplit[U: ClassManifest](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] =
- new MapPartitionsWithSplitRDD(this, sc.clean(f))
+ def mapPartitionsWithSplit[U: ClassManifest](
+ f: (Int, Iterator[T]) => Iterator[U],
+ preservesPartitioning: Boolean = false): RDD[U] =
+ new MapPartitionsWithSplitRDD(this, sc.clean(f), preservesPartitioning)
+
+ /**
+ * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
+ * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
+ * partitions* and the *same number of elements in each partition* (e.g. one was made through
+ * a map on the other).
+ */
+ def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
// Actions (launch a job to return a value to the user program)
@@ -342,7 +353,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
- * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
+ * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
* modify t1 and return it as its result value to avoid object allocation; however, it should not
* modify t2.
*/
@@ -443,7 +454,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
val evaluator = new GroupedCountEvaluator[T](splits.size, confidence)
sc.runApproximateJob(this, countPartition, evaluator, timeout)
}
-
+
/**
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
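
A minimal usage sketch of the new zip operation follows; it is not part of the patch and assumes an existing SparkContext named sc (as in RDDSuite), plus the documented requirement that both RDDs have the same number of partitions and the same number of elements per partition.

    val nums  = sc.makeRDD(Array(1, 2, 3, 4), 2)
    val words = nums.map(n => "n" + n)   // derived by a map, so the partitioning lines up
    val pairs = nums.zip(words)          // RDD[(Int, String)] backed by a ZippedRDD
    pairs.collect()                      // Array((1,n1), (2,n2), (3,n3), (4,n4))
    // Zipping RDDs with different numbers of partitions fails with an
    // IllegalArgumentException when the ZippedRDD's splits are built.
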
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index d26cccbfe1..0afab522af 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -86,7 +86,7 @@ class SparkContext(
// Set Spark master host and port system properties
if (System.getProperty("spark.master.host") == null) {
- System.setProperty("spark.master.host", Utils.localIpAddress())
+ System.setProperty("spark.master.host", Utils.localIpAddress)
}
if (System.getProperty("spark.master.port") == null) {
System.setProperty("spark.master.port", "0")
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index c8799e6de3..6d64b32174 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -1,12 +1,13 @@
package spark
import java.io._
-import java.net.{InetAddress, URL, URI}
+import java.net.{NetworkInterface, InetAddress, URL, URI}
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
import scala.io.Source
/**
@@ -199,12 +200,34 @@ private object Utils extends Logging {
/**
* Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
*/
- def localIpAddress(): String = {
+ lazy val localIpAddress: String = findLocalIpAddress()
+
+ private def findLocalIpAddress(): String = {
val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
- if (defaultIpOverride != null)
+ if (defaultIpOverride != null) {
defaultIpOverride
- else
- InetAddress.getLocalHost.getHostAddress
+ } else {
+ val address = InetAddress.getLocalHost
+ if (address.isLoopbackAddress) {
+ // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+ // a better address using the local network interfaces
+ for (ni <- NetworkInterface.getNetworkInterfaces) {
+ for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress && !addr.isLoopbackAddress) {
+ // We've found an address that looks reasonable!
+ logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
+ " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
+ " instead (on interface " + ni.getName + ")")
+ logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
+ return addr.getHostAddress
+ }
+ }
+ logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
+ " a loopback address: " + address.getHostAddress + ", but we couldn't find any" +
+ " external IP address!")
+ logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
+ }
+ address.getHostAddress
+ }
}
private var customHostname: Option[String] = None
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 13fcee1004..482eb9281a 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -172,6 +172,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env))
+ /**
+ * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
+ * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
+ * partitions* and the *same number of elements in each partition* (e.g. one was made through
+ * a map on the other).
+ */
+ def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
+ JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest)
+ }
+
// Actions (launch a job to return a value to the user program)
/**
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index ef27bbb502..386f505f2a 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -48,7 +48,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
// Used only in Workers
@transient var ttGuide: TalkToGuide = null
- @transient var hostAddress = Utils.localIpAddress()
+ @transient var hostAddress = Utils.localIpAddress
@transient var listenPort = -1
@transient var guidePort = -1
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index fa676e9064..f573512835 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -36,7 +36,7 @@ extends Broadcast[T](id) with Logging with Serializable {
@transient var serveMR: ServeMultipleRequests = null
@transient var guideMR: GuideMultipleRequests = null
- @transient var hostAddress = Utils.localIpAddress()
+ @transient var hostAddress = Utils.localIpAddress
@transient var listenPort = -1
@transient var guidePort = -1
@@ -138,7 +138,7 @@ extends Broadcast[T](id) with Logging with Serializable {
serveMR = null
- hostAddress = Utils.localIpAddress()
+ hostAddress = Utils.localIpAddress
listenPort = -1
stopBroadcast = false
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 7a1089c816..f05413a53b 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -11,8 +11,15 @@ private[spark] sealed trait DeployMessage extends Serializable
// Worker to Master
-private[spark]
-case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
+private[spark]
+case class RegisterWorker(
+ id: String,
+ host: String,
+ port: Int,
+ cores: Int,
+ memory: Int,
+ webUiPort: Int,
+ publicAddress: String)
extends DeployMessage
private[spark]
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
index bf0e7428ba..5b710f5520 100644
--- a/core/src/main/scala/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/spark/deploy/client/TestClient.scala
@@ -23,7 +23,7 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress(), 0)
+ val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
val desc = new JobDescription(
"TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()))
val listener = new TestListener
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 7e5cd6b171..31fb83f2e2 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val waitingJobs = new ArrayBuffer[JobInfo]
val completedJobs = new ArrayBuffer[JobInfo]
+ val masterPublicAddress = {
+ val envVar = System.getenv("SPARK_PUBLIC_DNS")
+ if (envVar != null) envVar else ip
+ }
+
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each job
// among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
@@ -55,15 +60,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
override def receive = {
- case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort) => {
+ case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- addWorker(id, host, workerPort, cores, memory, worker_webUiPort)
+ addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker("http://" + ip + ":" + webUiPort)
+ sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort)
schedule()
}
}
@@ -196,8 +201,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
- def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int): WorkerInfo = {
- val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort)
+ def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
+ publicAddress: String): WorkerInfo = {
+ val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 706b1453aa..a0a698ef04 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -10,7 +10,8 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val actor: ActorRef,
- val webUiPort: Int) {
+ val webUiPort: Int,
+ val publicAddress: String) {
var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
@@ -37,8 +38,8 @@ private[spark] class WorkerInfo(
def hasExecutor(job: JobInfo): Boolean = {
executors.values.exists(_.job == job)
}
-
+
def webUiAddress : String = {
- "http://" + this.host + ":" + this.webUiPort
+ "http://" + this.publicAddress + ":" + this.webUiPort
}
}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 67d41dda29..31b8f0f955 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -36,6 +36,10 @@ private[spark] class Worker(
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
+ val publicAddress = {
+ val envVar = System.getenv("SPARK_PUBLIC_DNS")
+ if (envVar != null) envVar else ip
+ }
var coresUsed = 0
var memoryUsed = 0
@@ -79,7 +83,7 @@ private[spark] class Worker(
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
- master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort)
+ master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
index adc541694e..14e390c43b 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -12,9 +12,11 @@ import spark.Split
private[spark]
class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
- f: (Int, Iterator[T]) => Iterator[U])
+ f: (Int, Iterator[T]) => Iterator[U],
+ preservesPartitioning: Boolean)
extends RDD[U](prev.context) {
+ override val partitioner = if (preservesPartitioning) prev.partitioner else None
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = f(split.index, prev.iterator(split))
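
The new preservesPartitioning flag lets a per-partition transformation keep the parent's partitioner when it leaves the keys untouched. The sketch below illustrates the intent only; the names are hypothetical and it assumes a SparkContext sc plus the usual PairRDDFunctions conversions.

    import spark.SparkContext._
    val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val grouped = pairs.groupByKey(4)    // hash-partitioned into 4 partitions
    val tagged  = grouped.mapPartitionsWithSplit(
      (index, iter) => iter.map { case (k, vs) => (k, (index, vs.size)) },
      preservesPartitioning = true)
    // The keys are unchanged, so tagged keeps grouped's partitioner and a later
    // join or lookup on the same keys can avoid an extra shuffle.
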
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
new file mode 100644
index 0000000000..80f0150c45
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -0,0 +1,54 @@
+package spark.rdd
+
+import spark.Dependency
+import spark.OneToOneDependency
+import spark.RDD
+import spark.SparkContext
+import spark.Split
+
+private[spark] class ZippedSplit[T: ClassManifest, U: ClassManifest](
+ idx: Int,
+ rdd1: RDD[T],
+ rdd2: RDD[U],
+ split1: Split,
+ split2: Split)
+ extends Split
+ with Serializable {
+
+ def iterator(): Iterator[(T, U)] = rdd1.iterator(split1).zip(rdd2.iterator(split2))
+
+ def preferredLocations(): Seq[String] =
+ rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2))
+
+ override val index: Int = idx
+}
+
+class ZippedRDD[T: ClassManifest, U: ClassManifest](
+ sc: SparkContext,
+ @transient rdd1: RDD[T],
+ @transient rdd2: RDD[U])
+ extends RDD[(T, U)](sc)
+ with Serializable {
+
+ @transient
+ val splits_ : Array[Split] = {
+ if (rdd1.splits.size != rdd2.splits.size) {
+ throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
+ }
+ val array = new Array[Split](rdd1.splits.size)
+ for (i <- 0 until rdd1.splits.size) {
+ array(i) = new ZippedSplit(i, rdd1, rdd2, rdd1.splits(i), rdd2.splits(i))
+ }
+ array
+ }
+
+ override def splits = splits_
+
+ @transient
+ override val dependencies = List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))
+
+ override def compute(s: Split): Iterator[(T, U)] = s.asInstanceOf[ZippedSplit[T, U]].iterator()
+
+ override def preferredLocations(s: Split): Seq[String] =
+ s.asInstanceOf[ZippedSplit[T, U]].preferredLocations()
+}
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index aaaed59c4a..5c71207d43 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -479,8 +479,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
") for resubmision due to a fetch failure")
// Mark the map whose fetch failed as broken in the map stage
val mapStage = shuffleToMapStage(shuffleId)
- mapStage.removeOutputLoc(mapId, bmAddress)
- mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+ if (mapId != -1) {
+ mapStage.removeOutputLoc(mapId, bmAddress)
+ mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+ }
logInfo("The failed fetch was from " + mapStage + " (" + mapStage.origin +
"); marking it for resubmission")
failed += mapStage
diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
index 3dcba3a545..c32ab30401 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
@@ -4,7 +4,7 @@
<tr>
<td>
- <a href="http://@worker.host:@worker.webUiPort">@worker.id</href>
+ <a href="@worker.webUiAddress">@worker.id</href>
</td>
<td>@{worker.host}:@{worker.port}</td>
<td>@worker.cores (@worker.coresUsed Used)</td>
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 5875506179..007bb28692 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -44,6 +44,8 @@ public class JavaAPISuite implements Serializable {
public void tearDown() {
sc.stop();
sc = null;
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port");
}
static class ReverseIntComparator implements Comparator<Integer>, Serializable {
@@ -553,4 +555,17 @@ public class JavaAPISuite implements Serializable {
}
}).collect().toString());
}
+
+ @Test
+ public void zip() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+ @Override
+ public Double call(Integer x) {
+ return 1.0 * x;
+ }
+ });
+ JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
+ zipped.count();
+ }
}
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 4e9717d871..5b4b198960 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -2,10 +2,14 @@ package spark
import org.scalatest.FunSuite
+import akka.actor._
+import spark.scheduler.MapStatus
+import spark.storage.BlockManagerId
+
class MapOutputTrackerSuite extends FunSuite {
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
- assert(MapOutputTracker.compressSize(1L) === 0)
+ assert(MapOutputTracker.compressSize(1L) === 1)
assert(MapOutputTracker.compressSize(2L) === 8)
assert(MapOutputTracker.compressSize(10L) === 25)
assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145)
@@ -15,11 +19,58 @@ class MapOutputTrackerSuite extends FunSuite {
}
test("decompressSize") {
- assert(MapOutputTracker.decompressSize(0) === 1)
+ assert(MapOutputTracker.decompressSize(0) === 0)
for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size))
assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
"size " + size + " decompressed to " + size2 + ", which is out of range")
}
}
+
+ test("master start and stop") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.stop()
+ }
+
+ test("master register and fetch") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.registerShuffle(10, 2)
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val compressedSize10000 = MapOutputTracker.compressSize(10000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
+ tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+ Array(compressedSize1000, compressedSize10000)))
+ tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+ Array(compressedSize10000, compressedSize1000)))
+ val statuses = tracker.getServerStatuses(10, 0)
+ assert(statuses.toSeq === Seq((new BlockManagerId("hostA", 1000), size1000),
+ (new BlockManagerId("hostB", 1000), size10000)))
+ tracker.stop()
+ }
+
+ test("master register and unregister and fetch") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.registerShuffle(10, 2)
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val compressedSize10000 = MapOutputTracker.compressSize(10000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
+ tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+ Array(compressedSize1000, compressedSize1000, compressedSize1000)))
+ tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+ Array(compressedSize10000, compressedSize1000, compressedSize1000)))
+
+ // As if we had two simultaneous fetch failures
+ tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+ tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+
+ // The remaining reduce task might try to grab the output despite the shuffle failure;
+ // this should cause it to fail, and the scheduler will ignore the failure due to the
+ // stage already being aborted.
+ intercept[Exception] { tracker.getServerStatuses(10, 1) }
+ }
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 37a0ff0947..b3c820ed94 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -114,4 +114,16 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
}
+
+ test("zipped RDDs") {
+ sc = new SparkContext("local", "test")
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val zipped = nums.zip(nums.map(_ + 1.0))
+ assert(zipped.glom().map(_.toList).collect().toList ===
+ List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0))))
+
+ intercept[IllegalArgumentException] {
+ nums.zip(sc.parallelize(1 to 4, 1)).collect()
+ }
+ }
}
diff --git a/docs/quick-start.md b/docs/quick-start.md
index dbc232b6e0..177cb14551 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -113,7 +113,7 @@ import SparkContext._
object SimpleJob extends Application {
val logFile = "/var/log/syslog" // Should be some file on your system
val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
- "target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar")
+ List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar"))
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
@@ -172,7 +172,7 @@ public class SimpleJob {
public static void main(String[] args) {
String logFile = "/var/log/syslog"; // Should be some file on your system
JavaSparkContext sc = new JavaSparkContext("local", "Simple Job",
- "$YOUR_SPARK_HOME", "target/simple-project-1.0.jar");
+ "$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"});
JavaRDD<String> logData = sc.textFile(logFile).cache();
long numAs = logData.filter(new Function<String, Boolean>() {