author     Andy Konwinski <andyk@berkeley.edu>    2012-09-25 15:19:57 -0700
committer  Andy Konwinski <andyk@berkeley.edu>    2012-09-25 15:19:57 -0700
commit     8d30fe616eacaa3263d46ca20153d1c4e518c16c (patch)
tree       6369b667915663582d4b85c858e516d59a7b2b5b
parent     8f7dfcf332b0e56bb0d7e0b97b60a6cb4a8735d1 (diff)
parent     aa50d5b9a2b3c5b2adc958c96d88a623fa709e63 (diff)
download   spark-8d30fe616eacaa3263d46ca20153d1c4e518c16c.tar.gz
           spark-8d30fe616eacaa3263d46ca20153d1c4e518c16c.tar.bz2
           spark-8d30fe616eacaa3263d46ca20153d1c4e518c16c.zip
Merge remote-tracking branch 'public-spark/dev' into doc
-rw-r--r--  core/src/main/scala/spark/Aggregator.scala                    |   4
-rw-r--r--  core/src/main/scala/spark/HttpFileServer.scala                |   8
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala              |  87
-rw-r--r--  core/src/main/scala/spark/RDD.scala                           |   2
-rw-r--r--  core/src/main/scala/spark/Serializer.scala                    |   4
-rw-r--r--  core/src/main/scala/spark/ShuffledRDD.scala                   |  72
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala                      |   2
-rw-r--r--  core/src/main/scala/spark/Utils.scala                         |   2
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobInfo.scala         |   9
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobState.scala        |   2
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala          |  37
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala  |   3
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala             |   9
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala      |   9
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala          |   7
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala            |  46
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala              |   2
-rw-r--r--  core/src/test/scala/spark/FileServerSuite.scala               |  24
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala                  |  18
-rw-r--r--  docs/bagel-programming-guide.md                               |   2
-rw-r--r--  docs/index.md                                                 |   2
-rw-r--r--  docs/java-programming-guide.md                                | 170
-rw-r--r--  project/SparkBuild.scala                                      |   8
-rwxr-xr-x  run                                                           |   2
-rw-r--r--  run.cmd                                                       |   2
-rw-r--r--  run2.cmd                                                      |  68
-rw-r--r--  sbt/sbt.cmd                                                   |   5
-rw-r--r--  spark-shell.cmd                                               |   4
28 files changed, 478 insertions(+), 132 deletions(-)
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 6516bea157..b0daa70cfd 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -9,9 +9,9 @@ package spark
* known as map-side aggregations. When set to false,
* mergeCombiners function is not used.
*/
-class Aggregator[K, V, C] (
+case class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
val mergeCombiners: (C, C) => C,
val mapSideCombine: Boolean = true)
- extends Serializable
+
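For context (not part of the patch): a minimal sketch of how the case class above is meant to be used, with illustrative word-count combiner functions and a hand-rolled per-key fold standing in for what a map task does when mapSideCombine is true.

import spark.Aggregator
import scala.collection.mutable.{HashMap => MHashMap}

// Sketch only: the combiner functions and sample records are illustrative.
object AggregatorSketch {
  def main(args: Array[String]) {
    // createCombiner, mergeValue and mergeCombiners for counting occurrences per key.
    val agg = Aggregator[String, Int, Int]((v: Int) => v, _ + _, _ + _)

    // Roughly what a map task does when mapSideCombine is true: fold values per key.
    val records = Seq("a" -> 1, "b" -> 1, "a" -> 1)
    val combiners = new MHashMap[String, Int]
    for ((k, v) <- records) {
      combiners(k) = combiners.get(k) match {
        case Some(c) => agg.mergeValue(c, v)   // key seen before: merge into its combiner
        case None    => agg.createCombiner(v)  // first value for this key
      }
    }
    println(combiners) // e.g. Map(a -> 2, b -> 1)

    // The reduce side then merges combiners coming from different map tasks.
    println(agg.mergeCombiners(2, 1)) // 3
  }
}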
diff --git a/core/src/main/scala/spark/HttpFileServer.scala b/core/src/main/scala/spark/HttpFileServer.scala
index e6ad4dd28e..05ca846c85 100644
--- a/core/src/main/scala/spark/HttpFileServer.scala
+++ b/core/src/main/scala/spark/HttpFileServer.scala
@@ -20,7 +20,7 @@ class HttpFileServer extends Logging {
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
- httpServer = new HttpServer(fileDir)
+ httpServer = new HttpServer(baseDir)
httpServer.start()
serverUri = httpServer.uri
}
@@ -30,11 +30,13 @@ class HttpFileServer extends Logging {
}
def addFile(file: File) : String = {
- return addFileToDir(file, fileDir)
+ addFileToDir(file, fileDir)
+ return serverUri + "/files/" + file.getName
}
def addJar(file: File) : String = {
- return addFileToDir(file, jarDir)
+ addFileToDir(file, jarDir)
+ return serverUri + "/jars/" + file.getName
}
def addFileToDir(file: File, dir: File) : String = {
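For context (not part of the patch): a rough standalone sketch of the layout the change above introduces, where files and jars live in separate subdirectories of one served base directory and the returned string is the subdirectory URI. FileServerSketch and serverUri are hypothetical stand-ins, not Spark classes.

import java.io.File

// Sketch only: mirrors the addFile/addJar return values above; the copy uses Guava,
// which is already on Spark's classpath, but any file copy would do.
class FileServerSketch(baseDir: File, serverUri: String) {
  private val fileDir = new File(baseDir, "files")
  private val jarDir  = new File(baseDir, "jars")
  fileDir.mkdirs()
  jarDir.mkdirs()

  def addFile(file: File): String = {
    copyInto(file, fileDir)
    serverUri + "/files/" + file.getName   // e.g. http://host:port/files/data.txt
  }

  def addJar(file: File): String = {
    copyInto(file, jarDir)
    serverUri + "/jars/" + file.getName
  }

  private def copyInto(file: File, dir: File) {
    com.google.common.io.Files.copy(file, new File(dir, file.getName))
  }
}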
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 64018f8c6b..aa1d00c63c 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -1,11 +1,10 @@
package spark
import java.io.EOFException
-import java.net.URL
import java.io.ObjectInputStream
+import java.net.URL
+import java.util.{Date, HashMap => JHashMap}
import java.util.concurrent.atomic.AtomicLong
-import java.util.{HashMap => JHashMap}
-import java.util.Date
import java.text.SimpleDateFormat
import scala.collection.Map
@@ -50,9 +49,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
- partitioner: Partitioner): RDD[(K, C)] = {
- val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
- new ShuffledRDD(self, aggregator, partitioner)
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true): RDD[(K, C)] = {
+ val aggregator =
+ if (mapSideCombine) {
+ new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ } else {
+ // Don't apply map-side combiner.
+ // A sanity check to make sure mergeCombiners is not defined.
+ assert(mergeCombiners == null)
+ new Aggregator[K, V, C](createCombiner, mergeValue, null, false)
+ }
+ new ShuffledAggregatedRDD(self, aggregator, partitioner)
}
def combineByKey[C](createCombiner: V => C,
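For illustration (not from the patch): a hedged sketch of calling the extended combineByKey above with map-side combining turned off; on this path mergeCombiners must be null, matching the assert in the patch. Data, app name and partition count are made up.

import spark.{HashPartitioner, SparkContext}
import spark.SparkContext._

// Sketch only: group values per key without map-side combining.
object NoMapSideCombineSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "combineByKey-sketch")
    val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))

    val grouped = pairs.combineByKey[List[String]](
      (v: String) => List(v),                     // createCombiner
      (buf: List[String], v: String) => v :: buf, // mergeValue
      null,                                       // mergeCombiners: must be null on this path
      new HashPartitioner(2),
      false)                                      // mapSideCombine = false

    grouped.collect().foreach(println)            // e.g. (1,List(b, a)) and (2,List(c))
    sc.stop()
  }
}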
@@ -65,7 +73,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
-
+
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
val map = new JHashMap[K, V]
@@ -116,13 +124,24 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
groupByKey(new HashPartitioner(numSplits))
}
- def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
- val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner)
- bufs.flatMapValues(buf => buf)
+ /**
+ * Repartition the RDD using the specified partitioner. If mapSideCombine is
+ * true, Spark will group values of the same key together on the map side
+ * before the repartitioning. If a large number of duplicated keys are
+ * expected, and the size of the keys is large, mapSideCombine should be set
+ * to true.
+ */
+ def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
+ if (mapSideCombine) {
+ def createCombiner(v: V) = ArrayBuffer(v)
+ def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+ def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
+ val bufs = combineByKey[ArrayBuffer[V]](
+ createCombiner _, mergeValue _, mergeCombiners _, partitioner)
+ bufs.flatMapValues(buf => buf)
+ } else {
+ new RepartitionShuffledRDD(self, partitioner)
+ }
}
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
@@ -194,17 +213,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
-
+
def mapValues[U](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MappedValuesRDD(self, cleanF)
}
-
+
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new FlatMappedValuesRDD(self, cleanF)
}
-
+
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
@@ -215,12 +234,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
}
}
-
+
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]],
- other1.asInstanceOf[RDD[(_, _)]],
+ other1.asInstanceOf[RDD[(_, _)]],
other2.asInstanceOf[RDD[(_, _)]]),
partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
@@ -289,7 +308,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
-
+
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
@@ -363,7 +382,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
saveAsHadoopDataset(conf)
}
-
+
def saveAsHadoopDataset(conf: JobConf) {
val outputFormatClass = conf.getOutputFormat
val keyClass = conf.getOutputKeyClass
@@ -377,7 +396,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
if (valueClass == null) {
throw new SparkException("Output value class not set")
}
-
+
logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
val writer = new HadoopWriter(conf)
@@ -390,14 +409,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
writer.setup(context.stageId, context.splitId, attemptNumber)
writer.open()
-
+
var count = 0
while(iter.hasNext) {
val record = iter.next
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
-
+
writer.close()
writer.commit()
}
@@ -413,28 +432,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
self: RDD[(K, V)])
- extends Logging
+ extends Logging
with Serializable {
def sortByKey(ascending: Boolean = true): RDD[(K,V)] = {
- val rangePartitionedRDD = self.partitionBy(new RangePartitioner(self.splits.size, self, ascending))
- new SortedRDD(rangePartitionedRDD, ascending)
+ new ShuffledSortedRDD(self, ascending)
}
}
-class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean)
- extends RDD[(K, V)](prev.context) {
-
- override def splits = prev.splits
- override val partitioner = prev.partitioner
- override val dependencies = List(new OneToOneDependency(prev))
-
- override def compute(split: Split) = {
- prev.iterator(split).toArray
- .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
- }
-}
-
class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
@@ -444,7 +449,7 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)]
class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
extends RDD[(K, U)](prev.context) {
-
+
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override val partitioner = prev.partitioner
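To make the new partitionBy flag and the ShuffledSortedRDD-backed sortByKey above concrete, a hedged usage sketch (cluster URL, data and partition counts are illustrative, not from the patch):

import spark.{HashPartitioner, SparkContext}
import spark.SparkContext._

// Sketch only: exercises the patched partitionBy and sortByKey.
object PartitionBySketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "partitionBy-sketch")
    val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "aa")))

    // Default: plain repartition (RepartitionShuffledRDD underneath).
    val repartitioned = pairs.partitionBy(new HashPartitioner(2))

    // With map-side combining: values of each key are buffered before the shuffle.
    val combinedFirst = pairs.partitionBy(new HashPartitioner(2), true)

    // Sort-based shuffle (ShuffledSortedRDD underneath).
    val sorted = pairs.sortByKey()

    println(repartitioned.collect().toSeq)
    println(combinedFirst.collect().toSeq)
    println(sorted.collect().toSeq) // keys in ascending order
    sc.stop()
  }
}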
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index d28f3593fe..efe248896a 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -68,6 +68,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def preferredLocations(split: Split): Seq[String] = Nil
def context = sc
+
+ def elementClassManifest: ClassManifest[T] = classManifest[T]
// Get a unique ID for this RDD
val id = sc.newRddId()
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala
index 61a70beaf1..5f26bd2a7b 100644
--- a/core/src/main/scala/spark/Serializer.scala
+++ b/core/src/main/scala/spark/Serializer.scala
@@ -43,7 +43,7 @@ trait SerializerInstance {
def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
// Default implementation uses deserializeStream
buffer.rewind()
- deserializeStream(new ByteBufferInputStream(buffer)).toIterator
+ deserializeStream(new ByteBufferInputStream(buffer)).asIterator
}
}
@@ -74,7 +74,7 @@ trait DeserializationStream {
* Read the elements of this stream through an iterator. This can only be called once, as
* reading each element will consume data from the input source.
*/
- def toIterator: Iterator[Any] = new Iterator[Any] {
+ def asIterator: Iterator[Any] = new Iterator[Any] {
var gotNext = false
var finished = false
var nextValue: Any = null
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 3616d8e47e..a7346060b3 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -1,29 +1,89 @@
package spark
+import scala.collection.mutable.ArrayBuffer
import java.util.{HashMap => JHashMap}
+
class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx
override def hashCode(): Int = idx
}
-class ShuffledRDD[K, V, C](
+
+/**
+ * The resulting RDD from a shuffle (e.g. repartitioning of data).
+ */
+abstract class ShuffledRDD[K, V, C](
@transient parent: RDD[(K, V)],
aggregator: Aggregator[K, V, C],
- part : Partitioner)
+ part : Partitioner)
extends RDD[(K, C)](parent.context) {
- //override val partitioner = Some(part)
+
override val partitioner = Some(part)
-
+
@transient
val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
override def splits = splits_
-
+
override def preferredLocations(split: Split) = Nil
-
+
val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
override val dependencies = List(dep)
+}
+
+
+/**
+ * Repartition a key-value pair RDD.
+ */
+class RepartitionShuffledRDD[K, V](
+ @transient parent: RDD[(K, V)],
+ part : Partitioner)
+ extends ShuffledRDD[K, V, V](
+ parent,
+ Aggregator[K, V, V](null, null, null, false),
+ part) {
+
+ override def compute(split: Split): Iterator[(K, V)] = {
+ val buf = new ArrayBuffer[(K, V)]
+ val fetcher = SparkEnv.get.shuffleFetcher
+ def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) }
+ fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
+ buf.iterator
+ }
+}
+
+
+/**
+ * A sort-based shuffle (that doesn't apply aggregation). It does so by first
+ * repartitioning the RDD by range, and then sorting within each range.
+ */
+class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
+ @transient parent: RDD[(K, V)],
+ ascending: Boolean)
+ extends RepartitionShuffledRDD[K, V](
+ parent,
+ new RangePartitioner(parent.splits.size, parent, ascending)) {
+
+ override def compute(split: Split): Iterator[(K, V)] = {
+ // By separating this from RepartitionShuffledRDD, we avoided a
+ // buf.iterator.toArray call, thus avoiding building up the buffer twice.
+ val buf = new ArrayBuffer[(K, V)]
+ def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) }
+ SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
+ buf.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
+ }
+}
+
+
+/**
+ * The resulting RDD from shuffle and running (hash-based) aggregation.
+ */
+class ShuffledAggregatedRDD[K, V, C](
+ @transient parent: RDD[(K, V)],
+ aggregator: Aggregator[K, V, C],
+ part : Partitioner)
+ extends ShuffledRDD[K, V, C](parent, aggregator, part) {
override def compute(split: Split): Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
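For context (not part of the patch): the hierarchy above splits the old ShuffledRDD into three concrete shuffles, and the updated ShuffleSuite constructs them directly. A hedged sketch of doing the same; data and partition counts are illustrative.

import spark._

// Sketch only: one instance of each shuffle variant introduced above.
object ShuffleVariantsSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "shuffle-variants-sketch")
    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (2, 1)))

    // Hash-aggregated shuffle: sums the values of each key while reducing.
    val aggregated = new ShuffledAggregatedRDD(
      pairs, Aggregator[Int, Int, Int]((v: Int) => v, _ + _, _ + _), new HashPartitioner(2))

    // Plain repartition: no aggregation, rows just land in their target partition.
    val repartitioned = new RepartitionShuffledRDD(pairs, new HashPartitioner(2))

    // Range-partitioned, sorted shuffle: the machinery behind sortByKey.
    val sorted = new ShuffledSortedRDD(pairs, true)

    println(aggregated.collect().toSet)    // e.g. Set((1,3), (2,1))
    println(repartitioned.collect().toSet) // same pairs, repartitioned only
    println(sorted.collect().toList)       // sorted by key, ascending
    sc.stop()
  }
}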
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 7473b40aa3..6ffae8e85f 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -40,6 +40,8 @@ class SparkEnv (
blockManager.stop()
blockManager.master.stop()
actorSystem.shutdown()
+ // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
+ Thread.sleep(100)
actorSystem.awaitTermination()
// Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
Thread.sleep(100)
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 07aa18e540..5a3f8bde43 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -163,6 +163,8 @@ object Utils extends Logging {
logInfo("Untarring " + filename)
Utils.execute(Seq("tar", "-xf", filename), targetDir)
}
+ // Make the file executable - That's necessary for scripts
+ FileUtil.chmod(filename, "a+x")
}
/**
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 31d48b82b9..4c81a1b447 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -31,4 +31,13 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
}
def coresLeft: Int = desc.cores - coresGranted
+
+ private var _retryCount = 0
+
+ def retryCount = _retryCount
+
+ def incrementRetryCount = {
+ _retryCount += 1
+ _retryCount
+ }
}
diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala
index 50b0c6f95b..8d458ac39c 100644
--- a/core/src/main/scala/spark/deploy/master/JobState.scala
+++ b/core/src/main/scala/spark/deploy/master/JobState.scala
@@ -4,4 +4,6 @@ object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED")
type JobState = Value
val WAITING, RUNNING, FINISHED, FAILED = Value
+
+ val MAX_NUM_RETRY = 10
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index c98dddea7b..5cc73633ab 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -1,19 +1,18 @@
package spark.deploy.master
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
import akka.actor._
-import spark.{Logging, Utils}
-import spark.util.AkkaUtils
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+
import java.text.SimpleDateFormat
import java.util.Date
-import akka.remote.RemoteClientLifeCycleEvent
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
import spark.deploy._
-import akka.remote.RemoteClientShutdown
-import akka.remote.RemoteClientDisconnected
-import spark.deploy.RegisterWorker
-import spark.deploy.RegisterWorkerFailed
-import akka.actor.Terminated
+import spark.{Logging, SparkException, Utils}
+import spark.util.AkkaUtils
+
class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
@@ -81,12 +80,22 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
exec.state = state
exec.job.actor ! ExecutorUpdated(execId, state, message)
if (ExecutorState.isFinished(state)) {
+ val jobInfo = idToJob(jobId)
// Remove this executor from the worker and job
logInfo("Removing executor " + exec.fullId + " because it is " + state)
- idToJob(jobId).removeExecutor(exec)
+ jobInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)
- // TODO: the worker would probably want to restart the executor a few times
- schedule()
+
+ // Only retry a certain number of times so we don't go into an infinite loop.
+ if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) {
+ schedule()
+ } else {
+ val e = new SparkException("Job %s with ID %s failed %d times.".format(
+ jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
+ logError(e.getMessage, e)
+ throw e
+ //System.exit(1)
+ }
}
}
case None =>
@@ -112,7 +121,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
}
-
+
case RequestMasterState => {
sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
}
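The retry cap above is simple per-job bookkeeping: each executor failure bumps a counter, and the Master stops rescheduling once it passes JobState.MAX_NUM_RETRY. A stripped-down sketch of that control flow, outside Akka and the Master actor; the class and method names here are illustrative.

// Sketch only: the bounded-retry decision in isolation.
class JobRetryTracker(maxRetries: Int) {
  private var retryCount = 0

  // Returns true if the job should be rescheduled, false if it should be failed.
  def onExecutorFailure(): Boolean = {
    retryCount += 1
    retryCount <= maxRetries
  }
}

object RetrySketch {
  def main(args: Array[String]) {
    val tracker = new JobRetryTracker(maxRetries = 10) // mirrors JobState.MAX_NUM_RETRY
    for (failure <- 1 to 12) {
      if (tracker.onExecutorFailure())
        println("failure " + failure + ": rescheduling")
      else
        println("failure " + failure + ": giving up on the job")
    }
  }
}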
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 7043361020..e2a9df275a 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -75,7 +75,8 @@ class ExecutorRunner(
def buildCommandSeq(): Seq[String] = {
val command = jobDesc.command
- val runScript = new File(sparkHome, "run").getCanonicalPath
+ val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run";
+ val runScript = new File(sparkHome, script).getCanonicalPath
Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables)
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 8f975c52d4..9999b6ba80 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -38,6 +38,10 @@ class Executor extends Logging {
System.setProperty(key, value)
}
+ // Create our ClassLoader and set it on this thread
+ urlClassLoader = createClassLoader()
+ Thread.currentThread.setContextClassLoader(urlClassLoader)
+
// Initialize Spark environment (using system properties read above)
env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
SparkEnv.set(env)
@@ -45,11 +49,6 @@ class Executor extends Logging {
// Start worker thread pool
threadPool = new ThreadPoolExecutor(
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
- // Create our ClassLoader and set it on this thread
- urlClassLoader = createClassLoader()
- Thread.currentThread.setContextClassLoader(urlClassLoader)
-
}
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 3bcc588015..745aa0c939 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -44,7 +44,8 @@ object ShuffleMapTask {
}
// Since both the JarSet and FileSet have the same format this is used for both.
- def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = {
+ def serializeFileSet(
+ set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = {
val old = cache.get(stageId)
if (old != null) {
return old
@@ -59,7 +60,6 @@ object ShuffleMapTask {
}
}
-
def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = {
synchronized {
val loader = Thread.currentThread.getContextClassLoader
@@ -113,7 +113,8 @@ class ShuffleMapTask(
out.writeInt(bytes.length)
out.write(bytes)
- val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache)
+ val fileSetBytes = ShuffleMapTask.serializeFileSet(
+ fileSet, stageId, ShuffleMapTask.fileSetCache)
out.writeInt(fileSetBytes.length)
out.write(fileSetBytes)
val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache)
@@ -172,7 +173,7 @@ class ShuffleMapTask(
buckets.map(_.iterator)
} else {
// No combiners (no map-side aggregation). Simply partition the map output.
- val buckets = Array.tabulate(numOutputSplits)(_ => new ArrayBuffer[(Any, Any)])
+ val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
for (elem <- rdd.iterator(split)) {
val pair = elem.asInstanceOf[(Any, Any)]
val bucketId = partitioner.getPartition(pair._1)
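The branch changed above is the no-combiner path: each record goes into the bucket chosen by the partitioner, one ArrayBuffer per output split. A tiny standalone sketch of that bucketing step, with a hash of the key standing in for Spark's Partitioner:

import scala.collection.mutable.ArrayBuffer

// Sketch only: partition key-value records into numOutputSplits buckets.
object BucketSketch {
  def main(args: Array[String]) {
    val numOutputSplits = 3
    val records = Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4))

    // One ArrayBuffer per output split, as Array.fill produces in the patched task.
    val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
    for (pair <- records) {
      val bucketId = math.abs(pair._1.hashCode) % numOutputSplits // stand-in partitioner
      buckets(bucketId) += pair
    }
    buckets.zipWithIndex.foreach { case (bucket, i) =>
      println("bucket " + i + ": " + bucket.toList)
    }
  }
}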
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3a51f6bd96..15748b70d5 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -614,10 +614,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
- /*serializer.newInstance().deserializeMany(bytes)*/
- val ser = serializer.newInstance()
bytes.rewind()
- return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator
+ val ser = serializer.newInstance()
+ return ser.deserializeStream(new ByteBufferInputStream(bytes)).asIterator
}
def stop() {
@@ -632,7 +631,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
object BlockManager {
def getNumParallelFetchesFromSystemProperties(): Int = {
- System.getProperty("spark.blockManager.parallelFetches", "8").toInt
+ System.getProperty("spark.blockManager.parallelFetches", "4").toInt
}
def getMaxMemoryFromSystemProperties(): Long = {
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 09287faba0..d505df66a7 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,6 +1,6 @@
package spark.storage
-import java.io.{File, RandomAccessFile}
+import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import java.util.{LinkedHashMap, UUID}
@@ -8,12 +8,14 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import scala.collection.mutable.ArrayBuffer
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
import spark.{Utils, Logging, Serializer, SizeEstimator}
/**
* Abstract class to store blocks
*/
-abstract class BlockStore(blockManager: BlockManager) extends Logging {
+abstract class BlockStore(val blockManager: BlockManager) extends Logging {
initLogging()
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
@@ -131,7 +133,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return None
}
if (entry.deserialized) {
- return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].toIterator)
+ return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
} else {
return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate()))
}
@@ -217,25 +219,28 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
val file = createFile(blockId)
- if (file != null) {
- val channel = new RandomAccessFile(file, "rw").getChannel()
- val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
- buffer.put(bytes)
- channel.close()
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
- blockId, bytes.limit, (finishTime - startTime)))
- } else {
- logError("File not created for block " + blockId)
- }
+ val channel = new RandomAccessFile(file, "rw").getChannel()
+ val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
+ buffer.put(bytes)
+ channel.close()
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
+ blockId, bytes.limit, (finishTime - startTime)))
}
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
- : Either[Iterator[Any], ByteBuffer] = {
- val bytes = dataSerialize(values)
- logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes")
- putBytes(blockId, bytes, level)
- return Right(bytes)
+ : Either[Iterator[Any], ByteBuffer] = {
+
+ logDebug("Attempting to write values for block " + blockId)
+ val file = createFile(blockId)
+ val fileOut = new FastBufferedOutputStream(new FileOutputStream(file))
+ val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
+ objOut.writeAll(values)
+ objOut.close()
+
+ // Return a byte buffer for the contents of the file
+ val channel = new RandomAccessFile(file, "rw").getChannel()
+ Right(channel.map(MapMode.READ_WRITE, 0, channel.size()))
}
def getBytes(blockId: String): Option[ByteBuffer] = {
@@ -267,8 +272,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
newFile.getParentFile.mkdirs()
return newFile
} else {
- logError("File for block " + blockId + " already exists on disk, " + file)
- return null
+ throw new Exception("File for block " + blockId + " already exists on disk, " + file)
}
}
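The putValues rewrite above streams values straight to disk through a buffered serialization stream and then memory-maps the resulting file, instead of materializing a serialized ByteBuffer first. A self-contained sketch of that pattern using only standard Java classes (a plain ObjectOutputStream stands in for Spark's serializer and the fastutil buffered stream):

import java.io.{BufferedOutputStream, File, FileOutputStream, ObjectOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel.MapMode

// Sketch only: write values through a stream, then map the file back as a ByteBuffer.
object DiskWriteSketch {
  def main(args: Array[String]) {
    val file = File.createTempFile("block", ".bin")

    // Stream values out so a large iterator never has to sit in memory at once.
    val out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
    for (value <- Iterator("a", "b", "c")) {
      out.writeObject(value)
    }
    out.close()

    // Memory-map the finished file to hand back a ByteBuffer, as putValues now does.
    val channel = new RandomAccessFile(file, "r").getChannel()
    val buffer = channel.map(MapMode.READ_ONLY, 0, channel.size())
    println("mapped " + buffer.limit() + " bytes from " + file)
    channel.close()
    file.delete()
  }
}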
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index b7b8a79327..93b876d205 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -18,7 +18,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
val clusterUrl = "local-cluster[2,1,512]"
- var sc: SparkContext = _
+ @transient var sc: SparkContext = _
after {
if (sc != null) {
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index 500af1eb90..fd7a7bd589 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -3,14 +3,14 @@ package spark
import com.google.common.io.Files
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
-import java.io.{File, PrintWriter}
+import java.io.{File, PrintWriter, FileReader, BufferedReader}
import SparkContext._
class FileServerSuite extends FunSuite with BeforeAndAfter {
- var sc: SparkContext = _
- var tmpFile : File = _
- var testJarFile : File = _
+ @transient var sc: SparkContext = _
+ @transient var tmpFile : File = _
+ @transient var testJarFile : File = _
before {
// Create a sample text file
@@ -38,7 +38,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
- val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+ val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
val fileVal = in.readLine().toInt
in.close()
_ * fileVal + _ * fileVal
@@ -53,7 +53,9 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
sc.addJar(sampleJarFile)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
- val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
a + b
@@ -66,7 +68,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
- val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+ val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
val fileVal = in.readLine().toInt
in.close()
_ * fileVal + _ * fileVal
@@ -75,19 +77,19 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
assert(result.toSet === Set((1,200), (2,300), (3,500)))
}
-
test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
sc.addJar(sampleJarFile)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
- val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
a + b
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
}
-
-} \ No newline at end of file
+}
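The test change above also shows how addFile is meant to be consumed: the served file is fetched into each executor's working directory, so tasks open it by bare file name rather than by the driver-local path. A hedged usage sketch along the same lines (file contents, data and app name are made up):

import java.io.{BufferedReader, File, FileReader, PrintWriter}
import spark.SparkContext
import spark.SparkContext._

// Sketch only: distribute a small file with addFile and read it by name inside a task.
object AddFileSketch {
  def main(args: Array[String]) {
    val tmpFile = new File(com.google.common.io.Files.createTempDir(), "factor.txt")
    val pw = new PrintWriter(tmpFile)
    pw.println("100")
    pw.close()

    val sc = new SparkContext("local", "addFile-sketch")
    sc.addFile(tmpFile.toString)

    val result = sc.parallelize(Seq((1, 1), (2, 2))).mapValues { v =>
      // On an executor, the distributed copy is available under its plain file name.
      val in = new BufferedReader(new FileReader("factor.txt"))
      val factor = in.readLine().trim.toInt
      in.close()
      v * factor
    }.collect()

    println(result.toSeq) // e.g. (1,100), (2,200)
    sc.stop()
  }
}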
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index f622c413f7..9d7e2591f1 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -15,16 +15,16 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
-
+
var sc: SparkContext = _
-
+
after {
if (sc != null) {
sc.stop()
sc = null
}
}
-
+
test("groupByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -57,7 +57,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}
-
+
test("groupByKey with many output partitions") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -187,7 +187,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
(4, (ArrayBuffer(), ArrayBuffer('w')))
))
}
-
+
test("zero-partition RDD") {
sc = new SparkContext("local", "test")
val emptyDir = Files.createTempDir()
@@ -195,7 +195,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
assert(file.splits.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
- assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
}
test("map-side combine") {
@@ -212,7 +212,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
_+_,
_+_,
false)
- val shuffledRdd = new ShuffledRDD(
+ val shuffledRdd = new ShuffledAggregatedRDD(
pairs, aggregator, new HashPartitioner(2))
assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1)))
@@ -220,7 +220,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// not see an exception because mergeCombine should not have been called.
val aggregatorWithException = new Aggregator[Int, Int, Int](
(v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
- val shuffledRdd1 = new ShuffledRDD(
+ val shuffledRdd1 = new ShuffledAggregatedRDD(
pairs, aggregatorWithException, new HashPartitioner(2))
assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
@@ -228,7 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// expect to see an exception thrown.
val aggregatorWithException1 = new Aggregator[Int, Int, Int](
(v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
- val shuffledRdd2 = new ShuffledRDD(
+ val shuffledRdd2 = new ShuffledAggregatedRDD(
pairs, aggregatorWithException1, new HashPartitioner(2))
evaluating { shuffledRdd2.collect() } should produce [SparkException]
}
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
index b133376a97..0c925c176c 100644
--- a/docs/bagel-programming-guide.md
+++ b/docs/bagel-programming-guide.md
@@ -19,7 +19,7 @@ To write a Bagel application, you will need to add Spark, its dependencies, and
## Programming Model
-Bagel operates on a graph represented as a [distributed dataset]({{HOME_PATH}}programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
+Bagel operates on a graph represented as a [distributed dataset]({{HOME_PATH}}scala-programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.
diff --git a/docs/index.md b/docs/index.md
index 3df638f629..69d55e505e 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -54,7 +54,7 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
# Where to Go from Here
-* [Spark Programming Guide]({{HOME_PATH}}programming-guide.html): how to get started using Spark, and details on the API
+* [Spark Programming Guide]({{HOME_PATH}}scala-programming-guide.html): how to get started using Spark, and details on the API
* [Running Spark on Amazon EC2]({{HOME_PATH}}ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes
* [Running Spark on Mesos]({{HOME_PATH}}running-on-mesos.html): instructions on how to deploy to a private cluster
* [Running Spark on YARN]({{HOME_PATH}}running-on-yarn.html): instructions on how to run Spark on top of a YARN cluster
diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md
index e3f644d748..c63448a965 100644
--- a/docs/java-programming-guide.md
+++ b/docs/java-programming-guide.md
@@ -2,4 +2,172 @@
layout: global
title: Java Programming Guide
---
-TODO: Write Java programming guide!
+
+The Spark Java API
+([spark.api.java]({{HOME_PATH}}api/core/index.html#spark.api.java.package)) defines
+[`JavaSparkContext`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaSparkContext) and
+[`JavaRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaRDD) classes,
+which support
+the same methods as their Scala counterparts but take Java functions and return
+Java data and collection types.
+
+Because the Java API is similar to the Scala API, this programming guide only
+covers Java-specific features;
+the [Scala Programming Guide]({{HOME_PATH}}scala-programming-guide.html)
+provides a more general introduction to Spark concepts and should be read
+first.
+
+
+# Key differences in the Java API
+There are a few key differences between the Java and Scala APIs:
+
+* Java does not support anonymous or first-class functions, so functions must
+ be implemented by extending the
+ [`spark.api.java.function.Function`]({{HOME_PATH}}api/core/index.html#spark.api.java.function.Function),
+ [`Function2`]({{HOME_PATH}}api/core/index.html#spark.api.java.function.Function2), etc.
+ classes.
+* To maintain type safety, the Java API defines specialized Function and RDD
+ classes for key-value pairs and doubles.
+* RDD methods like `collect` and `countByKey` return Java collection types,
+ such as `java.util.List` and `java.util.Map`.
+
+
+## RDD Classes
+Spark defines additional operations on RDDs of doubles and key-value pairs, such
+as `stdev` and `join`.
+
+In the Scala API, these methods are automatically added using Scala's
+[implicit conversions](http://www.scala-lang.org/node/130) mechanism.
+
+In the Java API, the extra methods are defined in
+[`JavaDoubleRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaDoubleRDD) and
+[`JavaPairRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaPairRDD)
+classes. RDD methods like `map` are overloaded by specialized `PairFunction`
+and `DoubleFunction` classes, allowing them to return RDDs of the appropriate
+types. Common methods like `filter` and `sample` are implemented by
+each specialized RDD class, so filtering a `PairRDD` returns a new `PairRDD`,
+etc. (this achieves the "same-result-type" principle used by the [Scala collections
+framework](http://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html)).
+
+## Function Classes
+
+The following table lists the function classes used by the Java API. Each
+class has a single abstract method, `call()`, that must be implemented.
+
+<table class="table">
+<tr><th>Class</th><th>Function Type</th></tr>
+
+<tr><td>Function&lt;T, R&gt;</td><td>T -&gt; R </td></tr>
+<tr><td>DoubleFunction&lt;T&gt;</td><td>T -&gt; Double </td></tr>
+<tr><td>PairFunction&lt;T, K, V&gt;</td><td>T -&gt; Tuple2&lt;K, V&gt; </td></tr>
+
+<tr><td>FlatMapFunction&lt;T, R&gt;</td><td>T -&gt; Iterable&lt;R&gt; </td></tr>
+<tr><td>DoubleFlatMapFunction&lt;T&gt;</td><td>T -&gt; Iterable&lt;Double&gt; </td></tr>
+<tr><td>PairFlatMapFunction&lt;T, K, V&gt;</td><td>T -&gt; Iterable&lt;Tuple2&lt;K, V&gt;&gt; </td></tr>
+
+<tr><td>Function2&lt;T1, T2, R&gt;</td><td>T1, T2 -&gt; R (function of two arguments)</td></tr>
+</table>
+
+# Other Features
+The Java API supports other Spark features, including
+[accumulators]({{HOME_PATH}}scala-programming-guide.html#accumulators),
+[broadcast variables]({{HOME_PATH}}scala-programming-guide.html#broadcast_variables), and
+[caching]({{HOME_PATH}}scala-programming-guide.html#caching).
+
+# Example
+
+As an example, we will implement word count using the Java API.
+
+{% highlight java %}
+import spark.api.java.*;
+import spark.api.java.function.*;
+
+JavaSparkContext sc = new JavaSparkContext(...);
+JavaRDD<String> lines = sc.textFile("hdfs://...");
+JavaRDD<String> words = lines.flatMap(
+ new FlatMapFunction<String, String>() {
+ public Iterable<String> call(String s) {
+ return Arrays.asList(s.split(" "));
+ }
+ }
+);
+{% endhighlight %}
+
+The word count program starts by creating a `JavaSparkContext`, which accepts
+the same parameters as its Scala counterpart. `JavaSparkContext` supports the
+same data loading methods as the regular `SparkContext`; here, `textFile`
+loads lines from text files stored in HDFS.
+
+To split the lines into words, we use `flatMap` to split each line on
+whitespace. `flatMap` is passed a `FlatMapFunction` that accepts a string and
+returns a `java.lang.Iterable` of strings.
+
+Here, the `FlatMapFunction` was created inline; another option is to subclass
+`FlatMapFunction` and pass an instance to `flatMap`:
+
+{% highlight java %}
+class Split extends FlatMapFunction<String, String> {
+ public Iterable<String> call(String s) {
+ return Arrays.asList(s.split(" "));
+ }
+}
+JavaRDD<String> words = lines.flatMap(new Split());
+{% endhighlight %}
+
+Continuing with the word count example, we map each word to a `(word, 1)` pair:
+
+{% highlight java %}
+import scala.Tuple2;
+JavaPairRDD<String, Integer> ones = words.map(
+ new PairFunction<String, String, Integer>() {
+ public Tuple2<String, Integer> call(String s) {
+ return new Tuple2(s, 1);
+ }
+ }
+);
+{% endhighlight %}
+
+Note that `map` was passed a `PairFunction<String, String, Integer>` and
+returned a `JavaPairRDD<String, Integer>`.
+
+
+
+To finish the word count program, we will use `reduceByKey` to count the
+occurrences of each word:
+
+{% highlight java %}
+JavaPairRDD<String, Integer> counts = ones.reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ }
+);
+{% endhighlight %}
+
+Here, `reduceByKey` is passed a `Function2`, which implements a function with
+two arguments. The resulting `JavaPairRDD` contains `(word, count)` pairs.
+
+In this example, we explicitly showed each intermediate RDD. It is also
+possible to chain the RDD transformations, so the word count example could also
+be written as:
+
+{% highlight java %}
+JavaPairRDD<String, Integer> counts = lines.flatMap(
+ ...
+ ).map(
+ ...
+ ).reduceByKey(
+ ...
+ );
+{% endhighlight %}
+There is no performance difference between these approaches; the choice is
+a matter of style.
+
+
+# Where to go from here
+Spark includes several sample jobs using the Java API in
+`examples/src/main/java`. You can run them by passing the class name to the
+`run` script included in Spark -- for example, `./run
+spark.examples.JavaWordCount`. Each example program prints usage help when run
+without any arguments.
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 203001954a..0247b46de4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -22,7 +22,7 @@ object SparkBuild extends Build {
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.spark-project",
version := "0.6.0-SNAPSHOT",
- scalaVersion := "2.9.1",
+ scalaVersion := "2.9.2",
scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
@@ -61,9 +61,9 @@ object SparkBuild extends Build {
"asm" % "asm-all" % "3.3.1",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"de.javakaffee" % "kryo-serializers" % "0.9",
- "com.typesafe.akka" % "akka-actor" % "2.0.2",
- "com.typesafe.akka" % "akka-remote" % "2.0.2",
- "com.typesafe.akka" % "akka-slf4j" % "2.0.2",
+ "com.typesafe.akka" % "akka-actor" % "2.0.3",
+ "com.typesafe.akka" % "akka-remote" % "2.0.3",
+ "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
"cc.spray" % "spray-can" % "1.0-M2.1",
diff --git a/run b/run
index 2946a04d3f..5f640789ff 100755
--- a/run
+++ b/run
@@ -1,6 +1,6 @@
#!/bin/bash
-SCALA_VERSION=2.9.1
+SCALA_VERSION=2.9.2
# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
diff --git a/run.cmd b/run.cmd
new file mode 100644
index 0000000000..cc5605f8a9
--- /dev/null
+++ b/run.cmd
@@ -0,0 +1,2 @@
+@echo off
+cmd /V /E /C %~dp0run2.cmd %*
diff --git a/run2.cmd b/run2.cmd
new file mode 100644
index 0000000000..9fc4d5054b
--- /dev/null
+++ b/run2.cmd
@@ -0,0 +1,68 @@
+@echo off
+
+set SCALA_VERSION=2.9.1
+
+rem Figure out where the Spark framework is installed
+set FWDIR=%~dp0
+
+rem Export this as SPARK_HOME
+set SPARK_HOME=%FWDIR%
+
+rem Load environment variables from conf\spark-env.cmd, if it exists
+if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+
+rem Check that SCALA_HOME has been specified
+if not "x%SCALA_HOME%"=="x" goto scala_exists
+ echo "SCALA_HOME is not set"
+ goto exit
+:scala_exists
+
+rem If the user specifies a Mesos JAR, put it before our included one on the classpath
+set MESOS_CLASSPATH=
+if not "x%MESOS_JAR%"=="x" set MESOS_CLASSPATH=%MESOS_JAR%
+
+rem Figure out how much memory to use per executor and set it as an environment
+rem variable so that our process sees it and can report it to Mesos
+if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m
+
+rem Set JAVA_OPTS to be able to load native libraries and to set heap size
+set JAVA_OPTS=%SPARK_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM%
+rem Load extra JAVA_OPTS from conf/java-opts, if it exists
+if exist "%FWDIR%conf\java-opts.cmd" call "%FWDIR%conf\java-opts.cmd"
+
+set CORE_DIR=%FWDIR%core
+set REPL_DIR=%FWDIR%repl
+set EXAMPLES_DIR=%FWDIR%examples
+set BAGEL_DIR=%FWDIR%bagel
+
+rem Build up classpath
+set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes
+set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources
+set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes
+for /R "%CORE_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+for /R "%FWDIR%\lib_managed\jars" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+for /R "%FWDIR%\lib_managed\bundles" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+for /R "%REPL_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
+
+rem Figure out whether to run our class with java or with the scala launcher.
+rem In most cases, we'd prefer to execute our process with java because scala
+rem creates a shell script as the parent of its Java process, which makes it
+rem hard to kill the child with stuff like Process.destroy(). However, for
+rem the Spark shell, the wrapper is necessary to properly reset the terminal
+rem when we exit, so we allow it to set a variable to launch with scala.
+if "%SPARK_LAUNCH_WITH_SCALA%" NEQ 1 goto java_runner
+ set RUNNER=%SCALA_HOME%\bin\scala
+ rem Java options will be passed to scala as JAVA_OPTS
+ set EXTRA_ARGS=
+ goto run_spark
+:java_runner
+ set CLASSPATH=%CLASSPATH%;%SCALA_HOME%\lib\scala-library.jar;%SCALA_HOME%\lib\scala-compiler.jar;%SCALA_HOME%\lib\jline.jar
+ set RUNNER=java
+ if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
+ rem The JVM doesn't read JAVA_OPTS by default so we need to pass it in
+ set EXTRA_ARGS=%JAVA_OPTS%
+:run_spark
+
+%RUNNER% -cp "%CLASSPATH%" %EXTRA_ARGS% %*
+:exit \ No newline at end of file
diff --git a/sbt/sbt.cmd b/sbt/sbt.cmd
new file mode 100644
index 0000000000..6b289ab447
--- /dev/null
+++ b/sbt/sbt.cmd
@@ -0,0 +1,5 @@
+@echo off
+set EXTRA_ARGS=
+if not "%MESOS_HOME%x"=="x" set EXTRA_ARGS=-Djava.library.path=%MESOS_HOME%\lib\java
+set SPARK_HOME=%~dp0..
+java -Xmx1200M -XX:MaxPermSize=200m %EXTRA_ARGS% -jar %SPARK_HOME%\sbt\sbt-launch-*.jar "%*"
diff --git a/spark-shell.cmd b/spark-shell.cmd
new file mode 100644
index 0000000000..34697d52d7
--- /dev/null
+++ b/spark-shell.cmd
@@ -0,0 +1,4 @@
+@echo off
+set FWDIR=%~dp0
+set SPARK_LAUNCH_WITH_SCALA=1
+cmd /V /E /C %FWDIR%run2.cmd spark.repl.Main %*