author     Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2012-10-02 22:17:17 -0700
committer  Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2012-10-02 22:17:17 -0700
commit     ff813e43809c3598f7ec46f7f562bc4952cfe54c (patch)
tree       a19047feb1c46156059a8f9510064d4146b54892
parent     288e1c99abb64866c82c7b3cf725cf1c79e5e920 (diff)
parent     87f4451f20fb9deee550a439ce0db094370eb2d2 (diff)
Merge remote-tracking branch 'upstream/dev' into dev
-rw-r--r--  README.md | 6
-rw-r--r--  core/src/main/scala/spark/BlockRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/BlockStoreShuffleFetcher.scala | 3
-rw-r--r--  core/src/main/scala/spark/BoundedMemoryCache.scala | 6
-rw-r--r--  core/src/main/scala/spark/Cache.scala | 10
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala | 25
-rw-r--r--  core/src/main/scala/spark/CartesianRDD.scala | 1
-rw-r--r--  core/src/main/scala/spark/ClosureCleaner.scala | 6
-rw-r--r--  core/src/main/scala/spark/CoGroupedRDD.scala | 9
-rw-r--r--  core/src/main/scala/spark/FetchFailedException.scala | 2
-rw-r--r--  core/src/main/scala/spark/HadoopRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/HttpFileServer.scala | 2
-rw-r--r--  core/src/main/scala/spark/HttpServer.scala | 4
-rw-r--r--  core/src/main/scala/spark/JavaSerializer.scala | 8
-rw-r--r--  core/src/main/scala/spark/KryoSerializer.scala | 6
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala | 10
-rw-r--r--  core/src/main/scala/spark/NewHadoopRDD.scala | 1
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala | 2
-rw-r--r--  core/src/main/scala/spark/ParallelCollection.scala | 2
-rw-r--r--  core/src/main/scala/spark/SampledRDD.scala | 1
-rw-r--r--  core/src/main/scala/spark/Serializer.scala | 8
-rw-r--r--  core/src/main/scala/spark/ShuffleFetcher.scala | 2
-rw-r--r--  core/src/main/scala/spark/ShuffledRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/SizeEstimator.scala | 2
-rw-r--r--  core/src/main/scala/spark/SoftReferenceCache.scala | 2
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 85
-rw-r--r--  core/src/main/scala/spark/TaskEndReason.scala | 14
-rw-r--r--  core/src/main/scala/spark/TaskState.scala | 2
-rw-r--r--  core/src/main/scala/spark/UnionRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/api/java/function/WrappedFunction1.scala | 2
-rw-r--r--  core/src/main/scala/spark/api/java/function/WrappedFunction2.scala | 2
-rw-r--r--  core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala | 4
-rw-r--r--  core/src/main/scala/spark/broadcast/Broadcast.scala | 1
-rw-r--r--  core/src/main/scala/spark/broadcast/BroadcastFactory.scala | 2
-rw-r--r--  core/src/main/scala/spark/broadcast/HttpBroadcast.scala | 4
-rw-r--r--  core/src/main/scala/spark/broadcast/MultiTracker.scala | 4
-rw-r--r--  core/src/main/scala/spark/broadcast/SourceInfo.scala | 4
-rw-r--r--  core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/Command.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala | 29
-rw-r--r--  core/src/main/scala/spark/deploy/ExecutorState.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/JobDescription.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/LocalSparkCluster.scala | 1
-rw-r--r--  core/src/main/scala/spark/deploy/client/Client.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/client/ClientListener.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/client/TestClient.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/client/TestExecutor.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/master/ExecutorInfo.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobInfo.scala | 1
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobState.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterArguments.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterWebUI.scala | 1
-rw-r--r--  core/src/main/scala/spark/deploy/master/WorkerInfo.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerArguments.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala | 1
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala | 2
-rw-r--r--  core/src/main/scala/spark/executor/ExecutorBackend.scala | 2
-rw-r--r--  core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala | 3
-rw-r--r--  core/src/main/scala/spark/executor/MesosExecutorBackend.scala | 4
-rw-r--r--  core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala | 4
-rw-r--r--  core/src/main/scala/spark/network/Connection.scala | 5
-rw-r--r--  core/src/main/scala/spark/network/ConnectionManager.scala | 8
-rw-r--r--  core/src/main/scala/spark/network/ConnectionManagerTest.scala | 2
-rw-r--r--  core/src/main/scala/spark/network/Message.scala | 12
-rw-r--r--  core/src/main/scala/spark/network/ReceiverTest.scala | 2
-rw-r--r--  core/src/main/scala/spark/network/SenderTest.scala | 2
-rw-r--r--  core/src/main/scala/spark/partial/ApproximateActionListener.scala | 2
-rw-r--r--  core/src/main/scala/spark/partial/ApproximateEvaluator.scala | 2
-rw-r--r--  core/src/main/scala/spark/partial/BoundedDouble.scala | 1
-rw-r--r--  core/src/main/scala/spark/partial/CountEvaluator.scala | 2
-rw-r--r--  core/src/main/scala/spark/partial/GroupedCountEvaluator.scala | 2
-rw-r--r--  core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala | 2
-rw-r--r--  core/src/main/scala/spark/partial/GroupedSumEvaluator.scala | 2
-rw-r--r--  core/src/main/scala/spark/partial/MeanEvaluator.scala | 2
-rw-r--r--  core/src/main/scala/spark/partial/PartialResult.scala | 2
-rw-r--r--  core/src/main/scala/spark/partial/StudentTCacher.scala | 2
-rw-r--r--  core/src/main/scala/spark/partial/SumEvaluator.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/ActiveJob.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala | 1
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala | 12
-rw-r--r--  core/src/main/scala/spark/scheduler/JobListener.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/JobResult.scala | 6
-rw-r--r--  core/src/main/scala/spark/scheduler/JobWaiter.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/ResultTask.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 4
-rw-r--r--  core/src/main/scala/spark/scheduler/Stage.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/Task.scala | 4
-rw-r--r--  core/src/main/scala/spark/scheduler/TaskResult.scala | 1
-rw-r--r--  core/src/main/scala/spark/scheduler/TaskScheduler.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/TaskSet.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala | 1
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala | 14
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 3
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala | 1
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala | 1
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalScheduler.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala | 296
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala | 13
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerWorker.scala | 4
-rw-r--r--  core/src/main/scala/spark/storage/BlockMessage.scala | 10
-rw-r--r--  core/src/main/scala/spark/storage/BlockMessageArray.scala | 3
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala | 18
-rw-r--r--  core/src/main/scala/spark/storage/MemoryStore.scala | 170
-rw-r--r--  core/src/main/scala/spark/storage/StorageLevel.scala | 4
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala | 2
-rw-r--r--  core/src/main/scala/spark/util/IntParam.scala | 2
-rw-r--r--  core/src/main/scala/spark/util/MemoryParam.scala | 2
-rw-r--r--  core/src/main/scala/spark/util/SerializableBuffer.scala | 1
-rw-r--r--  core/src/main/scala/spark/util/StatCounter.scala | 3
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala | 22
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala | 98
-rw-r--r--  docs/tuning.md | 68
-rw-r--r--  project/SparkBuild.scala | 6
125 files changed, 754 insertions, 438 deletions
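
Nearly every hunk in this merge applies the same change: internal classes, traits, and objects gain the private[spark] modifier, so they drop out of the public API while staying visible throughout the spark package. A minimal sketch of how Scala's package-qualified visibility behaves (hypothetical names, not from this commit):

    // Illustrative only: private[spark] members are visible anywhere under
    // package spark, including subpackages, but not outside it.
    package spark {
      private[spark] class InternalHelper { def work(): Int = 42 }
      package storage {
        class Ok { def use = new InternalHelper().work() } // compiles: inside spark
      }
    }
    package elsewhere {
      // def bad = new spark.InternalHelper // would not compile: outside spark
    }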
diff --git a/README.md b/README.md
index bd75005c30..c3695ac800 100644
--- a/README.md
+++ b/README.md
@@ -12,10 +12,8 @@ This README file only contains basic setup instructions.
## Building
-Spark requires Scala 2.9.1. This version has been tested with 2.9.1.final.
-
-The project is built using Simple Build Tool (SBT), which is packaged with it.
-To build Spark and its example programs, run:
+Spark requires Scala 2.9.2. The project is built using Simple Build Tool (SBT),
+which is packaged with it. To build Spark and its example programs, run:
sbt/sbt compile
diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala
index daabc0d566..faa99fe3e9 100644
--- a/core/src/main/scala/spark/BlockRDD.scala
+++ b/core/src/main/scala/spark/BlockRDD.scala
@@ -2,7 +2,7 @@ package spark
import scala.collection.mutable.HashMap
-class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
+private[spark] class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
val index = idx
}
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 9c42e88b68..fb65ba421a 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -11,8 +11,7 @@ import spark.storage.BlockManagerId
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
-class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
+private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) {
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager
diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala
index 6fe0b94297..e8392a194f 100644
--- a/core/src/main/scala/spark/BoundedMemoryCache.scala
+++ b/core/src/main/scala/spark/BoundedMemoryCache.scala
@@ -9,7 +9,7 @@ import java.util.LinkedHashMap
* some cache entries have pointers to a shared object. Nonetheless, this Cache should work well
* when most of the space is used by arrays of primitives or of simple classes.
*/
-class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
+private[spark] class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)
def this() {
@@ -104,9 +104,9 @@ class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
}
// An entry in our map; stores a cached object and its size in bytes
-case class Entry(value: Any, size: Long)
+private[spark] case class Entry(value: Any, size: Long)
-object BoundedMemoryCache {
+private[spark] object BoundedMemoryCache {
/**
* Get maximum cache capacity from system configuration
*/
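
The BoundedMemoryCache hunks above show it is built on java.util.LinkedHashMap and tracks each entry's size. A hedged sketch of that bounding idea (the access-order eviction policy here is an assumption for illustration, not the committed code):

    import java.util.LinkedHashMap
    class TinyBoundedCache(maxBytes: Long) {
      private var used = 0L
      // Third constructor argument `true` = access order, so iteration
      // starts at the least-recently-used entry.
      private val map = new LinkedHashMap[String, Array[Byte]](32, 0.75f, true)
      def put(key: String, value: Array[Byte]): Unit = {
        map.put(key, value); used += value.length
        val it = map.entrySet.iterator
        while (used > maxBytes && it.hasNext) { // evict LRU entries until under budget
          used -= it.next().getValue.length
          it.remove()
        }
      }
    }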
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala
index 150fe14e2c..20d677a854 100644
--- a/core/src/main/scala/spark/Cache.scala
+++ b/core/src/main/scala/spark/Cache.scala
@@ -2,9 +2,9 @@ package spark
import java.util.concurrent.atomic.AtomicInteger
-sealed trait CachePutResponse
-case class CachePutSuccess(size: Long) extends CachePutResponse
-case class CachePutFailure() extends CachePutResponse
+private[spark] sealed trait CachePutResponse
+private[spark] case class CachePutSuccess(size: Long) extends CachePutResponse
+private[spark] case class CachePutFailure() extends CachePutResponse
/**
* An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
@@ -22,7 +22,7 @@ case class CachePutFailure() extends CachePutResponse
* This abstract class handles the creation of key spaces, so that subclasses need only deal with
* keys that are unique across modules.
*/
-abstract class Cache {
+private[spark] abstract class Cache {
private val nextKeySpaceId = new AtomicInteger(0)
private def newKeySpaceId() = nextKeySpaceId.getAndIncrement()
@@ -52,7 +52,7 @@ abstract class Cache {
/**
* A key namespace in a Cache.
*/
-class KeySpace(cache: Cache, val keySpaceId: Int) {
+private[spark] class KeySpace(cache: Cache, val keySpaceId: Int) {
def get(datasetId: Any, partition: Int): Any =
cache.get((keySpaceId, datasetId), partition)
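
The Cache/KeySpace pair above lets independent modules share one cache without key collisions: each KeySpace tags every key with its keySpaceId. A small illustration of that namespacing (demo names, not the real classes):

    import scala.collection.mutable
    class DemoKeySpace(shared: mutable.Map[(Int, Any), Any], id: Int) {
      def put(key: Any, value: Any): Unit = shared((id, key)) = value
      def get(key: Any): Option[Any] = shared.get((id, key))
    }
    val backing = mutable.Map[(Int, Any), Any]()
    val a = new DemoKeySpace(backing, 0)
    val b = new DemoKeySpace(backing, 1)
    a.put("x", 1); b.put("x", 2) // stored as (0,"x") and (1,"x"): no collision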
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index d9e0ef90b8..9a23f9e7cc 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -15,19 +15,20 @@ import scala.collection.mutable.HashSet
import spark.storage.BlockManager
import spark.storage.StorageLevel
-sealed trait CacheTrackerMessage
-case class AddedToCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
+private[spark] sealed trait CacheTrackerMessage
+
+private[spark] case class AddedToCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
extends CacheTrackerMessage
-case class DroppedFromCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
+private[spark] case class DroppedFromCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
extends CacheTrackerMessage
-case class MemoryCacheLost(host: String) extends CacheTrackerMessage
-case class RegisterRDD(rddId: Int, numPartitions: Int) extends CacheTrackerMessage
-case class SlaveCacheStarted(host: String, size: Long) extends CacheTrackerMessage
-case object GetCacheStatus extends CacheTrackerMessage
-case object GetCacheLocations extends CacheTrackerMessage
-case object StopCacheTracker extends CacheTrackerMessage
-
-class CacheTrackerActor extends Actor with Logging {
+private[spark] case class MemoryCacheLost(host: String) extends CacheTrackerMessage
+private[spark] case class RegisterRDD(rddId: Int, numPartitions: Int) extends CacheTrackerMessage
+private[spark] case class SlaveCacheStarted(host: String, size: Long) extends CacheTrackerMessage
+private[spark] case object GetCacheStatus extends CacheTrackerMessage
+private[spark] case object GetCacheLocations extends CacheTrackerMessage
+private[spark] case object StopCacheTracker extends CacheTrackerMessage
+
+private[spark] class CacheTrackerActor extends Actor with Logging {
// TODO: Should probably store (String, CacheType) tuples
private val locs = new HashMap[Int, Array[List[String]]]
@@ -89,7 +90,7 @@ class CacheTrackerActor extends Actor with Logging {
}
}
-class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: BlockManager)
+private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: BlockManager)
extends Logging {
// Tracker actor on the master, or remote reference to it on workers
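
CacheTracker's messages, like the other actor protocols touched in this commit, form a sealed trait hierarchy, so a receive block can match exhaustively and the compiler can warn about unhandled message types. The pattern in miniature (hypothetical messages):

    sealed trait DemoTrackerMessage
    case class AddedToDemoCache(rddId: Int, partition: Int) extends DemoTrackerMessage
    case object StopDemo extends DemoTrackerMessage

    def handle(msg: DemoTrackerMessage): String = msg match {
      case AddedToDemoCache(rdd, part) => "cached partition " + part + " of RDD " + rdd
      case StopDemo                    => "shutting down"
    } // sealed: a missing case would produce a compiler warning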
diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala
index e26041555a..83db2d2934 100644
--- a/core/src/main/scala/spark/CartesianRDD.scala
+++ b/core/src/main/scala/spark/CartesianRDD.scala
@@ -1,5 +1,6 @@
package spark
+private[spark]
class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
override val index: Int = idx
}
diff --git a/core/src/main/scala/spark/ClosureCleaner.scala b/core/src/main/scala/spark/ClosureCleaner.scala
index 3b83d23a13..98525b99c8 100644
--- a/core/src/main/scala/spark/ClosureCleaner.scala
+++ b/core/src/main/scala/spark/ClosureCleaner.scala
@@ -9,7 +9,7 @@ import org.objectweb.asm.{ClassReader, MethodVisitor, Type}
import org.objectweb.asm.commons.EmptyVisitor
import org.objectweb.asm.Opcodes._
-object ClosureCleaner extends Logging {
+private[spark] object ClosureCleaner extends Logging {
// Get an ASM class reader for a given class from the JAR that loaded it
private def getClassReader(cls: Class[_]): ClassReader = {
new ClassReader(cls.getResourceAsStream(
@@ -154,7 +154,7 @@ object ClosureCleaner extends Logging {
}
}
-class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor {
+private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
return new EmptyVisitor {
@@ -180,7 +180,7 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor
}
}
-class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor {
+private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor {
var myName: String = null
override def visit(version: Int, access: Int, name: String, sig: String,
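
For context on why ClosureCleaner and its ASM visitors exist: a closure that reads a field of an enclosing object captures the whole object, which may not be serializable, so the cleaner nulls out unneeded outer references before tasks are shipped. A toy illustration (not Spark code):

    class Holder { // note: not Serializable
      val factor = 2
      // Referencing `factor` makes the lambda capture the enclosing Holder,
      // which would break serialization of the closure without cleaning.
      def scale: Int => Int = (x: Int) => x * factor
    }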
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
index 6959917d14..daba719b14 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -6,16 +6,17 @@ import java.io.ObjectInputStream
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-sealed trait CoGroupSplitDep extends Serializable
-case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
-case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
+private[spark] sealed trait CoGroupSplitDep extends Serializable
+private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
+private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
+private[spark]
class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
}
-class CoGroupAggregator
+private[spark] class CoGroupAggregator
extends Aggregator[Any, Any, ArrayBuffer[Any]](
{ x => ArrayBuffer(x) },
{ (b, x) => b += x },
diff --git a/core/src/main/scala/spark/FetchFailedException.scala b/core/src/main/scala/spark/FetchFailedException.scala
index 55512f4481..a953081d24 100644
--- a/core/src/main/scala/spark/FetchFailedException.scala
+++ b/core/src/main/scala/spark/FetchFailedException.scala
@@ -2,7 +2,7 @@ package spark
import spark.storage.BlockManagerId
-class FetchFailedException(
+private[spark] class FetchFailedException(
val bmAddress: BlockManagerId,
val shuffleId: Int,
val mapId: Int,
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index 0befca582d..6d448116a9 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -18,7 +18,7 @@ import org.apache.hadoop.util.ReflectionUtils
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
-class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
+private[spark] class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
extends Split
with Serializable {
diff --git a/core/src/main/scala/spark/HttpFileServer.scala b/core/src/main/scala/spark/HttpFileServer.scala
index 05ca846c85..659d17718f 100644
--- a/core/src/main/scala/spark/HttpFileServer.scala
+++ b/core/src/main/scala/spark/HttpFileServer.scala
@@ -5,7 +5,7 @@ import java.net.URL
import scala.collection.mutable.HashMap
import org.apache.hadoop.fs.FileUtil
-class HttpFileServer extends Logging {
+private[spark] class HttpFileServer extends Logging {
var baseDir : File = null
var fileDir : File = null
diff --git a/core/src/main/scala/spark/HttpServer.scala b/core/src/main/scala/spark/HttpServer.scala
index 855f2c752f..0196595ba1 100644
--- a/core/src/main/scala/spark/HttpServer.scala
+++ b/core/src/main/scala/spark/HttpServer.scala
@@ -12,14 +12,14 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool
/**
* Exception type thrown by HttpServer when it is in the wrong state for an operation.
*/
-class ServerStateException(message: String) extends Exception(message)
+private[spark] class ServerStateException(message: String) extends Exception(message)
/**
* An HTTP server for static content used to allow worker nodes to access JARs added to SparkContext
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
* around a Jetty server.
*/
-class HttpServer(resourceBase: File) extends Logging {
+private[spark] class HttpServer(resourceBase: File) extends Logging {
private var server: Server = null
private var port: Int = -1
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index 1511c2620e..39d554b6a5 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -5,14 +5,14 @@ import java.nio.ByteBuffer
import spark.util.ByteBufferInputStream
-class JavaSerializationStream(out: OutputStream) extends SerializationStream {
+private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
val objOut = new ObjectOutputStream(out)
def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
def flush() { objOut.flush() }
def close() { objOut.close() }
}
-class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
+private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
extends DeserializationStream {
val objIn = new ObjectInputStream(in) {
override def resolveClass(desc: ObjectStreamClass) =
@@ -23,7 +23,7 @@ extends DeserializationStream {
def close() { objIn.close() }
}
-class JavaSerializerInstance extends SerializerInstance {
+private[spark] class JavaSerializerInstance extends SerializerInstance {
def serialize[T](t: T): ByteBuffer = {
val bos = new ByteArrayOutputStream()
val out = serializeStream(bos)
@@ -57,6 +57,6 @@ class JavaSerializerInstance extends SerializerInstance {
}
}
-class JavaSerializer extends Serializer {
+private[spark] class JavaSerializer extends Serializer {
def newInstance(): SerializerInstance = new JavaSerializerInstance
}
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 376fcff4c8..b8aa3a86c5 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -20,7 +20,7 @@ import spark.storage._
* Zig-zag encoder used to write object sizes to serialization streams.
* Based on Kryo's integer encoder.
*/
-object ZigZag {
+private[spark] object ZigZag {
def writeInt(n: Int, out: OutputStream) {
var value = n
if ((value & ~0x7F) == 0) {
@@ -68,6 +68,7 @@ object ZigZag {
}
}
+private[spark]
class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
extends SerializationStream {
val channel = Channels.newChannel(out)
@@ -85,6 +86,7 @@ extends SerializationStream {
def close() { out.close() }
}
+private[spark]
class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream)
extends DeserializationStream {
def readObject[T](): T = {
@@ -95,7 +97,7 @@ extends DeserializationStream {
def close() { in.close() }
}
-class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
+private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
val kryo = ks.kryo
val threadBuffer = ks.threadBuffer.get()
val objectBuffer = ks.objectBuffer.get()
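
The ZigZag object above encodes object sizes as 7-bit groups so that small values take a single byte on the stream. A hedged restatement of that variable-length scheme (illustrative, not the committed code):

    // Write `n` as base-128 groups, low bits first; the high bit of each
    // byte signals that more groups follow.
    def writeVarInt(n: Int, out: java.io.OutputStream): Unit = {
      var value = n
      while ((value & ~0x7F) != 0) {
        out.write((value & 0x7F) | 0x80)
        value >>>= 7
      }
      out.write(value)
    }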
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 82c1391345..116d526854 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -16,11 +16,11 @@ import scala.collection.mutable.HashSet
import spark.storage.BlockManagerId
-sealed trait MapOutputTrackerMessage
-case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage
-case object StopMapOutputTracker extends MapOutputTrackerMessage
+private[spark] sealed trait MapOutputTrackerMessage
+private[spark] case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage
+private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
-class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
+private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
def receive = {
case GetMapOutputLocations(shuffleId: Int) =>
logInfo("Asked to get map output locations for shuffle " + shuffleId)
@@ -33,7 +33,7 @@ class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Loggin
}
}
-class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
+private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
val ip: String = System.getProperty("spark.master.host", "localhost")
val port: Int = System.getProperty("spark.master.port", "7077").toInt
val actorName: String = "MapOutputTracker"
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala
index 14f708a3f8..9072698357 100644
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/NewHadoopRDD.scala
@@ -13,6 +13,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID
import java.util.Date
import java.text.SimpleDateFormat
+private[spark]
class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
extends Split {
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 4752bf8d9f..80d62caf25 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -459,6 +459,6 @@ class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U]
}
}
-object Manifests {
+private[spark] object Manifests {
val seqSeqManifest = classManifest[Seq[Seq[_]]]
}
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index d79007ab40..321f5264b8 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -3,7 +3,7 @@ package spark
import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
-class ParallelCollectionSplit[T: ClassManifest](
+private[spark] class ParallelCollectionSplit[T: ClassManifest](
val rddId: Long,
val slice: Int,
values: Seq[T])
diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala
index c066017e89..ac10aed477 100644
--- a/core/src/main/scala/spark/SampledRDD.scala
+++ b/core/src/main/scala/spark/SampledRDD.scala
@@ -4,6 +4,7 @@ import java.util.Random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
+private[spark]
class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
override val index: Int = prev.index
}
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala
index 9ec07cc173..c0e08289d8 100644
--- a/core/src/main/scala/spark/Serializer.scala
+++ b/core/src/main/scala/spark/Serializer.scala
@@ -12,14 +12,14 @@ import spark.util.ByteBufferInputStream
* A serializer. Because some serialization libraries are not thread safe, this class is used to
* create SerializerInstances that do the actual serialization.
*/
-trait Serializer {
+private[spark] trait Serializer {
def newInstance(): SerializerInstance
}
/**
* An instance of the serializer, for use by one thread at a time.
*/
-trait SerializerInstance {
+private[spark] trait SerializerInstance {
def serialize[T](t: T): ByteBuffer
def deserialize[T](bytes: ByteBuffer): T
@@ -50,7 +50,7 @@ trait SerializerInstance {
/**
* A stream for writing serialized objects.
*/
-trait SerializationStream {
+private[spark] trait SerializationStream {
def writeObject[T](t: T): SerializationStream
def flush(): Unit
def close(): Unit
@@ -66,7 +66,7 @@ trait SerializationStream {
/**
* A stream for reading serialized objects.
*/
-trait DeserializationStream {
+private[spark] trait DeserializationStream {
def readObject[T](): T
def close(): Unit
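
A usage sketch of the Serializer interfaces above, based only on the signatures shown in this diff and assuming the caller sits inside package spark (the traits are now private[spark]):

    val ser: SerializerInstance = (new JavaSerializer).newInstance()
    val bytes: java.nio.ByteBuffer = ser.serialize(List(1, 2, 3))
    val back: List[Int] = ser.deserialize(bytes) // round-trips the value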
diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala
index 4f8d98f7d0..daa35fe7f2 100644
--- a/core/src/main/scala/spark/ShuffleFetcher.scala
+++ b/core/src/main/scala/spark/ShuffleFetcher.scala
@@ -1,6 +1,6 @@
package spark
-abstract class ShuffleFetcher {
+private[spark] abstract class ShuffleFetcher {
// Fetch the shuffle outputs for a given ShuffleDependency, calling func exactly
// once on each key-value pair obtained.
def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit)
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 11d5c0ede8..1a9f4cfec3 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -4,7 +4,7 @@ import scala.collection.mutable.ArrayBuffer
import java.util.{HashMap => JHashMap}
-class ShuffledRDDSplit(val idx: Int) extends Split {
+private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx
override def hashCode(): Int = idx
}
diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala
index 6a71522922..7c3e8640e9 100644
--- a/core/src/main/scala/spark/SizeEstimator.scala
+++ b/core/src/main/scala/spark/SizeEstimator.scala
@@ -22,7 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet
* Based on the following JavaWorld article:
* http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
*/
-object SizeEstimator extends Logging {
+private[spark] object SizeEstimator extends Logging {
// Sizes of primitive types
private val BYTE_SIZE = 1
diff --git a/core/src/main/scala/spark/SoftReferenceCache.scala b/core/src/main/scala/spark/SoftReferenceCache.scala
index ce9370c5d7..3dd0a4b1f9 100644
--- a/core/src/main/scala/spark/SoftReferenceCache.scala
+++ b/core/src/main/scala/spark/SoftReferenceCache.scala
@@ -5,7 +5,7 @@ import com.google.common.collect.MapMaker
/**
* An implementation of Cache that uses soft references.
*/
-class SoftReferenceCache extends Cache {
+private[spark] class SoftReferenceCache extends Cache {
val map = new MapMaker().softValues().makeMap[Any, Any]()
override def get(datasetId: Any, partition: Int): Any =
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 79a9e8e34e..83c1b49203 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -55,7 +55,7 @@ class SparkContext(
val sparkHome: String,
val jars: Seq[String])
extends Logging {
-
+
def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil)
// Ensure logging is initialized before we spawn any threads
@@ -78,30 +78,30 @@ class SparkContext(
true,
isLocal)
SparkEnv.set(env)
-
+
// Used to store a URL for each static file/jar together with the file's local timestamp
val addedFiles = HashMap[String, Long]()
val addedJars = HashMap[String, Long]()
-
+
// Add each JAR given through the constructor
jars.foreach { addJar(_) }
-
+
// Create and start the scheduler
private var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
- val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
+ val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
- val LOCAL_CLUSTER_REGEX = """local-cluster\[([0-9]+),([0-9]+),([0-9]+)]""".r
+ val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """(spark://.*)""".r
-
+
master match {
- case "local" =>
+ case "local" =>
new LocalScheduler(1, 0, this)
- case LOCAL_N_REGEX(threads) =>
+ case LOCAL_N_REGEX(threads) =>
new LocalScheduler(threads.toInt, 0, this)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
@@ -112,10 +112,21 @@ class SparkContext(
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
scheduler
-
- case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) =>
+
+ case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+ // Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang.
+ val memoryPerSlaveInt = memoryPerSlave.toInt
+ val sparkMemEnv = System.getenv("SPARK_MEM")
+ val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
+ if (sparkMemEnvInt > memoryPerSlaveInt) {
+ throw new SparkException(
+ "Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format(
+ memoryPerSlaveInt, sparkMemEnvInt))
+ }
+
val scheduler = new ClusterScheduler(this)
- val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt)
+ val localCluster = new LocalSparkCluster(
+ numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
@@ -140,13 +151,13 @@ class SparkContext(
taskScheduler.start()
private var dagScheduler = new DAGScheduler(taskScheduler)
-
+
// Methods for creating RDDs
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = {
new ParallelCollection[T](this, seq, numSlices)
}
-
+
def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = {
parallelize(seq, numSlices)
}
@@ -187,14 +198,14 @@ class SparkContext(
}
/**
- * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
+ * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
: RDD[(K, V)] = {
hadoopFile(path,
- fm.erasure.asInstanceOf[Class[F]],
+ fm.erasure.asInstanceOf[Class[F]],
km.erasure.asInstanceOf[Class[K]],
vm.erasure.asInstanceOf[Class[V]],
minSplits)
@@ -215,7 +226,7 @@ class SparkContext(
new Configuration)
}
- /**
+ /**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
@@ -231,7 +242,7 @@ class SparkContext(
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
}
- /**
+ /**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
@@ -257,14 +268,14 @@ class SparkContext(
sequenceFile(path, keyClass, valueClass, defaultMinSplits)
/**
- * Version of sequenceFile() for types implicitly convertible to Writables through a
+ * Version of sequenceFile() for types implicitly convertible to Writables through a
* WritableConverter.
*
* WritableConverters are provided in a somewhat strange way (by an implicit function) to support
- * both subclasses of Writable and types for which we define a converter (e.g. Int to
+ * both subclasses of Writable and types for which we define a converter (e.g. Int to
* IntWritable). The most natural thing would've been to have implicit objects for the
* converters, but then we couldn't have an object for every subclass of Writable (you can't
- * have a parameterized singleton object). We use functions instead to create a new converter
+ * have a parameterized singleton object). We use functions instead to create a new converter
* for the appropriate type. In addition, we pass the converter a ClassManifest of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
@@ -289,7 +300,7 @@ class SparkContext(
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T: ClassManifest](
- path: String,
+ path: String,
minSplits: Int = defaultMinSplits
): RDD[T] = {
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
@@ -318,7 +329,7 @@ class SparkContext(
/**
* Create an accumulator from a "mutable collection" type.
- *
+ *
* Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
@@ -329,7 +340,7 @@ class SparkContext(
// Keep around a weak hash map of values to Cached versions?
def broadcast[T](value: T) = SparkEnv.get.broadcastManager.newBroadcast[T] (value, isLocal)
-
+
// Adds a file dependency to all Tasks executed in the future.
def addFile(path: String) {
val uri = new URI(path)
@@ -338,11 +349,11 @@ class SparkContext(
case _ => path
}
addedFiles(key) = System.currentTimeMillis
-
+
// Fetch the file locally in case the task is executed locally
val filename = new File(path.split("/").last)
Utils.fetchFile(path, new File("."))
-
+
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
}
@@ -350,7 +361,7 @@ class SparkContext(
addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
addedFiles.clear()
}
-
+
// Adds a jar dependency to all Tasks executed in the future.
def addJar(path: String) {
val uri = new URI(path)
@@ -366,7 +377,7 @@ class SparkContext(
addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
addedJars.clear()
}
-
+
// Stop the SparkContext
def stop() {
dagScheduler.stop()
@@ -400,7 +411,7 @@ class SparkContext(
/**
* Run a function on a given set of partitions in an RDD and return the results. This is the main
* entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies
- * whether the scheduler can run the computation on the master rather than shipping it out to the
+ * whether the scheduler can run the computation on the master rather than shipping it out to the
* cluster, for short actions like first().
*/
def runJob[T, U: ClassManifest](
@@ -419,13 +430,13 @@ class SparkContext(
def runJob[T, U: ClassManifest](
rdd: RDD[T],
- func: Iterator[T] => U,
+ func: Iterator[T] => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
}
-
+
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
@@ -472,7 +483,7 @@ class SparkContext(
private[spark] def newShuffleId(): Int = {
nextShuffleId.getAndIncrement()
}
-
+
private var nextRddId = new AtomicInteger(0)
// Register a new RDD, returning its RDD ID
@@ -500,7 +511,7 @@ object SparkContext {
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
-
+
implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
@@ -521,7 +532,7 @@ object SparkContext {
implicit def longToLongWritable(l: Long) = new LongWritable(l)
implicit def floatToFloatWritable(f: Float) = new FloatWritable(f)
-
+
implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)
implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)
@@ -532,7 +543,7 @@ object SparkContext {
private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
-
+
new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
arr.map(x => anyToWritable(x)).toArray)
}
@@ -576,7 +587,7 @@ object SparkContext {
Nil
}
}
-
+
// Find the JAR that contains the class of a particular object
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
}
@@ -589,7 +600,7 @@ object SparkContext {
* that doesn't know the type of T when it is created. This sounds strange but is necessary to
* support converting subclasses of Writable to themselves (writableWritableConverter).
*/
-class WritableConverter[T](
+private[spark] class WritableConverter[T](
val writableClass: ClassManifest[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable
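
Two behavioral changes stand out in the SparkContext diff above: the local[N, maxRetries] and local-cluster[...] regexes now tolerate whitespace, and SPARK_MEM is validated against the per-slave memory before a local cluster starts. A standalone sketch of the relaxed matching:

    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
    "local[4, 2]" match {
      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        println("threads=" + threads + ", maxFailures=" + maxFailures) // 4 and 2
    }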
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 6e4eb25ed4..420c54bc9a 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -7,10 +7,16 @@ import spark.storage.BlockManagerId
* tasks several times for "ephemeral" failures, and only report back failures that require some
* old stages to be resubmitted, such as shuffle map fetch failures.
*/
-sealed trait TaskEndReason
+private[spark] sealed trait TaskEndReason
-case object Success extends TaskEndReason
+private[spark] case object Success extends TaskEndReason
+
+private[spark]
case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
+
+private[spark]
case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
-case class ExceptionFailure(exception: Throwable) extends TaskEndReason
-case class OtherFailure(message: String) extends TaskEndReason
+
+private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
+
+private[spark] case class OtherFailure(message: String) extends TaskEndReason
diff --git a/core/src/main/scala/spark/TaskState.scala b/core/src/main/scala/spark/TaskState.scala
index 9566b52432..78eb33a628 100644
--- a/core/src/main/scala/spark/TaskState.scala
+++ b/core/src/main/scala/spark/TaskState.scala
@@ -2,7 +2,7 @@ package spark
import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-object TaskState
+private[spark] object TaskState
extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED", "KILLED", "LOST") {
val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
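
TaskState (and ExecutorState later in this commit) uses the Scala 2.9 Enumeration constructor that takes explicit value names, mirroring the Mesos state names. The pattern in miniature (hypothetical states; this constructor form was later deprecated in Scala 2.10):

    object DemoState extends Enumeration("WAITING", "DONE") {
      val WAITING, DONE = Value
    }
    // DemoState.WAITING.toString == "WAITING"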
diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/UnionRDD.scala
index 0e8164d6ab..3e795ea2a2 100644
--- a/core/src/main/scala/spark/UnionRDD.scala
+++ b/core/src/main/scala/spark/UnionRDD.scala
@@ -2,7 +2,7 @@ package spark
import scala.collection.mutable.ArrayBuffer
-class UnionSplit[T: ClassManifest](
+private[spark] class UnionSplit[T: ClassManifest](
idx: Int,
rdd: RDD[T],
split: Split)
diff --git a/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
index d08e1e9fbf..923f5cdf4f 100644
--- a/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
+++ b/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
@@ -7,7 +7,7 @@ import scala.runtime.AbstractFunction1
* apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
* isn't marked to allow that).
*/
-abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
+private[spark] abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
@throws(classOf[Exception])
def call(t: T): R
diff --git a/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
index c9d67d9771..2c6e9b1571 100644
--- a/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
+++ b/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
@@ -7,7 +7,7 @@ import scala.runtime.AbstractFunction2
* apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
* isn't marked to allow that).
*/
-abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
+private[spark] abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
@throws(classOf[Exception])
def call(t1: T1, t2: T2): R
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index b72e8986d3..cf20f456c4 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -11,7 +11,7 @@ import scala.math
import spark._
import spark.storage.StorageLevel
-class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
+private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id)
with Logging
with Serializable {
@@ -1027,7 +1027,7 @@ class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Lo
}
}
-class BitTorrentBroadcastFactory
+private[spark] class BitTorrentBroadcastFactory
extends BroadcastFactory {
def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) }
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 3ba91c93e9..6055bfd045 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -14,6 +14,7 @@ abstract class Broadcast[T](id: Long) extends Serializable {
override def toString = "spark.Broadcast(" + id + ")"
}
+private[spark]
class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {
private var initialized = false
diff --git a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
index 66ca8d56d5..ab6d302827 100644
--- a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
@@ -6,7 +6,7 @@ package spark.broadcast
* BroadcastFactory implementation to instantiate a particular broadcast for the
* entire Spark job.
*/
-trait BroadcastFactory {
+private[spark] trait BroadcastFactory {
def initialize(isMaster: Boolean): Unit
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long): Broadcast[T]
def stop(): Unit
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index d8cf5e37d4..7eb4ddb74f 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -12,7 +12,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import spark._
import spark.storage.StorageLevel
-class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
+private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
def value = value_
@@ -46,7 +46,7 @@ extends Broadcast[T](id) with Logging with Serializable {
}
}
-class HttpBroadcastFactory extends BroadcastFactory {
+private[spark] class HttpBroadcastFactory extends BroadcastFactory {
def initialize(isMaster: Boolean) { HttpBroadcast.initialize(isMaster) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala
index d00db23362..dd8e6dd246 100644
--- a/core/src/main/scala/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala
@@ -382,10 +382,10 @@ extends Logging {
}
}
-case class BroadcastBlock(blockID: Int, byteArray: Array[Byte])
+private[spark] case class BroadcastBlock(blockID: Int, byteArray: Array[Byte])
extends Serializable
-case class VariableInfo(@transient arrayOfBlocks : Array[BroadcastBlock],
+private[spark] case class VariableInfo(@transient arrayOfBlocks : Array[BroadcastBlock],
totalBlocks: Int,
totalBytes: Int)
extends Serializable {
diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala
index f90385fd47..705dd6fd81 100644
--- a/core/src/main/scala/spark/broadcast/SourceInfo.scala
+++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala
@@ -7,7 +7,7 @@ import spark._
/**
* Used to keep and pass around information of peers involved in a broadcast
*/
-case class SourceInfo (hostAddress: String,
+private[spark] case class SourceInfo (hostAddress: String,
listenPort: Int,
totalBlocks: Int = SourceInfo.UnusedParam,
totalBytes: Int = SourceInfo.UnusedParam)
@@ -26,7 +26,7 @@ extends Comparable[SourceInfo] with Logging {
/**
* Helper Object of SourceInfo for its constants
*/
-object SourceInfo {
+private[spark] object SourceInfo {
// Constants for special values of listenPort
val TxNotStartedRetry = -1
val TxOverGoToDefault = 0
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index c1148b22ca..5bd40a40e3 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -10,7 +10,7 @@ import scala.math
import spark._
import spark.storage.StorageLevel
-class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
+private[spark] class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
def value = value_
@@ -572,7 +572,7 @@ extends Broadcast[T](id) with Logging with Serializable {
}
}
-class TreeBroadcastFactory
+private[spark] class TreeBroadcastFactory
extends BroadcastFactory {
def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) }
diff --git a/core/src/main/scala/spark/deploy/Command.scala b/core/src/main/scala/spark/deploy/Command.scala
index 344888919a..577101e3c3 100644
--- a/core/src/main/scala/spark/deploy/Command.scala
+++ b/core/src/main/scala/spark/deploy/Command.scala
@@ -2,7 +2,7 @@ package spark.deploy
import scala.collection.Map
-case class Command(
+private[spark] case class Command(
mainClass: String,
arguments: Seq[String],
environment: Map[String, String]) {
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 141bbe4d57..d2b63d6e0d 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -7,13 +7,15 @@ import scala.collection.immutable.List
import scala.collection.mutable.HashMap
-sealed trait DeployMessage extends Serializable
+private[spark] sealed trait DeployMessage extends Serializable
// Worker to Master
+private[spark]
case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
extends DeployMessage
+private[spark]
case class ExecutorStateChanged(
jobId: String,
execId: Int,
@@ -23,11 +25,11 @@ case class ExecutorStateChanged(
// Master to Worker
-case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
-case class RegisterWorkerFailed(message: String) extends DeployMessage
-case class KillExecutor(jobId: String, execId: Int) extends DeployMessage
+private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
+private[spark] case class RegisterWorkerFailed(message: String) extends DeployMessage
+private[spark] case class KillExecutor(jobId: String, execId: Int) extends DeployMessage
-case class LaunchExecutor(
+private[spark] case class LaunchExecutor(
jobId: String,
execId: Int,
jobDesc: JobDescription,
@@ -38,33 +40,42 @@ case class LaunchExecutor(
// Client to Master
-case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
+private[spark] case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
// Master to Client
+private[spark]
case class RegisteredJob(jobId: String) extends DeployMessage
+
+private[spark]
case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
+
+private[spark]
case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
+
+private[spark]
case class JobKilled(message: String)
// Internal message in Client
-case object StopClient
+private[spark] case object StopClient
// MasterWebUI To Master
-case object RequestMasterState
+private[spark] case object RequestMasterState
// Master to MasterWebUI
+private[spark]
case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
completedJobs: List[JobInfo])
// WorkerWebUI to Worker
-case object RequestWorkerState
+private[spark] case object RequestWorkerState
// Worker to WorkerWebUI
+private[spark]
case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
\ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/ExecutorState.scala b/core/src/main/scala/spark/deploy/ExecutorState.scala
index d6ff1c54ca..5dc0c54552 100644
--- a/core/src/main/scala/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/spark/deploy/ExecutorState.scala
@@ -1,6 +1,6 @@
package spark.deploy
-object ExecutorState
+private[spark] object ExecutorState
extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
diff --git a/core/src/main/scala/spark/deploy/JobDescription.scala b/core/src/main/scala/spark/deploy/JobDescription.scala
index 8ae77b1038..20879c5f11 100644
--- a/core/src/main/scala/spark/deploy/JobDescription.scala
+++ b/core/src/main/scala/spark/deploy/JobDescription.scala
@@ -1,6 +1,6 @@
package spark.deploy
-class JobDescription(
+private[spark] class JobDescription(
val name: String,
val cores: Int,
val memoryPerSlave: Int,
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 1591bfdeb6..8b2a71add5 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -9,6 +9,7 @@ import spark.{Logging, Utils}
import scala.collection.mutable.ArrayBuffer
+private[spark]
class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
val localIpAddress = Utils.localIpAddress
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index a2f88fc5e5..b1b72a3a1f 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -16,7 +16,7 @@ import akka.dispatch.Await
* The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description,
* and a listener for job events, and calls back the listener when various events occur.
*/
-class Client(
+private[spark] class Client(
actorSystem: ActorSystem,
masterUrl: String,
jobDescription: JobDescription,
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
index 7d23baff32..a8fa982085 100644
--- a/core/src/main/scala/spark/deploy/client/ClientListener.scala
+++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala
@@ -7,7 +7,7 @@ package spark.deploy.client
*
* Users of this API should *not* block inside the callback methods.
*/
-trait ClientListener {
+private[spark] trait ClientListener {
def connected(jobId: String): Unit
def disconnected(): Unit
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
index df9a36c7fe..bf0e7428ba 100644
--- a/core/src/main/scala/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/spark/deploy/client/TestClient.scala
@@ -4,7 +4,7 @@ import spark.util.AkkaUtils
import spark.{Logging, Utils}
import spark.deploy.{Command, JobDescription}
-object TestClient {
+private[spark] object TestClient {
class TestListener extends ClientListener with Logging {
def connected(id: String) {
diff --git a/core/src/main/scala/spark/deploy/client/TestExecutor.scala b/core/src/main/scala/spark/deploy/client/TestExecutor.scala
index 2e40e10d18..0e46db2272 100644
--- a/core/src/main/scala/spark/deploy/client/TestExecutor.scala
+++ b/core/src/main/scala/spark/deploy/client/TestExecutor.scala
@@ -1,6 +1,6 @@
package spark.deploy.client
-object TestExecutor {
+private[spark] object TestExecutor {
def main(args: Array[String]) {
println("Hello world!")
while (true) {
diff --git a/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
index 335e00958c..1db2c32633 100644
--- a/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
@@ -2,7 +2,7 @@ package spark.deploy.master
import spark.deploy.ExecutorState
-class ExecutorInfo(
+private[spark] class ExecutorInfo(
val id: Int,
val job: JobInfo,
val worker: WorkerInfo,
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 4c81a1b447..8795c09cc1 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -5,6 +5,7 @@ import java.util.Date
import akka.actor.ActorRef
import scala.collection.mutable
+private[spark]
class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala
index 8d458ac39c..2b70cf0191 100644
--- a/core/src/main/scala/spark/deploy/master/JobState.scala
+++ b/core/src/main/scala/spark/deploy/master/JobState.scala
@@ -1,6 +1,6 @@
package spark.deploy.master
-object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
+private[spark] object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
type JobState = Value
val WAITING, RUNNING, FINISHED, FAILED = Value
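The type JobState = Value alias lets callers name the enumeration's value type directly. A small usage sketch (isTerminal is illustrative, not in the patch):

    // Sketch: matching on the enumeration through its alias.
    import spark.deploy.master.JobState
    import spark.deploy.master.JobState.JobState

    def isTerminal(state: JobState): Boolean = state match {
      case JobState.FINISHED | JobState.FAILED => true
      case _                                   => false
    }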
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 5cc73633ab..6010f7cff2 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -14,7 +14,7 @@ import spark.{Logging, SparkException, Utils}
import spark.util.AkkaUtils
-class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
+private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
var nextJobNumber = 0
@@ -212,7 +212,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
}
}
-object Master {
+private[spark] object Master {
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
diff --git a/core/src/main/scala/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
index d712e3d5b3..1b1c3dd0ad 100644
--- a/core/src/main/scala/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
@@ -6,7 +6,7 @@ import spark.Utils
/**
* Command-line parser for the master.
*/
-class MasterArguments(args: Array[String]) {
+private[spark] class MasterArguments(args: Array[String]) {
var ip = Utils.localIpAddress()
var port = 7077
var webUiPort = 8080
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 9f9994e4ba..700a41c770 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -10,6 +10,7 @@ import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
import spark.deploy._
+private[spark]
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/master/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 59474a0945..16b3f9b653 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -3,7 +3,7 @@ package spark.deploy.master
import akka.actor.ActorRef
import scala.collection.mutable
-class WorkerInfo(
+private[spark] class WorkerInfo(
val id: String,
val host: String,
val port: Int,
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index e2a9df275a..73722a82e0 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -13,7 +13,7 @@ import spark.deploy.ExecutorStateChanged
/**
* Manages the execution of one executor process.
*/
-class ExecutorRunner(
+private[spark] class ExecutorRunner(
val jobId: String,
val execId: Int,
val jobDesc: JobDescription,
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 645613257d..474c9364fd 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -16,7 +16,7 @@ import spark.deploy.RegisterWorkerFailed
import akka.actor.Terminated
import java.io.File
-class Worker(
+private[spark] class Worker(
ip: String,
port: Int,
webUiPort: Int,
@@ -170,7 +170,7 @@ class Worker(
}
}
-object Worker {
+private[spark] object Worker {
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
index 059c40da9f..60dc107a4c 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
@@ -8,7 +8,7 @@ import java.lang.management.ManagementFactory
/**
* Command-line parser for the worker.
*/
-class WorkerArguments(args: Array[String]) {
+private[spark] class WorkerArguments(args: Array[String]) {
var ip = Utils.localIpAddress()
var port = 0
var webUiPort = 8081
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index f84b92b63e..d06f4884ee 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -9,6 +9,7 @@ import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._
import spark.deploy.{WorkerState, RequestWorkerState}
+private[spark]
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 820428c727..6ecf9fa8da 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -16,7 +16,7 @@ import java.nio.ByteBuffer
/**
* The Mesos executor for Spark.
*/
-class Executor extends Logging {
+private[spark] class Executor extends Logging {
var urlClassLoader : ExecutorURLClassLoader = null
var threadPool: ExecutorService = null
var env: SparkEnv = null
diff --git a/core/src/main/scala/spark/executor/ExecutorBackend.scala b/core/src/main/scala/spark/executor/ExecutorBackend.scala
index 24c8776f31..e97e509700 100644
--- a/core/src/main/scala/spark/executor/ExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/ExecutorBackend.scala
@@ -6,6 +6,6 @@ import spark.TaskState.TaskState
/**
* A pluggable interface used by the Executor to send updates to the cluster scheduler.
*/
-trait ExecutorBackend {
+private[spark] trait ExecutorBackend {
def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
}
diff --git a/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala
index f74f036c4c..5beb4d049e 100644
--- a/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala
+++ b/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala
@@ -5,8 +5,7 @@ import java.net.{URLClassLoader, URL}
/**
* The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
*/
-private[spark]
-class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
+private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
extends URLClassLoader(urls, parent) {
override def addURL(url: URL) {
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 50f4e41ede..eeab3959c6 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -8,7 +8,7 @@ import com.google.protobuf.ByteString
import spark.{Utils, Logging}
import spark.TaskState
-class MesosExecutorBackend(executor: Executor)
+private[spark] class MesosExecutorBackend(executor: Executor)
extends MesosExecutor
with ExecutorBackend
with Logging {
@@ -59,7 +59,7 @@ class MesosExecutorBackend(executor: Executor)
/**
* Entry point for Mesos executor.
*/
-object MesosExecutorBackend {
+private[spark] object MesosExecutorBackend {
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 26b163de0a..915f71ba9f 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -14,7 +14,7 @@ import spark.scheduler.cluster.RegisterSlaveFailed
import spark.scheduler.cluster.RegisterSlave
-class StandaloneExecutorBackend(
+private[spark] class StandaloneExecutorBackend(
executor: Executor,
masterUrl: String,
slaveId: String,
@@ -62,7 +62,7 @@ class StandaloneExecutorBackend(
}
}
-object StandaloneExecutorBackend {
+private[spark] object StandaloneExecutorBackend {
def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
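The backend therefore builds a dedicated ActorSystem first, using the same AkkaUtils helper visible in Master.main and Worker.main above. Roughly (the actor-system name and the ephemeral port 0 are assumptions):

    // Sketch: standing up a fresh ActorSystem before any SparkEnv exists.
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
    // ...create the StandaloneExecutorBackend actor inside it, then block:
    actorSystem.awaitTermination()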
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index c4350173fc..80262ab7b4 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -11,6 +11,7 @@ import java.nio.channels.spi._
import java.net._
+private[spark]
abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
channel.configureBlocking(false)
@@ -102,7 +103,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex
}
-class SendingConnection(val address: InetSocketAddress, selector_ : Selector)
+private[spark] class SendingConnection(val address: InetSocketAddress, selector_ : Selector)
extends Connection(SocketChannel.open, selector_) {
class Outbox(fair: Int = 0) {
@@ -259,7 +260,7 @@ extends Connection(SocketChannel.open, selector_) {
}
-class ReceivingConnection(channel_ : SocketChannel, selector_ : Selector)
+private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : Selector)
extends Connection(channel_, selector_) {
class Inbox() {
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 2bb5f5fc6b..dec0df25b4 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -18,17 +18,17 @@ import akka.dispatch.{Await, Promise, ExecutionContext, Future}
import akka.util.Duration
import akka.util.duration._
-case class ConnectionManagerId(host: String, port: Int) {
+private[spark] case class ConnectionManagerId(host: String, port: Int) {
def toSocketAddress() = new InetSocketAddress(host, port)
}
-object ConnectionManagerId {
+private[spark] object ConnectionManagerId {
def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId = {
new ConnectionManagerId(socketAddress.getHostName(), socketAddress.getPort())
}
}
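ConnectionManagerId is just a serializable (host, port) pair; fromSocketAddress and toSocketAddress convert in both directions. A round-trip sketch:

    // Sketch: InetSocketAddress <-> ConnectionManagerId round trip.
    import java.net.InetSocketAddress

    val addr = new InetSocketAddress("localhost", 9999)
    val id   = ConnectionManagerId.fromSocketAddress(addr)
    assert(id.toSocketAddress().getPort == 9999)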
-class ConnectionManager(port: Int) extends Logging {
+private[spark] class ConnectionManager(port: Int) extends Logging {
class MessageStatus(
val message: Message,
@@ -349,7 +349,7 @@ class ConnectionManager(port: Int) extends Logging {
}
-object ConnectionManager {
+private[spark] object ConnectionManager {
def main(args: Array[String]) {
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
index 555b3454ee..47ceaf3c07 100644
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
@@ -11,7 +11,7 @@ import java.net.InetAddress
import akka.dispatch.Await
import akka.util.duration._
-object ConnectionManagerTest extends Logging{
+private[spark] object ConnectionManagerTest extends Logging {
def main(args: Array[String]) {
if (args.length < 2) {
println("Usage: ConnectionManagerTest <mesos cluster> <slaves file>")
diff --git a/core/src/main/scala/spark/network/Message.scala b/core/src/main/scala/spark/network/Message.scala
index 62a06d95c5..525751b5bf 100644
--- a/core/src/main/scala/spark/network/Message.scala
+++ b/core/src/main/scala/spark/network/Message.scala
@@ -9,7 +9,7 @@ import java.net.InetAddress
import java.net.InetSocketAddress
import storage.BlockManager
-class MessageChunkHeader(
+private[spark] class MessageChunkHeader(
val typ: Long,
val id: Int,
val totalSize: Int,
@@ -37,7 +37,7 @@ class MessageChunkHeader(
" and sizes " + totalSize + " / " + chunkSize + " bytes"
}
-class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) {
+private[spark] class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) {
val size = if (buffer == null) 0 else buffer.remaining
lazy val buffers = {
val ab = new ArrayBuffer[ByteBuffer]()
@@ -51,7 +51,7 @@ class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) {
override def toString = "" + this.getClass.getSimpleName + " (id = " + header.id + ", size = " + size + ")"
}
-abstract class Message(val typ: Long, val id: Int) {
+private[spark] abstract class Message(val typ: Long, val id: Int) {
var senderAddress: InetSocketAddress = null
var started = false
var startTime = -1L
@@ -68,7 +68,7 @@ abstract class Message(val typ: Long, val id: Int) {
override def toString = this.getClass.getSimpleName + "(id = " + id + ", size = " + size + ")"
}
-class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int)
+private[spark] class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int)
extends Message(Message.BUFFER_MESSAGE, id_) {
val initialSize = currentSize()
@@ -152,7 +152,7 @@ extends Message(Message.BUFFER_MESSAGE, id_) {
}
}
-object MessageChunkHeader {
+private[spark] object MessageChunkHeader {
val HEADER_SIZE = 40
def create(buffer: ByteBuffer): MessageChunkHeader = {
@@ -173,7 +173,7 @@ object MessageChunkHeader {
}
}
-object Message {
+private[spark] object Message {
val BUFFER_MESSAGE = 1111111111L
var lastId = 1
diff --git a/core/src/main/scala/spark/network/ReceiverTest.scala b/core/src/main/scala/spark/network/ReceiverTest.scala
index e1ba7c06c0..a174d5f403 100644
--- a/core/src/main/scala/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/spark/network/ReceiverTest.scala
@@ -3,7 +3,7 @@ package spark.network
import java.nio.ByteBuffer
import java.net.InetAddress
-object ReceiverTest {
+private[spark] object ReceiverTest {
def main(args: Array[String]) {
val manager = new ConnectionManager(9999)
diff --git a/core/src/main/scala/spark/network/SenderTest.scala b/core/src/main/scala/spark/network/SenderTest.scala
index 4ab6dd3414..a4ff69e4d2 100644
--- a/core/src/main/scala/spark/network/SenderTest.scala
+++ b/core/src/main/scala/spark/network/SenderTest.scala
@@ -3,7 +3,7 @@ package spark.network
import java.nio.ByteBuffer
import java.net.InetAddress
-object SenderTest {
+private[spark] object SenderTest {
def main(args: Array[String]) {
diff --git a/core/src/main/scala/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/spark/partial/ApproximateActionListener.scala
index e6535836ab..42f46e06ed 100644
--- a/core/src/main/scala/spark/partial/ApproximateActionListener.scala
+++ b/core/src/main/scala/spark/partial/ApproximateActionListener.scala
@@ -12,7 +12,7 @@ import spark.scheduler.JobListener
* a result of type U for each partition, and that the action returns a partial or complete result
* of type R. Note that the type R must *include* any error bars on it (e.g. see BoundedInt).
*/
-class ApproximateActionListener[T, U, R](
+private[spark] class ApproximateActionListener[T, U, R](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
diff --git a/core/src/main/scala/spark/partial/ApproximateEvaluator.scala b/core/src/main/scala/spark/partial/ApproximateEvaluator.scala
index 4772e43ef0..75713b2eaa 100644
--- a/core/src/main/scala/spark/partial/ApproximateEvaluator.scala
+++ b/core/src/main/scala/spark/partial/ApproximateEvaluator.scala
@@ -4,7 +4,7 @@ package spark.partial
* An object that computes a function incrementally by merging in results of type U from multiple
* tasks. Allows partial evaluation at any point by calling currentResult().
*/
-trait ApproximateEvaluator[U, R] {
+private[spark] trait ApproximateEvaluator[U, R] {
def merge(outputId: Int, taskResult: U): Unit
def currentResult(): R
}
diff --git a/core/src/main/scala/spark/partial/BoundedDouble.scala b/core/src/main/scala/spark/partial/BoundedDouble.scala
index 463c33d6e2..8bedd75182 100644
--- a/core/src/main/scala/spark/partial/BoundedDouble.scala
+++ b/core/src/main/scala/spark/partial/BoundedDouble.scala
@@ -3,6 +3,7 @@ package spark.partial
/**
* A Double with error bars on it.
*/
+private[spark]
class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
override def toString(): String = "[%.3f, %.3f]".format(low, high)
}
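Given the format string above, a BoundedDouble prints only its interval, not its mean. For example:

    // Sketch: a mean of 10.0 at 95% confidence with bounds [9.2, 10.8].
    val b = new BoundedDouble(10.0, 0.95, 9.2, 10.8)
    println(b)   // prints "[9.200, 10.800]"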
diff --git a/core/src/main/scala/spark/partial/CountEvaluator.scala b/core/src/main/scala/spark/partial/CountEvaluator.scala
index 1bc90d6b39..daf2c5170c 100644
--- a/core/src/main/scala/spark/partial/CountEvaluator.scala
+++ b/core/src/main/scala/spark/partial/CountEvaluator.scala
@@ -8,7 +8,7 @@ import cern.jet.stat.Probability
* TODO: There's currently a lot of shared code between this and GroupedCountEvaluator. It might
* be best to make this a special case of GroupedCountEvaluator with one group.
*/
-class CountEvaluator(totalOutputs: Int, confidence: Double)
+private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[Long, BoundedDouble] {
var outputsMerged = 0
diff --git a/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala
index 3e631c0efc..01fbb8a11b 100644
--- a/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala
+++ b/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala
@@ -14,7 +14,7 @@ import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
/**
* An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
*/
-class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
+private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
var outputsMerged = 0
diff --git a/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala b/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala
index 2a9ccba205..c622df5220 100644
--- a/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala
+++ b/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala
@@ -12,7 +12,7 @@ import spark.util.StatCounter
/**
* An ApproximateEvaluator for means by key. Returns a map of key to confidence interval.
*/
-class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Double)
+private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
var outputsMerged = 0
diff --git a/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala b/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala
index 6a2ec7a7bd..20fa55cff2 100644
--- a/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala
+++ b/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala
@@ -12,7 +12,7 @@ import spark.util.StatCounter
/**
* An ApproximateEvaluator for sums by key. Returns a map of key to confidence interval.
*/
-class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Double)
+private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
var outputsMerged = 0
diff --git a/core/src/main/scala/spark/partial/MeanEvaluator.scala b/core/src/main/scala/spark/partial/MeanEvaluator.scala
index b8c7cb8863..762c85400d 100644
--- a/core/src/main/scala/spark/partial/MeanEvaluator.scala
+++ b/core/src/main/scala/spark/partial/MeanEvaluator.scala
@@ -7,7 +7,7 @@ import spark.util.StatCounter
/**
* An ApproximateEvaluator for means.
*/
-class MeanEvaluator(totalOutputs: Int, confidence: Double)
+private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[StatCounter, BoundedDouble] {
var outputsMerged = 0
diff --git a/core/src/main/scala/spark/partial/PartialResult.scala b/core/src/main/scala/spark/partial/PartialResult.scala
index 200ed4ea1e..beafbf67c3 100644
--- a/core/src/main/scala/spark/partial/PartialResult.scala
+++ b/core/src/main/scala/spark/partial/PartialResult.scala
@@ -1,6 +1,6 @@
package spark.partial
-class PartialResult[R](initialVal: R, isFinal: Boolean) {
+private[spark] class PartialResult[R](initialVal: R, isFinal: Boolean) {
private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None
private var failure: Option[Exception] = None
private var completionHandler: Option[R => Unit] = None
diff --git a/core/src/main/scala/spark/partial/StudentTCacher.scala b/core/src/main/scala/spark/partial/StudentTCacher.scala
index 6263ee3518..443abba5cd 100644
--- a/core/src/main/scala/spark/partial/StudentTCacher.scala
+++ b/core/src/main/scala/spark/partial/StudentTCacher.scala
@@ -7,7 +7,7 @@ import cern.jet.stat.Probability
* and various sample sizes. This is used by the MeanEvaluator to efficiently calculate
* confidence intervals for many keys.
*/
-class StudentTCacher(confidence: Double) {
+private[spark] class StudentTCacher(confidence: Double) {
val NORMAL_APPROX_SAMPLE_SIZE = 100 // For samples bigger than this, use Gaussian approximation
val normalApprox = Probability.normalInverse(1 - (1 - confidence) / 2)
val cache = Array.fill[Double](NORMAL_APPROX_SAMPLE_SIZE)(-1.0)
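What the cache memoizes is the two-sided critical value for the requested confidence: the Gaussian quantile above for samples of at least NORMAL_APPROX_SAMPLE_SIZE, and a Student's t quantile below it. A sketch of that computation using the same Colt Probability API (the studentTInverse call is an assumption based on the class's description):

    // Sketch: the per-sample-size critical value that StudentTCacher caches.
    import cern.jet.stat.Probability

    def criticalValue(confidence: Double, sampleSize: Int): Double =
      if (sampleSize >= 100) {
        Probability.normalInverse(1 - (1 - confidence) / 2)         // normal approximation
      } else {
        Probability.studentTInverse(1 - confidence, sampleSize - 1) // assumed Colt call
      }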
diff --git a/core/src/main/scala/spark/partial/SumEvaluator.scala b/core/src/main/scala/spark/partial/SumEvaluator.scala
index 0357a6bff8..58fb60f441 100644
--- a/core/src/main/scala/spark/partial/SumEvaluator.scala
+++ b/core/src/main/scala/spark/partial/SumEvaluator.scala
@@ -9,7 +9,7 @@ import spark.util.StatCounter
* together, then uses the formula for the variance of two independent random variables to get
* a variance for the result and compute a confidence interval.
*/
-class SumEvaluator(totalOutputs: Int, confidence: Double)
+private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[StatCounter, BoundedDouble] {
var outputsMerged = 0
diff --git a/core/src/main/scala/spark/scheduler/ActiveJob.scala b/core/src/main/scala/spark/scheduler/ActiveJob.scala
index e09b92d667..5a4e9a582d 100644
--- a/core/src/main/scala/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/spark/scheduler/ActiveJob.scala
@@ -5,7 +5,7 @@ import spark.TaskContext
/**
* Tracks information about an active job in the DAGScheduler.
*/
-class ActiveJob(
+private[spark] class ActiveJob(
val runId: Int,
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 70931407ce..9b666ed181 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -21,6 +21,7 @@ import spark.storage.BlockManagerId
* schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
* and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
*/
+private[spark]
class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging {
taskSched.setListener(this)
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 11f0ef6245..3422a21d9d 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -10,9 +10,9 @@ import spark._
* submitted) but there is a single "logic" thread that reads these events and takes decisions.
* This greatly simplifies synchronization.
*/
-sealed trait DAGSchedulerEvent
+private[spark] sealed trait DAGSchedulerEvent
-case class JobSubmitted(
+private[spark] case class JobSubmitted(
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
@@ -21,15 +21,15 @@ case class JobSubmitted(
listener: JobListener)
extends DAGSchedulerEvent
-case class CompletionEvent(
+private[spark] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Map[Long, Any])
extends DAGSchedulerEvent
-case class HostLost(host: String) extends DAGSchedulerEvent
+private[spark] case class HostLost(host: String) extends DAGSchedulerEvent
-case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
+private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
-case object StopDAGScheduler extends DAGSchedulerEvent
+private[spark] case object StopDAGScheduler extends DAGSchedulerEvent
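As the comment above says, any thread may post these events but a single logic thread consumes them, which is what keeps the DAGScheduler's state safe without further locking. A minimal sketch of that consumer loop (queue and handlers are illustrative):

    // Sketch: single-consumer loop over the sealed DAGSchedulerEvent hierarchy.
    import java.util.concurrent.LinkedBlockingQueue

    val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]

    def runEventLoop() {
      var running = true
      while (running) {
        eventQueue.take() match {               // blocks until an event arrives
          case HostLost(host)              => println("handling lost host " + host)
          case TaskSetFailed(taskSet, why) => println("aborting " + taskSet.id + ": " + why)
          case StopDAGScheduler            => running = false
          case _                           => // JobSubmitted, CompletionEvent, ...
        }
      }
    }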
diff --git a/core/src/main/scala/spark/scheduler/JobListener.scala b/core/src/main/scala/spark/scheduler/JobListener.scala
index d4dd536a7d..f46b9d551d 100644
--- a/core/src/main/scala/spark/scheduler/JobListener.scala
+++ b/core/src/main/scala/spark/scheduler/JobListener.scala
@@ -5,7 +5,7 @@ package spark.scheduler
* DAGScheduler. The listener is notified each time a task succeeds, as well as if the whole
* job fails (and no further taskSucceeded events will happen).
*/
-trait JobListener {
+private[spark] trait JobListener {
def taskSucceeded(index: Int, result: Any)
def jobFailed(exception: Exception)
}
diff --git a/core/src/main/scala/spark/scheduler/JobResult.scala b/core/src/main/scala/spark/scheduler/JobResult.scala
index 62b458eccb..c4a74e526f 100644
--- a/core/src/main/scala/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/spark/scheduler/JobResult.scala
@@ -3,7 +3,7 @@ package spark.scheduler
/**
* A result of a job in the DAGScheduler.
*/
-sealed trait JobResult
+private[spark] sealed trait JobResult
-case class JobSucceeded(results: Seq[_]) extends JobResult
-case class JobFailed(exception: Exception) extends JobResult
+private[spark] case class JobSucceeded(results: Seq[_]) extends JobResult
+private[spark] case class JobFailed(exception: Exception) extends JobResult
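Since JobResult is sealed with exactly these two cases, callers can match on it exhaustively:

    // Sketch: unpacking a job's outcome.
    def describe(result: JobResult): String = result match {
      case JobSucceeded(results) => "succeeded with " + results.size + " results"
      case JobFailed(exception)  => "failed: " + exception.getMessage
    }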
diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala
index 4c2ae23051..b3d4feebe5 100644
--- a/core/src/main/scala/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/spark/scheduler/JobWaiter.scala
@@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer
/**
* An object that waits for a DAGScheduler job to complete.
*/
-class JobWaiter(totalTasks: Int) extends JobListener {
+private[spark] class JobWaiter(totalTasks: Int) extends JobListener {
private val taskResults = ArrayBuffer.fill[Any](totalTasks)(null)
private var finishedTasks = 0
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 090ced9d76..2ebd4075a2 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -2,7 +2,7 @@ package spark.scheduler
import spark._
-class ResultTask[T, U](
+private[spark] class ResultTask[T, U](
stageId: Int,
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 27d97ffee5..966a5e173a 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -15,7 +15,7 @@ import com.ning.compress.lzf.LZFOutputStream
import spark._
import spark.storage._
-object ShuffleMapTask {
+private[spark] object ShuffleMapTask {
// A simple map between the stage id to the serialized byte array of a task.
// Served as a cache for task serialization because serialization can be
@@ -68,7 +68,7 @@ object ShuffleMapTask {
}
}
-class ShuffleMapTask(
+private[spark] class ShuffleMapTask(
stageId: Int,
var rdd: RDD[_],
var dep: ShuffleDependency[_,_,_],
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index b3ef8ac565..803dd1b97d 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -19,7 +19,7 @@ import spark.storage.BlockManagerId
* Each Stage also has a priority, which is (by default) based on the job it was submitted in.
* This allows Stages from earlier jobs to be computed first or recovered faster on failure.
*/
-class Stage(
+private[spark] class Stage(
val id: Int,
val rdd: RDD[_],
val shuffleDep: Option[ShuffleDependency[_,_,_]], // Output shuffle if stage is a map stage
diff --git a/core/src/main/scala/spark/scheduler/Task.scala b/core/src/main/scala/spark/scheduler/Task.scala
index d69c259362..d449ac67d6 100644
--- a/core/src/main/scala/spark/scheduler/Task.scala
+++ b/core/src/main/scala/spark/scheduler/Task.scala
@@ -11,7 +11,7 @@ import scala.collection.mutable.HashMap
/**
* A task to execute on a worker node.
*/
-abstract class Task[T](val stageId: Int) extends Serializable {
+private[spark] abstract class Task[T](val stageId: Int) extends Serializable {
def run(attemptId: Long): T
def preferredLocations: Seq[String] = Nil
@@ -25,7 +25,7 @@ abstract class Task[T](val stageId: Int) extends Serializable {
* the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
* first writing out its dependencies.
*/
-object Task {
+private[spark] object Task {
/**
* Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
*/
diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala
index 868ddb237c..9a54d0e854 100644
--- a/core/src/main/scala/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/spark/scheduler/TaskResult.scala
@@ -7,6 +7,7 @@ import scala.collection.mutable.Map
// Task result. Also contains updates to accumulator variables.
// TODO: Use of distributed cache to return result is a hack to get around
// what seems to be a bug with messages over 60KB in libprocess; fix it
+private[spark]
class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any]) extends Externalizable {
def this() = this(null.asInstanceOf[T], null)
diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
index c35633d53c..d549b184b0 100644
--- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
@@ -7,7 +7,7 @@ package spark.scheduler
* are failures, and mitigating stragglers. They return events to the DAGScheduler through
* the TaskSchedulerListener interface.
*/
-trait TaskScheduler {
+private[spark] trait TaskScheduler {
def start(): Unit
// Disconnect from the cluster.
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index f838272fb4..fa4de15d0d 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -7,7 +7,7 @@ import spark.TaskEndReason
/**
* Interface for getting events back from the TaskScheduler.
*/
-trait TaskSchedulerListener {
+private[spark] trait TaskSchedulerListener {
// A task has finished or failed.
def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any]): Unit
diff --git a/core/src/main/scala/spark/scheduler/TaskSet.scala b/core/src/main/scala/spark/scheduler/TaskSet.scala
index 3f4a464902..a3002ca477 100644
--- a/core/src/main/scala/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSet.scala
@@ -4,7 +4,7 @@ package spark.scheduler
* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.
*/
-class TaskSet(val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int) {
+private[spark] class TaskSet(val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int) {
val id: String = stageId + "." + attempt
override def toString: String = "TaskSet " + id
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 16fe5761c8..f5e852d203 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -16,7 +16,7 @@ import java.util.concurrent.atomic.AtomicLong
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
* start(), then submit task sets through the runTasks method.
*/
-class ClusterScheduler(val sc: SparkContext)
+private[spark] class ClusterScheduler(val sc: SparkContext)
extends TaskScheduler
with Logging {
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
index 897976c3f9..ddcd64d7c6 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
@@ -5,7 +5,7 @@ package spark.scheduler.cluster
* ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
* machines become available and can launch tasks on them.
*/
-trait SchedulerBackend {
+private[spark] trait SchedulerBackend {
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit
diff --git a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
index e15d577a8b..96ebaa4601 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
@@ -1,3 +1,4 @@
package spark.scheduler.cluster
+private[spark]
class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9093a329a3..0043dbeb10 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -5,7 +5,7 @@ import spark.deploy.client.{Client, ClientListener}
import spark.deploy.{Command, JobDescription}
import scala.collection.mutable.HashMap
-class SparkDeploySchedulerBackend(
+private[spark] class SparkDeploySchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
master: String,
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index 80e8733671..1386cd9d44 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -4,19 +4,27 @@ import spark.TaskState.TaskState
import java.nio.ByteBuffer
import spark.util.SerializableBuffer
-sealed trait StandaloneClusterMessage extends Serializable
+private[spark] sealed trait StandaloneClusterMessage extends Serializable
// Master to slaves
+private[spark]
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
+
+private[spark]
case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
+
+private[spark]
case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
// Slaves to master
+private[spark]
case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+private[spark]
case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
extends StandaloneClusterMessage
+private[spark]
object StatusUpdate {
/** Alternate factory method that takes a ByteBuffer directly for the data field */
def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
@@ -25,5 +33,5 @@ object StatusUpdate {
}
// Internal messages in master
-case object ReviveOffers extends StandaloneClusterMessage
-case object StopMaster extends StandaloneClusterMessage
+private[spark] case object ReviveOffers extends StandaloneClusterMessage
+private[spark] case object StopMaster extends StandaloneClusterMessage
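The StatusUpdate companion's alternate apply (shown above) wraps a raw ByteBuffer in a SerializableBuffer so the message can cross Akka serialization. Usage is then one call (the ids and state below are placeholders):

    // Sketch: building a StatusUpdate straight from a ByteBuffer of task results.
    import java.nio.ByteBuffer
    import spark.TaskState

    val data   = ByteBuffer.wrap(Array[Byte](1, 2, 3))
    val update = StatusUpdate("slave-1", 42L, TaskState.FINISHED, data)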
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 83e7c6e036..d2cce0dc05 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -16,6 +16,7 @@ import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClient
* Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
* Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
*/
+private[spark]
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
extends SchedulerBackend with Logging {
@@ -149,6 +150,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
}
-object StandaloneSchedulerBackend {
+private[spark] object StandaloneSchedulerBackend {
val ACTOR_NAME = "StandaloneScheduler"
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index f9a1b74fa5..aa097fd3a2 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -3,7 +3,7 @@ package spark.scheduler.cluster
import java.nio.ByteBuffer
import spark.util.SerializableBuffer
-class TaskDescription(
+private[spark] class TaskDescription(
val taskId: Long,
val slaveId: String,
val name: String,
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index 65e59841a9..ca84503780 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -3,6 +3,7 @@ package spark.scheduler.cluster
/**
* Information about a running task attempt inside a TaskSet.
*/
+private[spark]
class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) {
var finishTime: Long = 0
var failed = false
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index aa37462fb0..9bb88ad6a1 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -17,7 +17,7 @@ import java.nio.ByteBuffer
/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler.
*/
-class TaskSetManager(
+private[spark] class TaskSetManager(
sched: ClusterScheduler,
val taskSet: TaskSet)
extends Logging {
diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
index 1e83f103e7..6b919d68b2 100644
--- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
@@ -3,5 +3,6 @@ package spark.scheduler.cluster
/**
* Represents free resources available on a worker node.
*/
+private[spark]
class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) {
}
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 53fc659345..2b38d8b52e 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -15,7 +15,7 @@ import spark.scheduler._
* the scheduler also allows each task to fail up to maxFailures times, which is useful for
* testing fault recovery.
*/
-class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkContext)
+private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkContext)
extends TaskScheduler
with Logging {
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index fdf007ffb2..9737c6b63e 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -24,7 +24,7 @@ import spark.TaskState
* Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
* remove this.
*/
-class CoarseMesosSchedulerBackend(
+private[spark] class CoarseMesosSchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
master: String,
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 44eda93dd1..e85e4ef318 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -20,7 +20,7 @@ import spark.TaskState
* separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
* from multiple apps can run on different cores) and in time (a core can switch ownership).
*/
-class MesosSchedulerBackend(
+private[spark] class MesosSchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
master: String,
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 37d5862575..21a2901548 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -19,7 +19,7 @@ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import sun.nio.ch.DirectBuffer
-class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
+private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
def this() = this(null, 0)
override def writeExternal(out: ObjectOutput) {
@@ -43,11 +43,12 @@ class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
}
+private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
-class BlockLocker(numLockers: Int) {
+private[spark] class BlockLocker(numLockers: Int) {
private val hashLocker = Array.fill(numLockers)(new Object())
def getLock(blockId: String): Object = {
@@ -56,18 +57,35 @@ class BlockLocker(numLockers: Int) {
}
+private[spark]
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging {
- case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
+ class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, var pending: Boolean = true) {
+ def waitForReady() {
+ if (pending) {
+ synchronized {
+ while (pending) this.wait()
+ }
+ }
+ }
+
+ def markReady() {
+ pending = false
+ synchronized {
+ this.notifyAll()
+ }
+ }
+ }
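One caution on the block above: markReady flips pending before entering the synchronized block, and waitForReady reads it once outside the monitor, so the code leans on that double-check being benign. The textbook guarded-wait idiom keeps both the flag write and the wait loop entirely inside the monitor; a sketch of that stricter form (not what the patch does):

    // Sketch: standard guarded-wait form, flag mutated only under the monitor.
    class Ready {
      private var pending = true

      def waitForReady(): Unit = synchronized {
        while (pending) wait()
      }

      def markReady(): Unit = synchronized {
        pending = false
        notifyAll()
      }
    }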
private val NUM_LOCKS = 337
private val locker = new BlockLocker(NUM_LOCKS)
private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
+
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
- private[storage] val diskStore: BlockStore = new DiskStore(this,
- System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ private[storage] val diskStore: BlockStore =
+ new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext
@@ -79,7 +97,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
var cacheTracker: CacheTracker = null
val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
-
val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
initialize()
@@ -110,45 +127,32 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Change the storage level for a local block in the block info meta data, and
- * tell the master if necessary. Note that this is only a meta data change and
- * does NOT actually change the storage of the block. If the new level is
- * invalid, then block info (if exists) will be silently removed.
+ * Tell the master about the current storage status of a block. This will send a heartbeat
+ * message reflecting the current status, *not* the desired storage level in its block info.
+ * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
- private[spark] def setLevelAndTellMaster(
- blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
-
- if (level == null) {
- throw new IllegalArgumentException("Storage level is null")
- }
-
- // If there was earlier info about the block, then use earlier tellMaster
- val oldInfo = blockInfo.get(blockId)
- val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
- if (oldInfo != null && oldInfo.tellMaster != tellMaster) {
- logWarning("Ignoring tellMaster setting as it is different from earlier setting")
- }
-
- // If level is valid, store the block info, else remove the block info
- if (level.isValid) {
- blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
- logDebug("Info for block " + blockId + " updated with new level as " + level)
- } else {
- blockInfo.remove(blockId)
- logDebug("Info for block " + blockId + " removed as new level is null or invalid")
- }
-
- // Tell master if necessary
- if (newTellMaster) {
+ def reportBlockStatus(blockId: String) {
+ locker.getLock(blockId).synchronized {
+ val curLevel = blockInfo.get(blockId) match {
+ case null =>
+ StorageLevel.NONE
+ case info =>
+ info.level match {
+ case null =>
+ StorageLevel.NONE
+ case level =>
+ val inMem = level.useMemory && memoryStore.contains(blockId)
+ val onDisk = level.useDisk && diskStore.contains(blockId)
+ new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+ }
+ }
master.mustHeartBeat(HeartBeat(
blockManagerId,
blockId,
- level,
- if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
- if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
+ curLevel,
+ if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
+ if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
logDebug("Told master about block " + blockId)
- } else {
- logDebug("Did not tell master about block " + blockId)
}
}
@@ -180,36 +184,59 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def getLocal(blockId: String): Option[Iterator[Any]] = {
logDebug("Getting local block " + blockId)
locker.getLock(blockId).synchronized {
- // Check storage level of block
- val level = getLevel(blockId)
- if (level != null) {
- logDebug("Level for block " + blockId + " is " + level + " on local machine")
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.waitForReady() // In case the block is still being put() by another thread
+ val level = info.level
+ logDebug("Level for block " + blockId + " is " + level)
// Look for the block in memory
if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory")
memoryStore.getValues(blockId) match {
- case Some(iterator) => {
- logDebug("Block " + blockId + " found in memory")
+ case Some(iterator) =>
return Some(iterator)
- }
- case None => {
+ case None =>
logDebug("Block " + blockId + " not found in memory")
- }
}
}
- // Look for block on disk
+ // Look for block on disk, potentially loading it back into memory if required
if (level.useDisk) {
logDebug("Getting block " + blockId + " from disk")
- diskStore.getValues(blockId) match {
- case Some(iterator) => {
- logDebug("Block " + blockId + " found in disk")
- return Some(iterator)
+ if (level.useMemory && level.deserialized) {
+ diskStore.getValues(blockId) match {
+ case Some(iterator) =>
+ // Put the block back in memory before returning it
+ memoryStore.putValues(blockId, iterator, level, true) match {
+ case Left(iterator2) =>
+ return Some(iterator2)
+ case _ =>
+ throw new Exception("Memory store did not return back an iterator")
+ }
+ case None =>
+ throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
- case None => {
- throw new Exception("Block " + blockId + " not found on disk, though it should be")
- return None
+ } else if (level.useMemory && !level.deserialized) {
+ // Read it as a byte buffer into memory first, then return it
+ diskStore.getBytes(blockId) match {
+ case Some(bytes) =>
+ // Put a copy of the block back in memory before returning it. Note that we can't
+ // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
+ val copyForMemory = ByteBuffer.allocate(bytes.limit)
+ copyForMemory.put(bytes)
+ memoryStore.putBytes(blockId, copyForMemory, level)
+ bytes.rewind()
+ return Some(dataDeserialize(bytes))
+ case None =>
+ throw new Exception("Block " + blockId + " not found on disk, though it should be")
+ }
+ } else {
+ diskStore.getValues(blockId) match {
+ case Some(iterator) =>
+ return Some(iterator)
+ case None =>
+ throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
}
}
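The serialized branch above copies the disk store's buffer before handing it to the memory store because that buffer is backed by a memory-mapped file; the cache must own a plain heap copy. The copy idiom in isolation:

    // Sketch: taking a heap-backed copy of a (possibly memory-mapped) ByteBuffer.
    import java.nio.ByteBuffer

    def heapCopy(bytes: ByteBuffer): ByteBuffer = {
      val copy = ByteBuffer.allocate(bytes.limit)
      copy.put(bytes)   // consumes bytes up to its limit
      bytes.rewind()    // restore the source for the caller
      copy.rewind()     // make the copy readable from position 0
      copy
    }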
@@ -224,39 +251,46 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Get block from the local block manager as serialized bytes.
*/
def getLocalBytes(blockId: String): Option[ByteBuffer] = {
+ // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
logDebug("Getting local block " + blockId + " as bytes")
locker.getLock(blockId).synchronized {
- // Check storage level of block
- val level = getLevel(blockId)
- if (level != null) {
- logDebug("Level for block " + blockId + " is " + level + " on local machine")
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.waitForReady() // In case the block is still being put() by another thread
+ val level = info.level
+ logDebug("Level for block " + blockId + " is " + level)
// Look for the block in memory
if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory")
memoryStore.getBytes(blockId) match {
- case Some(bytes) => {
- logDebug("Block " + blockId + " found in memory")
+ case Some(bytes) =>
return Some(bytes)
- }
- case None => {
+ case None =>
logDebug("Block " + blockId + " not found in memory")
- }
}
}
// Look for block on disk
if (level.useDisk) {
- logDebug("Getting block " + blockId + " from disk")
+ // Read it as a byte buffer into memory first, then return it
diskStore.getBytes(blockId) match {
- case Some(bytes) => {
- logDebug("Block " + blockId + " found in disk")
+ case Some(bytes) =>
+ if (level.useMemory) {
+ if (level.deserialized) {
+ memoryStore.putBytes(blockId, bytes, level)
+ } else {
+ // The memory store will hang onto the ByteBuffer, so give it a copy instead of
+ // the memory-mapped file buffer we got from the disk store
+ val copyForMemory = ByteBuffer.allocate(bytes.limit)
+ copyForMemory.put(bytes)
+ memoryStore.putBytes(blockId, copyForMemory, level)
+ }
+ }
+ bytes.rewind()
return Some(bytes)
- }
- case None => {
+ case None =>
throw new Exception("Block " + blockId + " not found on disk, though it should be")
- return None
- }
}
}
} else {
@@ -431,6 +465,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
+ if (blockInfo.containsKey(blockId)) {
+ logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
+ return
+ }
+
+ // Remember the block's storage level so that we can correctly drop it to disk if it needs
+ // to be dropped right after it got put into memory. Note, however, that other threads will
+ // not be able to get() this block until we call markReady on its BlockInfo.
+ val myInfo = new BlockInfo(level, tellMaster)
+ blockInfo.put(blockId, myInfo)
+
val startTimeMs = System.currentTimeMillis
var bytes: ByteBuffer = null
@@ -444,32 +489,15 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
- // Check and warn if block with same id already exists
- if (getLevel(blockId) != null) {
- logWarning("Block " + blockId + " already exists in local machine")
- return
- }
-
- if (level.useMemory && level.useDisk) {
- // If saving to both memory and disk, then serialize only once
- memoryStore.putValues(blockId, values, level, true) match {
- case Left(newValues) =>
- diskStore.putValues(blockId, newValues, level, true) match {
- case Right(newBytes) => bytes = newBytes
- case _ => throw new Exception("Unexpected return value")
- }
- case Right(newBytes) =>
- bytes = newBytes
- diskStore.putBytes(blockId, newBytes, level)
- }
- } else if (level.useMemory) {
- // If only save to memory
+ if (level.useMemory) {
+ // Save it just to memory first, even if it also has useDisk set to true; we will later
+ // drop it to disk if the memory store can't hold it.
memoryStore.putValues(blockId, values, level, true) match {
case Right(newBytes) => bytes = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
} else {
- // If only save to disk
+ // Save directly to disk.
val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
diskStore.putValues(blockId, values, level, askForBytes) match {
case Right(newBytes) => bytes = newBytes
@@ -477,8 +505,12 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- // Store the storage level
- setLevelAndTellMaster(blockId, level, tellMaster)
+ // Now that the block is in either the memory or disk store, let other threads read it,
+ // and tell the master about it.
+ myInfo.markReady()
+ if (tellMaster) {
+ reportBlockStatus(blockId)
+ }
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -521,6 +553,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
+ if (blockInfo.containsKey(blockId)) {
+ logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
+ return
+ }
+
+ // Remember the block's storage level so that we can correctly drop it to disk if it needs
+ // to be dropped right after it got put into memory. Note, however, that other threads will
+ // not be able to get() this block until we call markReady on its BlockInfo.
+ val myInfo = new BlockInfo(level, tellMaster)
+ blockInfo.put(blockId, myInfo)
+
val startTimeMs = System.currentTimeMillis
// Initiate the replication before storing it locally. This is faster as
@@ -537,22 +580,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
locker.getLock(blockId).synchronized {
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
- if (getLevel(blockId) != null) {
- logWarning("Block " + blockId + " already exists")
- return
- }
if (level.useMemory) {
+ // Store it only in memory at first, even if useDisk is also set to true
bytes.rewind()
memoryStore.putBytes(blockId, bytes, level)
- }
- if (level.useDisk) {
+ } else {
bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}
- // Store the storage level
- setLevelAndTellMaster(blockId, level, tellMaster)
+ // Now that the block is in either the memory or disk store, let other threads read it,
+ // and tell the master about it.
+ myInfo.markReady()
+ if (tellMaster) {
+ reportBlockStatus(blockId)
+ }
}
// TODO: This code will be removed when CacheTracker is gone.
@@ -604,11 +647,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// TODO: This code will be removed when CacheTracker is gone.
private def notifyTheCacheTracker(key: String) {
- val rddInfo = key.split("_")
- val rddId: Int = rddInfo(1).toInt
- val splitIndex: Int = rddInfo(2).toInt
- val host = System.getProperty("spark.hostname", Utils.localHostName())
- cacheTracker.notifyTheCacheTrackerFromBlockManager(spark.AddedToCache(rddId, splitIndex, host))
+ if (cacheTracker != null) {
+ val rddInfo = key.split("_")
+ val rddId: Int = rddInfo(1).toInt
+ val partition: Int = rddInfo(2).toInt
+ val host = System.getProperty("spark.hostname", Utils.localHostName())
+ cacheTracker.notifyTheCacheTrackerFromBlockManager(spark.AddedToCache(rddId, partition, host))
+ }
}
/**
@@ -626,22 +671,31 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Drop block from memory (called when memory store has reached it limit)
+ * Drop a block from memory, putting it on disk instead if its storage level allows it. Called
+ * when the memory store reaches its limit and needs to free up space.
*/
- def dropFromMemory(blockId: String) {
+ def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
+ logInfo("Dropping block " + blockId + " from memory")
locker.getLock(blockId).synchronized {
- val level = getLevel(blockId)
- if (level == null) {
- logWarning("Block " + blockId + " cannot be removed from memory as it does not exist")
- return
- }
- if (!level.useMemory) {
- logWarning("Block " + blockId + " cannot be removed from memory as it is not in memory")
- return
+ val info = blockInfo.get(blockId)
+ val level = info.level
+ if (level.useDisk && !diskStore.contains(blockId)) {
+ logInfo("Writing block " + blockId + " to disk")
+ data match {
+ case Left(iterator) =>
+ diskStore.putValues(blockId, iterator, level, false)
+ case Right(bytes) =>
+ diskStore.putBytes(blockId, bytes, level)
+ }
}
memoryStore.remove(blockId)
- val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
- setLevelAndTellMaster(blockId, newLevel)
+ if (info.tellMaster) {
+ reportBlockStatus(blockId)
+ }
+ if (!level.useDisk) {
+ // The block is completely gone from this node; forget it so we can put() it again later.
+ blockInfo.remove(blockId)
+ }
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 0c654364cd..7bfa31ac3d 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -16,13 +16,16 @@ import akka.util.duration._
import spark.{Logging, SparkException, Utils}
+private[spark]
sealed trait ToBlockManagerMaster
+private[spark]
case class RegisterBlockManager(
blockManagerId: BlockManagerId,
maxMemSize: Long)
extends ToBlockManagerMaster
+private[spark]
class HeartBeat(
var blockManagerId: BlockManagerId,
var blockId: String,
@@ -53,6 +56,7 @@ class HeartBeat(
}
}
+private[spark]
object HeartBeat {
def apply(blockManagerId: BlockManagerId,
blockId: String,
@@ -68,18 +72,23 @@ object HeartBeat {
}
}
+private[spark]
case class GetLocations(blockId: String) extends ToBlockManagerMaster
+private[spark]
case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+private[spark]
case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+private[spark]
case class RemoveHost(host: String) extends ToBlockManagerMaster
+private[spark]
case object StopBlockManagerMaster extends ToBlockManagerMaster
-class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
+private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
class BlockManagerInfo(
val blockManagerId: BlockManagerId,
@@ -330,7 +339,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
}
-class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean)
+private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean)
extends Logging {
val AKKA_ACTOR_NAME: String = "BlockMasterManager"
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index 47e4d14010..f72079e267 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -17,7 +17,7 @@ import spark.network._
*
* TODO: Use event model.
*/
-class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
+private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
initLogging()
blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
@@ -87,7 +87,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
}
}
-object BlockManagerWorker extends Logging {
+private[spark] object BlockManagerWorker extends Logging {
private var blockManagerWorker: BlockManagerWorker = null
private val DATA_TRANSFER_TIME_OUT_MS: Long = 500
private val REQUEST_RETRY_INTERVAL_MS: Long = 1000
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index 4b5cfebba2..3f234df654 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -8,11 +8,11 @@ import scala.collection.mutable.ArrayBuffer
import spark._
import spark.network._
-case class GetBlock(id: String)
-case class GotBlock(id: String, data: ByteBuffer)
-case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel)
+private[spark] case class GetBlock(id: String)
+private[spark] case class GotBlock(id: String, data: ByteBuffer)
+private[spark] case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel)
-class BlockMessage() {
+private[spark] class BlockMessage() {
// Un-initialized: typ = 0
// GetBlock: typ = 1
// GotBlock: typ = 2
@@ -158,7 +158,7 @@ class BlockMessage() {
}
}
-object BlockMessage {
+private[spark] object BlockMessage {
val TYPE_NON_INITIALIZED: Int = 0
val TYPE_GET_BLOCK: Int = 1
val TYPE_GOT_BLOCK: Int = 2
diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala
index 64acc7eb47..a25decb123 100644
--- a/core/src/main/scala/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala
@@ -8,6 +8,7 @@ import scala.collection.mutable.ArrayBuffer
import spark._
import spark.network._
+private[spark]
class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {
def this(bm: BlockMessage) = this(Array(bm))
@@ -85,7 +86,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
}
}
-object BlockMessageArray {
+private[spark] object BlockMessageArray {
def fromBufferMessage(bufferMessage: BufferMessage): BlockMessageArray = {
val newBlockMessageArray = new BlockMessageArray()
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 5f123aca78..ff482ff66b 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -31,5 +31,7 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def remove(blockId: String)
+ def contains(blockId: String): Boolean
+
def clear() { }
}
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index e198813456..d0c592ccb1 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -4,9 +4,9 @@ import java.nio.ByteBuffer
import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel.MapMode
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-import java.util.UUID
+import java.util.{Random, Date}
import spark.Utils
-import java.nio.channels.FileChannel
+import java.text.SimpleDateFormat
/**
* Stores BlockManager blocks on disk.
@@ -26,7 +26,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
addShutdownHook()
override def getSize(blockId: String): Long = {
- getFile(blockId).length
+ getFile(blockId).length()
}
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
@@ -93,6 +93,10 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
}
+ override def contains(blockId: String): Boolean = {
+ getFile(blockId).exists()
+ }
+
private def createFile(blockId: String): File = {
val file = getFile(blockId)
if (file.exists()) {
@@ -130,16 +134,18 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
private def createLocalDirs(): Array[File] = {
logDebug("Creating local directories at root dirs '" + rootDirs + "'")
+ val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").map(rootDir => {
var foundLocalDir: Boolean = false
var localDir: File = null
- var localDirUuid: UUID = null
+ var localDirId: String = null
var tries = 0
+ val rand = new Random()
while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
tries += 1
try {
- localDirUuid = UUID.randomUUID()
- localDir = new File(rootDir, "spark-local-" + localDirUuid)
+ localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
+ localDir = new File(rootDir, "spark-local-" + localDirId)
if (!localDir.exists) {
localDir.mkdirs()
foundLocalDir = true
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index ea6f3c4fcc..74ef326038 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -18,29 +18,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
- //private val blockDropper = Executors.newSingleThreadExecutor()
- private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
- private val blockDropper = new Thread("memory store - block dropper") {
- override def run() {
- try {
- while (true) {
- val blockId = blocksToDrop.take()
- logDebug("Block " + blockId + " ready to be dropped")
- blockManager.dropFromMemory(blockId)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Shutting down block dropper")
- }
- }
- }
- blockDropper.start()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: String): Long = {
- entries.synchronized {
+ synchronized {
entries.get(blockId).size
}
}
@@ -52,19 +35,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- ensureFreeSpace(sizeEstimate)
- val entry = new Entry(elements, sizeEstimate, true)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += sizeEstimate
- logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
+ tryToPut(blockId, elements, sizeEstimate, true)
} else {
-      val entry = new Entry(bytes, bytes.limit, false)
- ensureFreeSpace(bytes.limit)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += bytes.limit
- logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
+ tryToPut(blockId, bytes, bytes.limit, false)
}
}
@@ -79,27 +55,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- ensureFreeSpace(sizeEstimate)
- val entry = new Entry(elements, sizeEstimate, true)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += sizeEstimate
- logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
+ tryToPut(blockId, elements, sizeEstimate, true)
Left(elements.iterator)
} else {
val bytes = blockManager.dataSerialize(values)
- ensureFreeSpace(bytes.limit)
- val entry = new Entry(bytes, bytes.limit, false)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += bytes.limit
- logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
+ tryToPut(blockId, bytes, bytes.limit, false)
Right(bytes)
}
}
override def getBytes(blockId: String): Option[ByteBuffer] = {
- val entry = entries.synchronized {
+ val entry = synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -112,7 +78,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
- val entry = entries.synchronized {
+ val entry = synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -126,7 +92,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def remove(blockId: String) {
- entries.synchronized {
+ synchronized {
val entry = entries.get(blockId)
if (entry != null) {
entries.remove(blockId)
@@ -134,54 +100,118 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
} else {
- logWarning("Block " + blockId + " could not be removed as it doesnt exist")
+ logWarning("Block " + blockId + " could not be removed as it does not exist")
}
}
}
override def clear() {
- entries.synchronized {
+ synchronized {
entries.clear()
}
- blockDropper.interrupt()
logInfo("MemoryStore cleared")
}
- // TODO: This should be able to return false if the space is larger than our total memory,
- // or if adding this block would require evicting another one from the same RDD
- private def ensureFreeSpace(space: Long) {
+ /**
+ * Return the RDD ID that a given block ID is from, or null if it is not an RDD block.
+ */
+ private def getRddId(blockId: String): String = {
+ if (blockId.startsWith("rdd_")) {
+ blockId.split('_')(1)
+ } else {
+ null
+ }
+ }
+
+ /**
+   * Try to put a set of values in memory, if we can free up enough space. The value should be
+   * an ArrayBuffer if deserialized is true, or a ByteBuffer otherwise. Its (possibly estimated)
+   * size must also be passed by the caller.
+ */
+ private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
+ synchronized {
+ if (ensureFreeSpace(blockId, size)) {
+ val entry = new Entry(value, size, deserialized)
+ entries.put(blockId, entry)
+ currentMemory += size
+ if (deserialized) {
+ logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
+ } else {
+ logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
+ }
+ true
+ } else {
+ // Tell the block manager that we couldn't put it in memory so that it can drop it to
+ // disk if the block allows disk storage.
+ val data = if (deserialized) {
+ Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ } else {
+ Right(value.asInstanceOf[ByteBuffer].duplicate())
+ }
+ blockManager.dropFromMemory(blockId, data)
+ false
+ }
+ }
+ }
+
+ /**
+   * Tries to free up a given amount of space to store a particular block, but can fail and return
+   * false if either the block is bigger than our memory or it would require replacing another
+   * block from the same RDD (which would lead to a wasteful cyclic replacement pattern for RDDs
+   * that don't fit into memory, which we want to avoid).
+ *
+ * Assumes that a lock on entries is held by the caller.
+ */
+ private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
+ if (space > maxMemory) {
+ logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
+ return false
+ }
+
if (maxMemory - currentMemory < space) {
-
+ val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L
- entries.synchronized {
- val iter = entries.entrySet().iterator()
- while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
- val pair = iter.next()
- val blockId = pair.getKey
- val entry = pair.getValue
- if (!entry.dropPending) {
- selectedBlocks += blockId
- entry.dropPending = true
- }
- selectedMemory += pair.getValue.size
- logInfo("Block " + blockId + " selected for dropping")
+ val iterator = entries.entrySet().iterator()
+ while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+ val pair = iterator.next()
+ val blockId = pair.getKey
+ if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
+ logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
+ "block from the same RDD")
+ return false
}
+ selectedBlocks += blockId
+ selectedMemory += pair.getValue.size
}
- logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
- blocksToDrop.size + " blocks pending")
- var i = 0
- while (i < selectedBlocks.size) {
- blocksToDrop.add(selectedBlocks(i))
- i += 1
+ if (maxMemory - (currentMemory - selectedMemory) >= space) {
+ logInfo(selectedBlocks.size + " blocks selected for dropping")
+ for (blockId <- selectedBlocks) {
+ val entry = entries.get(blockId)
+ val data = if (entry.deserialized) {
+ Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ } else {
+ Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+ }
+ blockManager.dropFromMemory(blockId, data)
+ }
+ return true
+ } else {
+ return false
}
- selectedBlocks.clear()
}
+ return true
+ }
+
+ override def contains(blockId: String): Boolean = {
+ synchronized { entries.containsKey(blockId) }
}
}
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index 2d52fac1ef..2237ce92b3 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -2,7 +2,7 @@ package spark.storage
import java.io.{Externalizable, ObjectInput, ObjectOutput}
-class StorageLevel(
+private[spark] class StorageLevel(
var useDisk: Boolean,
var useMemory: Boolean,
var deserialized: Boolean,
@@ -63,7 +63,7 @@ class StorageLevel(
"StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
}
-object StorageLevel {
+private[spark] object StorageLevel {
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index df4e23bfd6..f670ccb709 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -17,7 +17,7 @@ import java.util.concurrent.TimeoutException
/**
* Various utility classes for working with Akka.
*/
-object AkkaUtils {
+private[spark] object AkkaUtils {
/**
* Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
* ActorSystem itself and its port (which is hard to get from Akka).
diff --git a/core/src/main/scala/spark/util/IntParam.scala b/core/src/main/scala/spark/util/IntParam.scala
index c3ff063569..0427646747 100644
--- a/core/src/main/scala/spark/util/IntParam.scala
+++ b/core/src/main/scala/spark/util/IntParam.scala
@@ -3,7 +3,7 @@ package spark.util
/**
* An extractor object for parsing strings into integers.
*/
-object IntParam {
+private[spark] object IntParam {
def unapply(str: String): Option[Int] = {
try {
Some(str.toInt)
diff --git a/core/src/main/scala/spark/util/MemoryParam.scala b/core/src/main/scala/spark/util/MemoryParam.scala
index 4fba914afe..3726738842 100644
--- a/core/src/main/scala/spark/util/MemoryParam.scala
+++ b/core/src/main/scala/spark/util/MemoryParam.scala
@@ -6,7 +6,7 @@ import spark.Utils
* An extractor object for parsing JVM memory strings, such as "10g", into an Int representing
* the number of megabytes. Supports the same formats as Utils.memoryStringToMb.
*/
-object MemoryParam {
+private[spark] object MemoryParam {
def unapply(str: String): Option[Int] = {
try {
Some(Utils.memoryStringToMb(str))
diff --git a/core/src/main/scala/spark/util/SerializableBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala
index 0830843a77..09d588fe1c 100644
--- a/core/src/main/scala/spark/util/SerializableBuffer.scala
+++ b/core/src/main/scala/spark/util/SerializableBuffer.scala
@@ -8,6 +8,7 @@ import java.nio.channels.Channels
* A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
* it easier to pass ByteBuffers in case class messages.
*/
+private[spark]
class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
def value = buffer
diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala
index 11d7939204..023ec09332 100644
--- a/core/src/main/scala/spark/util/StatCounter.scala
+++ b/core/src/main/scala/spark/util/StatCounter.scala
@@ -5,6 +5,7 @@ package spark.util
* numerically robust way. Includes support for merging two StatCounters. Based on Welford and
* Chan's algorithms described at http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance.
*/
+private[spark]
class StatCounter(values: TraversableOnce[Double]) extends Serializable {
private var n: Long = 0 // Running count of our values
private var mu: Double = 0 // Running mean of our values
@@ -82,7 +83,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
}
}
-object StatCounter {
+private[spark] object StatCounter {
def apply(values: TraversableOnce[Double]) = new StatCounter(values)
def apply(values: Double*) = new StatCounter(values)
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index fce1deaa5c..48c0a830e0 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -18,9 +18,9 @@ import storage.StorageLevel
class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
val clusterUrl = "local-cluster[2,1,512]"
-
+
@transient var sc: SparkContext = _
-
+
after {
if (sc != null) {
sc.stop()
@@ -28,6 +28,22 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
}
+ test("local-cluster format") {
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ assert(sc.parallelize(1 to 2, 2).count == 2)
+ sc.stop()
+ sc = new SparkContext("local-cluster[2 , 1 , 512]", "test")
+ assert(sc.parallelize(1 to 2, 2).count == 2)
+ sc.stop()
+ sc = new SparkContext("local-cluster[2, 1, 512]", "test")
+ assert(sc.parallelize(1 to 2, 2).count == 2)
+ sc.stop()
+ sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test")
+ assert(sc.parallelize(1 to 2, 2).count == 2)
+ sc.stop()
+ sc = null
+ }
+
test("simple groupByKey") {
sc = new SparkContext(clusterUrl, "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
@@ -38,7 +54,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}
-
+
test("accumulators") {
sc = new SparkContext(clusterUrl, "test")
val accum = sc.accumulator(0)
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index d15d7285a7..f61fd45ed3 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -69,9 +69,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
- // Setting storage level of a1 and a2 to invalid; they should be removed from store and master
- store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1))
- store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0))
+ // Drop a1 and a2 from memory; this should be reported back to the master
+ store.dropFromMemory("a1", null)
+ store.dropFromMemory("a2", null)
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
@@ -113,13 +113,58 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a1") === None, "a1 was in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
    // At this point a2 was gotten last, so LRU will get rid of a3
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
Thread.sleep(100)
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") === None, "a1 was in store")
+ assert(store.getSingle("a3") === None, "a3 was in store")
}
-
+
+ test("in-memory LRU for partitions of same RDD") {
+ val store = new BlockManager(master, new KryoSerializer, 1200)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
+ Thread.sleep(100)
+    // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
+ // from the same RDD
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+ assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store")
+ // Check that rdd_0_3 doesn't replace them even after further accesses
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ }
+
+ test("in-memory LRU for partitions of multiple RDDs") {
+ val store = new BlockManager(master, new KryoSerializer, 1200)
+ store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ Thread.sleep(100)
+ // At this point rdd_1_1 should've replaced rdd_0_1
+ assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store")
+ assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+ assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+ // Do a get() on rdd_0_2 so that it is the most recently used item
+ assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+ // Put in more partitions from RDD 0; they should replace rdd_1_1
+ store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ Thread.sleep(100)
+ // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
+ // when we try to add rdd_0_4.
+ assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store")
+ assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+ assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store")
+ assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+ assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store")
+ }
+
test("on-disk storage") {
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
@@ -149,6 +194,22 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
}
+ test("disk and memory storage with getLocalBytes") {
+ val store = new BlockManager(master, new KryoSerializer, 1200)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
+ Thread.sleep(100)
+ assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+ assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+ assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+ assert(store.getLocalBytes("a1") != None, "a1 was not in store")
+ assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+ }
+
test("disk and memory storage with serialization") {
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
@@ -165,6 +226,22 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
}
+ test("disk and memory storage with serialization and getLocalBytes") {
+ val store = new BlockManager(master, new KryoSerializer, 1200)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
+ Thread.sleep(100)
+ assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+ assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+ assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+ assert(store.getLocalBytes("a1") != None, "a1 was not in store")
+ assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+ }
+
test("LRU with mixed storage levels") {
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
@@ -264,4 +341,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(stream.read() === -1, "end of stream not signalled")
assert(stream.read(temp, 0, temp.length) === -1, "end of stream not signalled")
}
+
+ test("overly large block") {
+ val store = new BlockManager(master, new KryoSerializer, 500)
+ store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+ assert(store.getSingle("a1") === None, "a1 was in store")
+ store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
+ assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
+ assert(store.getSingle("a2") != None, "a2 was not in store")
+ }
}
diff --git a/docs/tuning.md b/docs/tuning.md
index 292874ce59..c90b3156bd 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -67,13 +67,14 @@ object you will serialize.
Finally, if you don't register your classes, Kryo will still work, but it will have to store the
full class name with each object, which is wasteful.
-
# Memory Tuning
There are three considerations in tuning memory usage: the *amount* of memory used by your objects
(you likely want your entire dataset to fit in memory), the *cost* of accessing those objects, and the
overhead of *garbage collection* (if you have high turnover in terms of objects).
+## Efficient Data Structures
+
By default, Java objects are fast to access, but can easily consume a factor of 2-5x more space
than the "raw" data inside their fields. This is due to several reasons:
@@ -119,10 +120,67 @@ need to trace through all your Java objects and find the unused ones. The main p
that *the cost of garbage collection is proportional to the number of Java objects*, so using data
structures with fewer objects (e.g. an array of `Int`s instead of a `LinkedList`) greatly reduces
this cost. An even better method is to persist objects in serialized form, as described above: now
-there will be only *one* object (a byte array) per RDD partition. There is a lot of
-[detailed information on GC tuning](http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html)
-available online, but at a high level, the first thing to try if GC is a problem is to use serialized caching.
+there will be only *one* object (a byte array) per RDD partition. If GC is a problem, the first
+thing to try, before the more advanced techniques below, is serialized caching.
+
+
+## Cache Size Tuning
+
+One of the important configuration parameters passed to Spark is the amount of memory that should be used for
+caching RDDs. By default, Spark uses 66% of the configured memory (`SPARK_MEM`) to cache RDDs, which means that
+around 33% of memory remains available for objects created during task execution.
+
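+For example (hypothetical numbers): with `SPARK_MEM=4g`, roughly `0.66 * 4 GB`, or about 2.6 GB, would be used
+for cached RDDs, leaving about 1.4 GB for objects created during task execution.
+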
+If your tasks slow down and you find that your JVM is using almost all of its allocated memory, lowering
+this value will reduce memory consumption. To change it to, say, 50%, call
+`System.setProperty("spark.storage.memoryFraction", "0.5")`, as in the sketch below. Combined with serialized
+caching, a smaller cache should be sufficient to mitigate most garbage collection problems.
+If you are interested in further tuning the Java GC, continue reading below.
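+
+A minimal sketch of a driver program that does this (the object name, master URL, and dataset are
+hypothetical; the property should be set before the `SparkContext` is created so that it takes effect):
+
+    import spark.SparkContext
+
+    object CacheTuningExample {
+      def main(args: Array[String]) {
+        // Use only 50% of SPARK_MEM for the RDD cache, leaving more room for
+        // temporary objects created while tasks run.
+        System.setProperty("spark.storage.memoryFraction", "0.5")
+        val sc = new SparkContext("local", "CacheTuningExample")
+        val data = sc.parallelize(1 to 1000000)
+        data.cache()
+        println(data.count())
+        sc.stop()
+      }
+    }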
+
+## GC Tuning
+
+The first step in GC tuning is to collect statistics on how frequently garbage collection occurs and the amount of
+time spent in GC. This can be done by adding `-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps` to the
+`SPARK_JAVA_OPTS` environment variable (typically set in `conf/spark-env.sh`). The next time your Spark job runs,
+you will see messages printed to the console whenever a JVM garbage collection takes place. Note that garbage
+collections that occur on an executor appear in that executor's logs, not in the `spark-shell` output.
+
+Some basic information about memory management in the JVM:
+
+* Java Heap space is divided into two regions, Young and Old. The Young generation is meant to hold short-lived
+  objects while the Old generation is intended for objects with longer lifetimes.
+
+* The Young generation is further divided into three regions: Eden, Survivor1, and Survivor2.
+
+* A simplified description of the garbage collection procedure: when Eden is full, a minor GC is run on Eden and
+  objects that are alive in Eden and Survivor1 are copied to Survivor2. The Survivor regions are swapped. If an
+  object is old enough or Survivor2 is full, it is moved to Old. Finally, when Old is close to full, a full GC
+  is invoked.
+
+The goal of GC tuning in Spark is to ensure that only long-lived RDDs are stored in the Old generation and that
+the Young generation is sufficiently sized to store short-lived objects. This will help avoid full GCs being
+invoked to collect temporary objects created during task execution. Some steps which may be useful are:
+
+* Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times
+  before a task completes, it means that there isn't enough memory available for executing tasks.
+
+* In the GC stats that are printed, if the OldGen is close to being full, reduce the amount of memory used for caching.
+  This can be done using the `spark.storage.memoryFraction` property. It is better to cache fewer objects than to
+  slow down task execution!
+
+* If there are too many minor collections but not too many major GCs, allocating more memory for Eden would help.
+  You can set the size of Eden to an over-estimate of how much memory each task will need. If the size of Eden
+  is determined to be `E`, then you can set the size of the Young generation using the option `-Xmn=4/3*E`. (The
+  scaling up by 4/3 is to account for space used by survivor regions as well.)
+
+* As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated
+  using the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3
+  times the size of the block on disk. So if we wish to have 3 or 4 tasks' worth of working space, and the HDFS
+  block size is 64 MB, we can estimate the size of Eden to be `4*3*64MB`.
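+  Working through the arithmetic as an illustrative check: Eden would be `4 * 3 * 64MB = 768MB`, so the Young
+  generation would be sized at about `4/3 * 768MB = 1024MB`.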
+
+* Monitor how the frequency and time taken by garbage collection changes with the new settings.
+
+Our experience suggests that the effect of GC tuning depends on your application and the amount of memory available.
+There are [many more tuning options](http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html) described online,
+but at a high level, managing how frequently full GC takes place can help in reducing the overhead.
# Other Considerations
@@ -165,4 +223,4 @@ This has been a quick guide to point out the main concerns you should know about
Spark application -- most importantly, data serialization and memory tuning. For most programs,
switching to Kryo serialization and persisting data in serialized form will solve most common
performance issues. Feel free to ask on the
-[Spark mailing list](http://groups.google.com/group/spark-users) about other tuning best practices. \ No newline at end of file
+[Spark mailing list](http://groups.google.com/group/spark-users) about other tuning best practices.
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 15467115b2..d007d8519e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -61,8 +61,8 @@ object SparkBuild extends Build {
resolvers ++= Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
- "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/",
- "Spray Repository" at "http://repo.spray.cc/"
+ "Spray Repository" at "http://repo.spray.cc/",
+ "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
),
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "11.0.1",
@@ -87,7 +87,7 @@ object SparkBuild extends Build {
def replSettings = sharedSettings ++ Seq(
name := "spark-repl",
libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _)
- ) ++ assemblySettings ++ extraAssemblySettings
+ )
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples"