path: root/core
author     Andy Konwinski <andyk@berkeley.edu>   2012-09-25 15:19:57 -0700
committer  Andy Konwinski <andyk@berkeley.edu>   2012-09-25 15:19:57 -0700
commit     8d30fe616eacaa3263d46ca20153d1c4e518c16c (patch)
tree       6369b667915663582d4b85c858e516d59a7b2b5b /core
parent     8f7dfcf332b0e56bb0d7e0b97b60a6cb4a8735d1 (diff)
parent     aa50d5b9a2b3c5b2adc958c96d88a623fa709e63 (diff)
download   spark-8d30fe616eacaa3263d46ca20153d1c4e518c16c.tar.gz
           spark-8d30fe616eacaa3263d46ca20153d1c4e518c16c.tar.bz2
           spark-8d30fe616eacaa3263d46ca20153d1c4e518c16c.zip
Merge remote-tracking branch 'public-spark/dev' into doc
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/Aggregator.scala                    |  4
-rw-r--r--  core/src/main/scala/spark/HttpFileServer.scala                |  8
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala              | 87
-rw-r--r--  core/src/main/scala/spark/RDD.scala                           |  2
-rw-r--r--  core/src/main/scala/spark/Serializer.scala                    |  4
-rw-r--r--  core/src/main/scala/spark/ShuffledRDD.scala                   | 72
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala                      |  2
-rw-r--r--  core/src/main/scala/spark/Utils.scala                         |  2
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobInfo.scala         |  9
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobState.scala        |  2
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala          | 37
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala  |  3
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala             |  9
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala      |  9
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala          |  7
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala            | 46
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala              |  2
-rw-r--r--  core/src/test/scala/spark/FileServerSuite.scala               | 24
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala                  | 18
19 files changed, 223 insertions, 124 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 6516bea157..b0daa70cfd 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -9,9 +9,9 @@ package spark
* known as map-side aggregations. When set to false,
* mergeCombiners function is not used.
*/
-class Aggregator[K, V, C] (
+case class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
val mergeCombiners: (C, C) => C,
val mapSideCombine: Boolean = true)
- extends Serializable
+
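Turning Aggregator into a case class drops the explicit Serializable clause (case classes already get it) and pairs naturally with the new mapSideCombine flag: aggregators built purely for repartitioning carry mapSideCombine = false and may leave mergeCombiners null, which the combineByKey change further down asserts. A minimal construction sketch (not taken from the patch), assuming the spark package from this tree is on the classpath:

    import spark.Aggregator

    // Sum Int values per key, with map-side combining left at its default (true).
    val sumAgg = Aggregator[String, Int, Int](
      (v: Int) => v,                  // createCombiner
      (c: Int, v: Int) => c + v,      // mergeValue
      (c1: Int, c2: Int) => c1 + c2)  // mergeCombiners

    // Pass-through aggregator used purely for repartitioning:
    // no map-side combining, so mergeCombiners is deliberately null.
    val passThrough = Aggregator[String, Int, Int](
      (v: Int) => v, (c: Int, v: Int) => c + v, null, false)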
diff --git a/core/src/main/scala/spark/HttpFileServer.scala b/core/src/main/scala/spark/HttpFileServer.scala
index e6ad4dd28e..05ca846c85 100644
--- a/core/src/main/scala/spark/HttpFileServer.scala
+++ b/core/src/main/scala/spark/HttpFileServer.scala
@@ -20,7 +20,7 @@ class HttpFileServer extends Logging {
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
- httpServer = new HttpServer(fileDir)
+ httpServer = new HttpServer(baseDir)
httpServer.start()
serverUri = httpServer.uri
}
@@ -30,11 +30,13 @@ class HttpFileServer extends Logging {
}
def addFile(file: File) : String = {
- return addFileToDir(file, fileDir)
+ addFileToDir(file, fileDir)
+ return serverUri + "/files/" + file.getName
}
def addJar(file: File) : String = {
- return addFileToDir(file, jarDir)
+ addFileToDir(file, jarDir)
+ return serverUri + "/jars/" + file.getName
}
def addFileToDir(file: File, dir: File) : String = {
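Two fixes here: the embedded HTTP server now serves baseDir (so both the files/ and jars/ subdirectories are reachable), and addFile/addJar return a URI that points into the matching subdirectory rather than whatever addFileToDir returns. A rough sketch of the resulting behaviour, assuming the server has already been set up and started (that method is not shown in this hunk):

    val fs = new HttpFileServer()
    // ... after the server has been set up and started ...
    val fileUri = fs.addFile(new java.io.File("/tmp/data.txt"))
    // fileUri is now e.g. "http://<host>:<port>/files/data.txt"
    val jarUri = fs.addJar(new java.io.File("/tmp/dep.jar"))
    // jarUri is now e.g. "http://<host>:<port>/jars/dep.jar"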
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 64018f8c6b..aa1d00c63c 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -1,11 +1,10 @@
package spark
import java.io.EOFException
-import java.net.URL
import java.io.ObjectInputStream
+import java.net.URL
+import java.util.{Date, HashMap => JHashMap}
import java.util.concurrent.atomic.AtomicLong
-import java.util.{HashMap => JHashMap}
-import java.util.Date
import java.text.SimpleDateFormat
import scala.collection.Map
@@ -50,9 +49,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
- partitioner: Partitioner): RDD[(K, C)] = {
- val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
- new ShuffledRDD(self, aggregator, partitioner)
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true): RDD[(K, C)] = {
+ val aggregator =
+ if (mapSideCombine) {
+ new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ } else {
+ // Don't apply map-side combiner.
+ // A sanity check to make sure mergeCombiners is not defined.
+ assert(mergeCombiners == null)
+ new Aggregator[K, V, C](createCombiner, mergeValue, null, false)
+ }
+ new ShuffledAggregatedRDD(self, aggregator, partitioner)
}
def combineByKey[C](createCombiner: V => C,
@@ -65,7 +73,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
-
+
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
val map = new JHashMap[K, V]
@@ -116,13 +124,24 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
groupByKey(new HashPartitioner(numSplits))
}
- def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
- val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner)
- bufs.flatMapValues(buf => buf)
+ /**
+ * Repartition the RDD using the specified partitioner. If mapSideCombine is
+ * true, Spark will group values of the same key together on the map side
+ * before the repartitioning. If a large number of duplicated keys are
+ * expected, and the size of the keys is large, mapSideCombine should be set
+ * to true.
+ */
+ def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
+ if (mapSideCombine) {
+ def createCombiner(v: V) = ArrayBuffer(v)
+ def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+ def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
+ val bufs = combineByKey[ArrayBuffer[V]](
+ createCombiner _, mergeValue _, mergeCombiners _, partitioner)
+ bufs.flatMapValues(buf => buf)
+ } else {
+ new RepartitionShuffledRDD(self, partitioner)
+ }
}
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
@@ -194,17 +213,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
-
+
def mapValues[U](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MappedValuesRDD(self, cleanF)
}
-
+
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new FlatMappedValuesRDD(self, cleanF)
}
-
+
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
@@ -215,12 +234,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
}
}
-
+
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]],
- other1.asInstanceOf[RDD[(_, _)]],
+ other1.asInstanceOf[RDD[(_, _)]],
other2.asInstanceOf[RDD[(_, _)]]),
partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
@@ -289,7 +308,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
-
+
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
@@ -363,7 +382,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
saveAsHadoopDataset(conf)
}
-
+
def saveAsHadoopDataset(conf: JobConf) {
val outputFormatClass = conf.getOutputFormat
val keyClass = conf.getOutputKeyClass
@@ -377,7 +396,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
if (valueClass == null) {
throw new SparkException("Output value class not set")
}
-
+
logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
val writer = new HadoopWriter(conf)
@@ -390,14 +409,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
writer.setup(context.stageId, context.splitId, attemptNumber)
writer.open()
-
+
var count = 0
while(iter.hasNext) {
val record = iter.next
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
-
+
writer.close()
writer.commit()
}
@@ -413,28 +432,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
self: RDD[(K, V)])
- extends Logging
+ extends Logging
with Serializable {
def sortByKey(ascending: Boolean = true): RDD[(K,V)] = {
- val rangePartitionedRDD = self.partitionBy(new RangePartitioner(self.splits.size, self, ascending))
- new SortedRDD(rangePartitionedRDD, ascending)
+ new ShuffledSortedRDD(self, ascending)
}
}
-class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean)
- extends RDD[(K, V)](prev.context) {
-
- override def splits = prev.splits
- override val partitioner = prev.partitioner
- override val dependencies = List(new OneToOneDependency(prev))
-
- override def compute(split: Split) = {
- prev.iterator(split).toArray
- .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
- }
-}
-
class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
@@ -444,7 +449,7 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)]
class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
extends RDD[(K, U)](prev.context) {
-
+
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override val partitioner = prev.partitioner
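The user-visible effect of this file: combineByKey and partitionBy gain a mapSideCombine parameter, and sortByKey now builds a ShuffledSortedRDD directly instead of repartitioning and then sorting in a separate SortedRDD. A short usage sketch, assuming a SparkContext named sc:

    import spark._
    import spark.SparkContext._

    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // combineByKey with map-side combining disabled; mergeCombiners must then be null.
    val sums = pairs.combineByKey[Int](
      (v: Int) => v,               // createCombiner
      (c: Int, v: Int) => c + v,   // mergeValue
      null,                        // mergeCombiners (unused when mapSideCombine = false)
      new HashPartitioner(2),
      false)                       // mapSideCombine

    // partitionBy now defaults to a plain RepartitionShuffledRDD; opting in to
    // mapSideCombine buckets values per key on the map side before the shuffle.
    val plain = pairs.partitionBy(new HashPartitioner(2))
    val bucketed = pairs.partitionBy(new HashPartitioner(2), mapSideCombine = true)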
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index d28f3593fe..efe248896a 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -68,6 +68,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def preferredLocations(split: Split): Seq[String] = Nil
def context = sc
+
+ def elementClassManifest: ClassManifest[T] = classManifest[T]
// Get a unique ID for this RDD
val id = sc.newRddId()
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala
index 61a70beaf1..5f26bd2a7b 100644
--- a/core/src/main/scala/spark/Serializer.scala
+++ b/core/src/main/scala/spark/Serializer.scala
@@ -43,7 +43,7 @@ trait SerializerInstance {
def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
// Default implementation uses deserializeStream
buffer.rewind()
- deserializeStream(new ByteBufferInputStream(buffer)).toIterator
+ deserializeStream(new ByteBufferInputStream(buffer)).asIterator
}
}
@@ -74,7 +74,7 @@ trait DeserializationStream {
* Read the elements of this stream through an iterator. This can only be called once, as
* reading each element will consume data from the input source.
*/
- def toIterator: Iterator[Any] = new Iterator[Any] {
+ def asIterator: Iterator[Any] = new Iterator[Any] {
var gotNext = false
var finished = false
var nextValue: Any = null
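The rename from toIterator to asIterator signals that the result is a lazy, single-pass view over the deserialization stream rather than a materialized collection. A small sketch of the call-site pattern, with ser standing in for any SerializerInstance, buffer for a ByteBuffer of serialized data, and ByteBufferInputStream being the same helper already used in the hunk above:

    // Elements are read from the stream only as the iterator is consumed,
    // and the iterator can be traversed exactly once.
    buffer.rewind()
    val elems: Iterator[Any] =
      ser.deserializeStream(new ByteBufferInputStream(buffer)).asIterator
    elems.foreach(println)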
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 3616d8e47e..a7346060b3 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -1,29 +1,89 @@
package spark
+import scala.collection.mutable.ArrayBuffer
import java.util.{HashMap => JHashMap}
+
class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx
override def hashCode(): Int = idx
}
-class ShuffledRDD[K, V, C](
+
+/**
+ * The resulting RDD from a shuffle (e.g. repartitioning of data).
+ */
+abstract class ShuffledRDD[K, V, C](
@transient parent: RDD[(K, V)],
aggregator: Aggregator[K, V, C],
- part : Partitioner)
+ part : Partitioner)
extends RDD[(K, C)](parent.context) {
- //override val partitioner = Some(part)
+
override val partitioner = Some(part)
-
+
@transient
val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
override def splits = splits_
-
+
override def preferredLocations(split: Split) = Nil
-
+
val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
override val dependencies = List(dep)
+}
+
+
+/**
+ * Repartition a key-value pair RDD.
+ */
+class RepartitionShuffledRDD[K, V](
+ @transient parent: RDD[(K, V)],
+ part : Partitioner)
+ extends ShuffledRDD[K, V, V](
+ parent,
+ Aggregator[K, V, V](null, null, null, false),
+ part) {
+
+ override def compute(split: Split): Iterator[(K, V)] = {
+ val buf = new ArrayBuffer[(K, V)]
+ val fetcher = SparkEnv.get.shuffleFetcher
+ def addTupleToBuffer(k: K, v: V) = { buf += Tuple2(k, v) }
+ fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
+ buf.iterator
+ }
+}
+
+
+/**
+ * A sort-based shuffle (that doesn't apply aggregation). It does so by first
+ * repartitioning the RDD by range, and then sorting within each range.
+ */
+class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
+ @transient parent: RDD[(K, V)],
+ ascending: Boolean)
+ extends RepartitionShuffledRDD[K, V](
+ parent,
+ new RangePartitioner(parent.splits.size, parent, ascending)) {
+
+ override def compute(split: Split): Iterator[(K, V)] = {
+ // By separating this from RepartitionShuffledRDD, we avoided a
+ // buf.iterator.toArray call, thus avoiding building up the buffer twice.
+ val buf = new ArrayBuffer[(K, V)]
+ def addTupleToBuffer(k: K, v: V) = { buf += Tuple2(k, v) }
+ SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
+ buf.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
+ }
+}
+
+
+/**
+ * The resulting RDD from shuffle and running (hash-based) aggregation.
+ */
+class ShuffledAggregatedRDD[K, V, C](
+ @transient parent: RDD[(K, V)],
+ aggregator: Aggregator[K, V, C],
+ part : Partitioner)
+ extends ShuffledRDD[K, V, C](parent, aggregator, part) {
override def compute(split: Split): Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
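The net effect of this file: ShuffledRDD becomes an abstract base that holds the shuffle dependency, and three concrete subclasses cover the cases of plain repartitioning, range-partition-then-sort (backing sortByKey), and hash aggregation (backing combineByKey). A construction sketch, assuming a SparkContext named sc:

    import spark._

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val part = new HashPartitioner(2)

    // Plain repartition: values pass through untouched.
    val repartitioned = new RepartitionShuffledRDD(pairs, part)

    // Range-partition and sort within each output split (what sortByKey now returns).
    val sorted = new ShuffledSortedRDD(pairs, true)

    // Hash-based aggregation after the shuffle (what combineByKey now returns).
    val agg = Aggregator[String, Int, Int]((v: Int) => v, _ + _, _ + _)
    val summed = new ShuffledAggregatedRDD(pairs, agg, part)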
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 7473b40aa3..6ffae8e85f 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -40,6 +40,8 @@ class SparkEnv (
blockManager.stop()
blockManager.master.stop()
actorSystem.shutdown()
+ // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
+ Thread.sleep(100)
actorSystem.awaitTermination()
// Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
Thread.sleep(100)
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 07aa18e540..5a3f8bde43 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -163,6 +163,8 @@ object Utils extends Logging {
logInfo("Untarring " + filename)
Utils.execute(Seq("tar", "-xf", filename), targetDir)
}
+ // Make the file executable - That's necessary for scripts
+ FileUtil.chmod(filename, "a+x")
}
/**
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 31d48b82b9..4c81a1b447 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -31,4 +31,13 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
}
def coresLeft: Int = desc.cores - coresGranted
+
+ private var _retryCount = 0
+
+ def retryCount = _retryCount
+
+ def incrementRetryCount = {
+ _retryCount += 1
+ _retryCount
+ }
}
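incrementRetryCount both bumps the per-job counter and returns the new value, which is what lets the Master change below compare the result directly against JobState.MAX_NUM_RETRY. A tiny illustration of the intended semantics, with jobInfo standing in for a freshly submitted JobInfo:

    // A freshly submitted job has never been retried.
    assert(jobInfo.retryCount == 0)

    // Each abnormal executor exit bumps the counter and yields the new total,
    // so the Master keeps rescheduling only while it stays <= MAX_NUM_RETRY (10).
    val retries = jobInfo.incrementRetryCount
    assert(retries == 1 && jobInfo.retryCount == 1)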
diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala
index 50b0c6f95b..8d458ac39c 100644
--- a/core/src/main/scala/spark/deploy/master/JobState.scala
+++ b/core/src/main/scala/spark/deploy/master/JobState.scala
@@ -4,4 +4,6 @@ object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED")
type JobState = Value
val WAITING, RUNNING, FINISHED, FAILED = Value
+
+ val MAX_NUM_RETRY = 10
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index c98dddea7b..5cc73633ab 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -1,19 +1,18 @@
package spark.deploy.master
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
import akka.actor._
-import spark.{Logging, Utils}
-import spark.util.AkkaUtils
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+
import java.text.SimpleDateFormat
import java.util.Date
-import akka.remote.RemoteClientLifeCycleEvent
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
import spark.deploy._
-import akka.remote.RemoteClientShutdown
-import akka.remote.RemoteClientDisconnected
-import spark.deploy.RegisterWorker
-import spark.deploy.RegisterWorkerFailed
-import akka.actor.Terminated
+import spark.{Logging, SparkException, Utils}
+import spark.util.AkkaUtils
+
class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
@@ -81,12 +80,22 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
exec.state = state
exec.job.actor ! ExecutorUpdated(execId, state, message)
if (ExecutorState.isFinished(state)) {
+ val jobInfo = idToJob(jobId)
// Remove this executor from the worker and job
logInfo("Removing executor " + exec.fullId + " because it is " + state)
- idToJob(jobId).removeExecutor(exec)
+ jobInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)
- // TODO: the worker would probably want to restart the executor a few times
- schedule()
+
+ // Only retry a certain number of times so we don't go into an infinite loop.
+ if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) {
+ schedule()
+ } else {
+ val e = new SparkException("Job %s with ID %s failed %d times.".format(
+ jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
+ logError(e.getMessage, e)
+ throw e
+ //System.exit(1)
+ }
}
}
case None =>
@@ -112,7 +121,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
}
-
+
case RequestMasterState => {
sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
}
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 7043361020..e2a9df275a 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -75,7 +75,8 @@ class ExecutorRunner(
def buildCommandSeq(): Seq[String] = {
val command = jobDesc.command
- val runScript = new File(sparkHome, "run").getCanonicalPath
+ val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run";
+ val runScript = new File(sparkHome, script).getCanonicalPath
Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables)
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 8f975c52d4..9999b6ba80 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -38,6 +38,10 @@ class Executor extends Logging {
System.setProperty(key, value)
}
+ // Create our ClassLoader and set it on this thread
+ urlClassLoader = createClassLoader()
+ Thread.currentThread.setContextClassLoader(urlClassLoader)
+
// Initialize Spark environment (using system properties read above)
env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
SparkEnv.set(env)
@@ -45,11 +49,6 @@ class Executor extends Logging {
// Start worker thread pool
threadPool = new ThreadPoolExecutor(
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
- // Create our ClassLoader and set it on this thread
- urlClassLoader = createClassLoader()
- Thread.currentThread.setContextClassLoader(urlClassLoader)
-
}
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 3bcc588015..745aa0c939 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -44,7 +44,8 @@ object ShuffleMapTask {
}
// Since both the JarSet and FileSet have the same format this is used for both.
- def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = {
+ def serializeFileSet(
+ set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = {
val old = cache.get(stageId)
if (old != null) {
return old
@@ -59,7 +60,6 @@ object ShuffleMapTask {
}
}
-
def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = {
synchronized {
val loader = Thread.currentThread.getContextClassLoader
@@ -113,7 +113,8 @@ class ShuffleMapTask(
out.writeInt(bytes.length)
out.write(bytes)
- val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache)
+ val fileSetBytes = ShuffleMapTask.serializeFileSet(
+ fileSet, stageId, ShuffleMapTask.fileSetCache)
out.writeInt(fileSetBytes.length)
out.write(fileSetBytes)
val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache)
@@ -172,7 +173,7 @@ class ShuffleMapTask(
buckets.map(_.iterator)
} else {
// No combiners (no map-side aggregation). Simply partition the map output.
- val buckets = Array.tabulate(numOutputSplits)(_ => new ArrayBuffer[(Any, Any)])
+ val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
for (elem <- rdd.iterator(split)) {
val pair = elem.asInstanceOf[(Any, Any)]
val bucketId = partitioner.getPartition(pair._1)
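Array.fill(n)(expr) re-evaluates expr for every slot, so it is equivalent to the old Array.tabulate(n)(_ => expr) here: each output split still gets its own ArrayBuffer. A toy version of the no-combiner path, with a modulo hash standing in for partitioner.getPartition:

    import scala.collection.mutable.ArrayBuffer

    val numOutputSplits = 2
    // One distinct buffer per output split (Array.fill evaluates the argument per slot).
    val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
    for (pair <- Seq(("a", 1), ("b", 2), ("a", 3))) {
      val bucketId = math.abs(pair._1.hashCode) % numOutputSplits
      buckets(bucketId) += pair   // no map-side aggregation, just partition the output
    }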
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3a51f6bd96..15748b70d5 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -614,10 +614,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
- /*serializer.newInstance().deserializeMany(bytes)*/
- val ser = serializer.newInstance()
bytes.rewind()
- return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator
+ val ser = serializer.newInstance()
+ return ser.deserializeStream(new ByteBufferInputStream(bytes)).asIterator
}
def stop() {
@@ -632,7 +631,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
object BlockManager {
def getNumParallelFetchesFromSystemProperties(): Int = {
- System.getProperty("spark.blockManager.parallelFetches", "8").toInt
+ System.getProperty("spark.blockManager.parallelFetches", "4").toInt
}
def getMaxMemoryFromSystemProperties(): Long = {
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 09287faba0..d505df66a7 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,6 +1,6 @@
package spark.storage
-import java.io.{File, RandomAccessFile}
+import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import java.util.{LinkedHashMap, UUID}
@@ -8,12 +8,14 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import scala.collection.mutable.ArrayBuffer
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
import spark.{Utils, Logging, Serializer, SizeEstimator}
/**
* Abstract class to store blocks
*/
-abstract class BlockStore(blockManager: BlockManager) extends Logging {
+abstract class BlockStore(val blockManager: BlockManager) extends Logging {
initLogging()
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
@@ -131,7 +133,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return None
}
if (entry.deserialized) {
- return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].toIterator)
+ return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
} else {
return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate()))
}
@@ -217,25 +219,28 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
val file = createFile(blockId)
- if (file != null) {
- val channel = new RandomAccessFile(file, "rw").getChannel()
- val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
- buffer.put(bytes)
- channel.close()
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
- blockId, bytes.limit, (finishTime - startTime)))
- } else {
- logError("File not created for block " + blockId)
- }
+ val channel = new RandomAccessFile(file, "rw").getChannel()
+ val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
+ buffer.put(bytes)
+ channel.close()
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
+ blockId, bytes.limit, (finishTime - startTime)))
}
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
- : Either[Iterator[Any], ByteBuffer] = {
- val bytes = dataSerialize(values)
- logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes")
- putBytes(blockId, bytes, level)
- return Right(bytes)
+ : Either[Iterator[Any], ByteBuffer] = {
+
+ logDebug("Attempting to write values for block " + blockId)
+ val file = createFile(blockId)
+ val fileOut = new FastBufferedOutputStream(new FileOutputStream(file))
+ val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
+ objOut.writeAll(values)
+ objOut.close()
+
+ // Return a byte buffer for the contents of the file
+ val channel = new RandomAccessFile(file, "rw").getChannel()
+ Right(channel.map(MapMode.READ_WRITE, 0, channel.size()))
}
def getBytes(blockId: String): Option[ByteBuffer] = {
@@ -267,8 +272,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
newFile.getParentFile.mkdirs()
return newFile
} else {
- logError("File for block " + blockId + " already exists on disk, " + file)
- return null
+ throw new Exception("File for block " + blockId + " already exists on disk, " + file)
}
}
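DiskStore.putValues now streams values through the serializer straight into the block file and then memory-maps that file for the returned ByteBuffer, instead of serializing everything into an in-memory buffer first. A stripped-down sketch of the same pattern, using java.io.BufferedOutputStream in place of fastutil's FastBufferedOutputStream and with serializer standing in for the block manager's spark.Serializer:

    import java.io.{BufferedOutputStream, File, FileOutputStream, RandomAccessFile}
    import java.nio.channels.FileChannel.MapMode

    val file = new File("/tmp/block-demo")
    val out = new BufferedOutputStream(new FileOutputStream(file))
    // Write objects directly to disk through a serialization stream ...
    val objOut = serializer.newInstance().serializeStream(out)
    objOut.writeAll(Iterator("a", "b", "c"))
    objOut.close()
    // ... then hand back a memory-mapped view of the file instead of a heap buffer.
    val channel = new RandomAccessFile(file, "r").getChannel()
    val bytes = channel.map(MapMode.READ_ONLY, 0, channel.size())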
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index b7b8a79327..93b876d205 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -18,7 +18,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
val clusterUrl = "local-cluster[2,1,512]"
- var sc: SparkContext = _
+ @transient var sc: SparkContext = _
after {
if (sc != null) {
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index 500af1eb90..fd7a7bd589 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -3,14 +3,14 @@ package spark
import com.google.common.io.Files
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
-import java.io.{File, PrintWriter}
+import java.io.{File, PrintWriter, FileReader, BufferedReader}
import SparkContext._
class FileServerSuite extends FunSuite with BeforeAndAfter {
- var sc: SparkContext = _
- var tmpFile : File = _
- var testJarFile : File = _
+ @transient var sc: SparkContext = _
+ @transient var tmpFile : File = _
+ @transient var testJarFile : File = _
before {
// Create a sample text file
@@ -38,7 +38,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
- val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+ val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
val fileVal = in.readLine().toInt
in.close()
_ * fileVal + _ * fileVal
@@ -53,7 +53,9 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
sc.addJar(sampleJarFile)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
- val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
a + b
@@ -66,7 +68,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
- val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+ val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
val fileVal = in.readLine().toInt
in.close()
_ * fileVal + _ * fileVal
@@ -75,19 +77,19 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
assert(result.toSet === Set((1,200), (2,300), (3,500)))
}
-
test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
sc.addJar(sampleJarFile)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
- val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
a + b
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
}
-
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index f622c413f7..9d7e2591f1 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -15,16 +15,16 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
-
+
var sc: SparkContext = _
-
+
after {
if (sc != null) {
sc.stop()
sc = null
}
}
-
+
test("groupByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -57,7 +57,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}
-
+
test("groupByKey with many output partitions") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -187,7 +187,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
(4, (ArrayBuffer(), ArrayBuffer('w')))
))
}
-
+
test("zero-partition RDD") {
sc = new SparkContext("local", "test")
val emptyDir = Files.createTempDir()
@@ -195,7 +195,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
assert(file.splits.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
- assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
}
test("map-side combine") {
@@ -212,7 +212,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
_+_,
_+_,
false)
- val shuffledRdd = new ShuffledRDD(
+ val shuffledRdd = new ShuffledAggregatedRDD(
pairs, aggregator, new HashPartitioner(2))
assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1)))
@@ -220,7 +220,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// not see an exception because mergeCombine should not have been called.
val aggregatorWithException = new Aggregator[Int, Int, Int](
(v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
- val shuffledRdd1 = new ShuffledRDD(
+ val shuffledRdd1 = new ShuffledAggregatedRDD(
pairs, aggregatorWithException, new HashPartitioner(2))
assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
@@ -228,7 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// expect to see an exception thrown.
val aggregatorWithException1 = new Aggregator[Int, Int, Int](
(v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
- val shuffledRdd2 = new ShuffledRDD(
+ val shuffledRdd2 = new ShuffledAggregatedRDD(
pairs, aggregatorWithException1, new HashPartitioner(2))
evaluating { shuffledRdd2.collect() } should produce [SparkException]
}