-rw-r--r--  core/src/main/scala/spark/HttpFileServer.scala          |   8
-rw-r--r--  core/src/main/scala/spark/Serializer.scala              |   4
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobInfo.scala   |   9
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobState.scala  |   2
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala    |  37
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala       |   9
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala    |   7
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala      |  46
-rw-r--r--  docs/bagel-programming-guide.md                         |   2
-rw-r--r--  docs/index.md                                           |   2
-rw-r--r--  docs/java-programming-guide.md                          | 170
11 files changed, 244 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/spark/HttpFileServer.scala b/core/src/main/scala/spark/HttpFileServer.scala
index e6ad4dd28e..05ca846c85 100644
--- a/core/src/main/scala/spark/HttpFileServer.scala
+++ b/core/src/main/scala/spark/HttpFileServer.scala
@@ -20,7 +20,7 @@ class HttpFileServer extends Logging {
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
- httpServer = new HttpServer(fileDir)
+ httpServer = new HttpServer(baseDir)
httpServer.start()
serverUri = httpServer.uri
}
@@ -30,11 +30,13 @@ class HttpFileServer extends Logging {
}
def addFile(file: File) : String = {
- return addFileToDir(file, fileDir)
+ addFileToDir(file, fileDir)
+ return serverUri + "/files/" + file.getName
}
def addJar(file: File) : String = {
- return addFileToDir(file, jarDir)
+ addFileToDir(file, jarDir)
+ return serverUri + "/jars/" + file.getName
}
def addFileToDir(file: File, dir: File) : String = {
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala
index 61a70beaf1..5f26bd2a7b 100644
--- a/core/src/main/scala/spark/Serializer.scala
+++ b/core/src/main/scala/spark/Serializer.scala
@@ -43,7 +43,7 @@ trait SerializerInstance {
def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
// Default implementation uses deserializeStream
buffer.rewind()
- deserializeStream(new ByteBufferInputStream(buffer)).toIterator
+ deserializeStream(new ByteBufferInputStream(buffer)).asIterator
}
}
@@ -74,7 +74,7 @@ trait DeserializationStream {
* Read the elements of this stream through an iterator. This can only be called once, as
* reading each element will consume data from the input source.
*/
- def toIterator: Iterator[Any] = new Iterator[Any] {
+ def asIterator: Iterator[Any] = new Iterator[Any] {
var gotNext = false
var finished = false
var nextValue: Any = null
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 31d48b82b9..4c81a1b447 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -31,4 +31,13 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
}
def coresLeft: Int = desc.cores - coresGranted
+
+ private var _retryCount = 0
+
+ def retryCount = _retryCount
+
+ def incrementRetryCount = {
+ _retryCount += 1
+ _retryCount
+ }
}
diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala
index 50b0c6f95b..8d458ac39c 100644
--- a/core/src/main/scala/spark/deploy/master/JobState.scala
+++ b/core/src/main/scala/spark/deploy/master/JobState.scala
@@ -4,4 +4,6 @@ object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED")
type JobState = Value
val WAITING, RUNNING, FINISHED, FAILED = Value
+
+ val MAX_NUM_RETRY = 10
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index c98dddea7b..5cc73633ab 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -1,19 +1,18 @@
package spark.deploy.master
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
import akka.actor._
-import spark.{Logging, Utils}
-import spark.util.AkkaUtils
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+
import java.text.SimpleDateFormat
import java.util.Date
-import akka.remote.RemoteClientLifeCycleEvent
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
import spark.deploy._
-import akka.remote.RemoteClientShutdown
-import akka.remote.RemoteClientDisconnected
-import spark.deploy.RegisterWorker
-import spark.deploy.RegisterWorkerFailed
-import akka.actor.Terminated
+import spark.{Logging, SparkException, Utils}
+import spark.util.AkkaUtils
+
class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
@@ -81,12 +80,22 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
exec.state = state
exec.job.actor ! ExecutorUpdated(execId, state, message)
if (ExecutorState.isFinished(state)) {
+ val jobInfo = idToJob(jobId)
// Remove this executor from the worker and job
logInfo("Removing executor " + exec.fullId + " because it is " + state)
- idToJob(jobId).removeExecutor(exec)
+ jobInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)
- // TODO: the worker would probably want to restart the executor a few times
- schedule()
+
+ // Only retry certain number of times so we don't go into an infinite loop.
+ if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) {
+ schedule()
+ } else {
+        val e = new SparkException("Job %s with ID %s failed %d times.".format(
+ jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
+ logError(e.getMessage, e)
+ throw e
+ //System.exit(1)
+ }
}
}
case None =>
@@ -112,7 +121,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
}
-
+
case RequestMasterState => {
sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 8f975c52d4..9999b6ba80 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -38,6 +38,10 @@ class Executor extends Logging {
System.setProperty(key, value)
}
+ // Create our ClassLoader and set it on this thread
+ urlClassLoader = createClassLoader()
+ Thread.currentThread.setContextClassLoader(urlClassLoader)
+
// Initialize Spark environment (using system properties read above)
env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
SparkEnv.set(env)
@@ -45,11 +49,6 @@ class Executor extends Logging {
// Start worker thread pool
threadPool = new ThreadPoolExecutor(
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
- // Create our ClassLoader and set it on this thread
- urlClassLoader = createClassLoader()
- Thread.currentThread.setContextClassLoader(urlClassLoader)
-
}
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3a51f6bd96..15748b70d5 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -614,10 +614,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
- /*serializer.newInstance().deserializeMany(bytes)*/
- val ser = serializer.newInstance()
bytes.rewind()
- return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator
+ val ser = serializer.newInstance()
+ return ser.deserializeStream(new ByteBufferInputStream(bytes)).asIterator
}
def stop() {
@@ -632,7 +631,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
object BlockManager {
def getNumParallelFetchesFromSystemProperties(): Int = {
- System.getProperty("spark.blockManager.parallelFetches", "8").toInt
+ System.getProperty("spark.blockManager.parallelFetches", "4").toInt
}
def getMaxMemoryFromSystemProperties(): Long = {
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 09287faba0..d505df66a7 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,6 +1,6 @@
package spark.storage
-import java.io.{File, RandomAccessFile}
+import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import java.util.{LinkedHashMap, UUID}
@@ -8,12 +8,14 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import scala.collection.mutable.ArrayBuffer
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
import spark.{Utils, Logging, Serializer, SizeEstimator}
/**
* Abstract class to store blocks
*/
-abstract class BlockStore(blockManager: BlockManager) extends Logging {
+abstract class BlockStore(val blockManager: BlockManager) extends Logging {
initLogging()
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
@@ -131,7 +133,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return None
}
if (entry.deserialized) {
- return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].toIterator)
+ return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
} else {
return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate()))
}
@@ -217,25 +219,28 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
val file = createFile(blockId)
- if (file != null) {
- val channel = new RandomAccessFile(file, "rw").getChannel()
- val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
- buffer.put(bytes)
- channel.close()
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
- blockId, bytes.limit, (finishTime - startTime)))
- } else {
- logError("File not created for block " + blockId)
- }
+ val channel = new RandomAccessFile(file, "rw").getChannel()
+ val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
+ buffer.put(bytes)
+ channel.close()
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
+ blockId, bytes.limit, (finishTime - startTime)))
}
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
- : Either[Iterator[Any], ByteBuffer] = {
- val bytes = dataSerialize(values)
- logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes")
- putBytes(blockId, bytes, level)
- return Right(bytes)
+ : Either[Iterator[Any], ByteBuffer] = {
+
+ logDebug("Attempting to write values for block " + blockId)
+ val file = createFile(blockId)
+ val fileOut = new FastBufferedOutputStream(new FileOutputStream(file))
+ val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
+ objOut.writeAll(values)
+ objOut.close()
+
+ // Return a byte buffer for the contents of the file
+ val channel = new RandomAccessFile(file, "rw").getChannel()
+ Right(channel.map(MapMode.READ_WRITE, 0, channel.size()))
}
def getBytes(blockId: String): Option[ByteBuffer] = {
@@ -267,8 +272,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
newFile.getParentFile.mkdirs()
return newFile
} else {
- logError("File for block " + blockId + " already exists on disk, " + file)
- return null
+ throw new Exception("File for block " + blockId + " already exists on disk, " + file)
}
}
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
index b133376a97..0c925c176c 100644
--- a/docs/bagel-programming-guide.md
+++ b/docs/bagel-programming-guide.md
@@ -19,7 +19,7 @@ To write a Bagel application, you will need to add Spark, its dependencies, and
## Programming Model
-Bagel operates on a graph represented as a [distributed dataset]({{HOME_PATH}}programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
+Bagel operates on a graph represented as a [distributed dataset]({{HOME_PATH}}scala-programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.
diff --git a/docs/index.md b/docs/index.md
index 3df638f629..69d55e505e 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -54,7 +54,7 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
# Where to Go from Here
-* [Spark Programming Guide]({{HOME_PATH}}programming-guide.html): how to get started using Spark, and details on the API
+* [Spark Programming Guide]({{HOME_PATH}}scala-programming-guide.html): how to get started using Spark, and details on the API
* [Running Spark on Amazon EC2]({{HOME_PATH}}ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes
* [Running Spark on Mesos]({{HOME_PATH}}running-on-mesos.html): instructions on how to deploy to a private cluster
* [Running Spark on YARN]({{HOME_PATH}}running-on-yarn.html): instructions on how to run Spark on top of a YARN cluster
diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md
index e3f644d748..c63448a965 100644
--- a/docs/java-programming-guide.md
+++ b/docs/java-programming-guide.md
@@ -2,4 +2,172 @@
layout: global
title: Java Programming Guide
---
-TODO: Write Java programming guide!
+
+The Spark Java API
+([spark.api.java]({{HOME_PATH}}api/core/index.html#spark.api.java.package)) defines
+[`JavaSparkContext`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaSparkContext) and
+[`JavaRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaRDD) classes,
+which support
+the same methods as their Scala counterparts but take Java functions and return
+Java data and collection types.
+
+Because the Java API is similar to the Scala API, this programming guide only
+covers Java-specific features;
+the [Scala Programming Guide]({{HOME_PATH}}scala-programming-guide.html)
+provides a more general introduction to Spark concepts and should be read
+first.
+
+
+# Key differences in the Java API
+There are a few key differences between the Java and Scala APIs:
+
+* Java does not support anonymous or first-class functions, so functions must
+  be implemented by extending the
+  [`spark.api.java.function.Function`]({{HOME_PATH}}api/core/index.html#spark.api.java.function.Function),
+  [`Function2`]({{HOME_PATH}}api/core/index.html#spark.api.java.function.Function2), etc.
+  classes (a short sketch follows this list).
+* To maintain type safety, the Java API defines specialized Function and RDD
+ classes for key-value pairs and doubles.
+* RDD methods like `collect` and `countByKey` return Java collection types,
+ such as `java.util.List` and `java.util.Map`.
+
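+For example, the first and third differences combined look like this from Java.
+This is a minimal sketch, assuming a `JavaRDD<String>` named `lines` already
+exists:
+
+{% highlight java %}
+import java.util.List;
+
+// The mapping function is an anonymous subclass of Function<T, R>.
+JavaRDD<Integer> lineLengths = lines.map(
+  new Function<String, Integer>() {
+    public Integer call(String s) {
+      return s.length();
+    }
+  }
+);
+
+// collect() returns a java.util.List rather than a Scala collection.
+List<Integer> lengths = lineLengths.collect();
+{% endhighlight %}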
+
+## RDD Classes
+Spark defines additional operations on RDDs of doubles and key-value pairs, such
+as `stdev` and `join`.
+
+In the Scala API, these methods are automatically added using Scala's
+[implicit conversions](http://www.scala-lang.org/node/130) mechanism.
+
+In the Java API, the extra methods are defined in the
+[`JavaDoubleRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaDoubleRDD) and
+[`JavaPairRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaPairRDD)
+classes. RDD methods like `map` are overloaded to accept the specialized
+`PairFunction` and `DoubleFunction` classes, allowing them to return RDDs of
+the appropriate types. Common methods like `filter` and `sample` are
+implemented by each specialized RDD class, so filtering a `JavaPairRDD` returns
+a new `JavaPairRDD`, etc. (this achieves the "same-result-type" principle used
+by the [Scala collections
+framework](http://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html)).
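+
+As a rough sketch (again assuming a `JavaRDD<String>` named `lines`), mapping
+with a `DoubleFunction` produces a `JavaDoubleRDD`, which exposes the numeric
+operations mentioned above:
+
+{% highlight java %}
+// Passing a DoubleFunction to the overloaded map() yields a JavaDoubleRDD.
+JavaDoubleRDD lengths = lines.map(
+  new DoubleFunction<String>() {
+    public Double call(String s) {
+      return (double) s.length();
+    }
+  }
+);
+
+// stdev() is one of the extra operations defined on RDDs of doubles.
+double deviation = lengths.stdev();
+{% endhighlight %}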
+
+## Function Classes
+
+The following table lists the function classes used by the Java API. Each
+class has a single abstract method, `call()`, that must be implemented.
+
+<table class="table">
+<tr><th>Class</th><th>Function Type</th></tr>
+
+<tr><td>Function&lt;T, R&gt;</td><td>T -&gt; R </td></tr>
+<tr><td>DoubleFunction&lt;T&gt;</td><td>T -&gt; Double </td></tr>
+<tr><td>PairFunction&lt;T, K, V&gt;</td><td>T -&gt; Tuple2&lt;K, V&gt; </td></tr>
+
+<tr><td>FlatMapFunction&lt;T, R&gt;</td><td>T -&gt; Iterable&lt;R&gt; </td></tr>
+<tr><td>DoubleFlatMapFunction&lt;T&gt;</td><td>T -&gt; Iterable&lt;Double&gt; </td></tr>
+<tr><td>PairFlatMapFunction&lt;T, K, V&gt;</td><td>T -&gt; Iterable&lt;Tuple2&lt;K, V&gt;&gt; </td></tr>
+
+<tr><td>Function2&lt;T1, T2, R&gt;</td><td>T1, T2 -&gt; R (function of two arguments)</td></tr>
+</table>
+
+# Other Features
+The Java API supports other Spark features, including
+[accumulators]({{HOME_PATH}}scala-programming-guide.html#accumulators),
+[broadcast variables]({{HOME_PATH}}scala-programming-guide.html#broadcast_variables), and
+[caching]({{HOME_PATH}}scala-programming-guide.html#caching).
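+
+As a brief, non-authoritative sketch of how two of these look from Java
+(assuming a `JavaSparkContext` named `sc` and a `JavaRDD<String>` named
+`lines`, as in the example below; exact method signatures are best checked
+against the API docs):
+
+{% highlight java %}
+import spark.broadcast.Broadcast;
+
+// Ship a read-only value to all nodes once; read it back with value().
+// Declared final so it can be referenced from anonymous function classes.
+final Broadcast<Integer> threshold = sc.broadcast(4);
+
+// Ask Spark to keep this RDD in memory after the first time it is computed.
+lines.cache();
+{% endhighlight %}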
+
+# Example
+
+As an example, we will implement word count using the Java API.
+
+{% highlight java %}
+import java.util.Arrays;
+
+import spark.api.java.*;
+import spark.api.java.function.*;
+
+JavaSparkContext sc = new JavaSparkContext(...);
+JavaRDD<String> lines = sc.textFile("hdfs://...");
+JavaRDD<String> words = lines.flatMap(
+ new FlatMapFunction<String, String>() {
+ public Iterable<String> call(String s) {
+ return Arrays.asList(s.split(" "));
+ }
+ }
+);
+{% endhighlight %}
+
+The word count program starts by creating a `JavaSparkContext`, which accepts
+the same parameters as its Scala counterpart. `JavaSparkContext` supports the
+same data loading methods as the regular `SparkContext`; here, `textFile`
+loads lines from text files stored in HDFS.
+
+To split the lines into words, we use `flatMap` to split each line on
+whitespace. `flatMap` is passed a `FlatMapFunction` that accepts a string and
+returns a `java.lang.Iterable` of strings.
+
+Here, the `FlatMapFunction` was created inline; another option is to subclass
+`FlatMapFunction` and pass an instance to `flatMap`:
+
+{% highlight java %}
+class Split extends FlatMapFunction<String, String> {
+ public Iterable<String> call(String s) {
+ return Arrays.asList(s.split(" "));
+ }
+}
+JavaRDD<String> words = lines.flatMap(new Split());
+{% endhighlight %}
+
+Continuing with the word count example, we map each word to a `(word, 1)` pair:
+
+{% highlight java %}
+import scala.Tuple2;
+JavaPairRDD<String, Integer> ones = words.map(
+ new PairFunction<String, String, Integer>() {
+ public Tuple2<String, Integer> call(String s) {
+      return new Tuple2<String, Integer>(s, 1);
+ }
+ }
+);
+{% endhighlight %}
+
+Note that `map` was passed a `PairFunction<String, String, Integer>` and
+returned a `JavaPairRDD<String, Integer>`.
+
+To finish the word count program, we will use `reduceByKey` to count the
+occurrences of each word:
+
+{% highlight java %}
+JavaPairRDD<String, Integer> counts = ones.reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ }
+);
+{% endhighlight %}
+
+Here, `reduceByKey` is passed a `Function2`, which implements a function with
+two arguments. The resulting `JavaPairRDD` contains `(word, count)` pairs.
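+
+To inspect the result on the driver, one could collect the pairs into a
+`java.util.List` (a small sketch; fine for this example, though collecting a
+large dataset onto one machine would not be):
+
+{% highlight java %}
+import java.util.List;
+
+// collect() returns the (word, count) pairs as a java.util.List.
+List<Tuple2<String, Integer>> output = counts.collect();
+for (Tuple2<String, Integer> pair : output) {
+  System.out.println(pair._1() + ": " + pair._2());
+}
+{% endhighlight %}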
+
+In this example, we explicitly showed each intermediate RDD. It is also
+possible to chain the RDD transformations, so the word count example could also
+be written as:
+
+{% highlight java %}
+JavaPairRDD<String, Integer> counts = lines.flatMap(
+ ...
+ ).map(
+ ...
+ ).reduceByKey(
+ ...
+ );
+{% endhighlight %}
+There is no performance difference between these approaches; the choice is
+a matter of style.
+
+
+# Where to go from here
+Spark includes several sample jobs using the Java API in
+`examples/src/main/java`. You can run them by passing the class name to the
+`run` script included in Spark -- for example, `./run
+spark.examples.JavaWordCount`. Each example program prints usage help when run
+without any arguments.