diff options
83 files changed, 1414 insertions, 510 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 3da7152a09..ca59f46843 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -22,6 +22,8 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("halting by voting") { diff --git a/core/lib/mesos-0.9.0.jar b/core/lib/mesos-0.9.0.jar Binary files differdeleted file mode 100644 index b7ad79bf2a..0000000000 --- a/core/lib/mesos-0.9.0.jar +++ /dev/null diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala index c157cc8feb..62186de80d 100644 --- a/core/src/main/scala/spark/Accumulators.scala +++ b/core/src/main/scala/spark/Accumulators.scala @@ -49,7 +49,16 @@ class Accumulable[R, T] ( else throw new UnsupportedOperationException("Can't read accumulator value in task") } - private[spark] def localValue = value_ + /** + * Get the current value of this accumulator from within a task. + * + * This is NOT the global value of the accumulator. To get the global value after a + * completed operation on the dataset, call `value`. + * + * The typical use of this method is to directly mutate the local value, eg., to add + * an element to a Set. + */ + def localValue = value_ def value_= (r: R) { if (!deserialized) value_ = r @@ -93,6 +102,7 @@ trait AccumulableParam[R, T] extends Serializable { def zero(initialValue: R): R } +private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T] extends AccumulableParam[R,T] { diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index fb65ba421a..4554db2249 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -17,18 +17,18 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin val blockManager = SparkEnv.get.blockManager val startTime = System.currentTimeMillis - val addresses = SparkEnv.get.mapOutputTracker.getServerAddresses(shuffleId) + val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format( shuffleId, reduceId, System.currentTimeMillis - startTime)) - val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[Int]] - for ((address, index) <- addresses.zipWithIndex) { - splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += index + val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]] + for (((address, size), index) <- statuses.zipWithIndex) { + splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size)) } - val blocksByAddress: Seq[(BlockManagerId, Seq[String])] = splitsByAddress.toSeq.map { + val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] = splitsByAddress.toSeq.map { case (address, splits) => - (address, splits.map(i => "shuffle_%d_%d_%d".format(shuffleId, i, reduceId))) + (address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2))) } for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) { @@ -43,9 +43,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin case None => { val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r blockId match { - case regex(shufId, mapId, reduceId) => - val addr = addresses(mapId.toInt) - throw new FetchFailedException(addr, shufId.toInt, mapId.toInt, reduceId.toInt, null) + case regex(shufId, mapId, _) => + val address = statuses(mapId.toInt)._1 + throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null) case _ => throw new SparkException( "Failed to get block " + blockId + ", which is not a shuffle block") diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 9a23f9e7cc..d9cbe3730a 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -157,7 +157,7 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b } // For BlockManager.scala only - def notifyTheCacheTrackerFromBlockManager(t: AddedToCache) { + def notifyFromBlockManager(t: AddedToCache) { communicate(t) } diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index c0ff94acc6..19a51dd5b8 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -1,22 +1,51 @@ package spark -abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable +/** + * Base class for dependencies. + */ +abstract class Dependency[T](val rdd: RDD[T]) extends Serializable -abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd, false) { +/** + * Base class for dependencies where each partition of the parent RDD is used by at most one + * partition of the child RDD. Narrow dependencies allow for pipelined execution. + */ +abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { + /** + * Get the parent partitions for a child partition. + * @param outputPartition a partition of the child RDD + * @return the partitions of the parent RDD that the child partition depends upon + */ def getParents(outputPartition: Int): Seq[Int] } +/** + * Represents a dependency on the output of a shuffle stage. + * @param shuffleId the shuffle id + * @param rdd the parent RDD + * @param aggregator optional aggregator; this allows for map-side combining + * @param partitioner partitioner used to partition the shuffle output + */ class ShuffleDependency[K, V, C]( val shuffleId: Int, @transient rdd: RDD[(K, V)], - val aggregator: Aggregator[K, V, C], + val aggregator: Option[Aggregator[K, V, C]], val partitioner: Partitioner) - extends Dependency(rdd, true) + extends Dependency(rdd) +/** + * Represents a one-to-one dependency between partitions of the parent and child RDDs. + */ class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int) = List(partitionId) } +/** + * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs. + * @param rdd the parent RDD + * @param inStart the start of the range in the parent RDD + * @param outStart the start of the range in the child RDD + * @param length the length of the range + */ class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) extends NarrowDependency[T](rdd) { diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala index 12b6a0954c..ebb51607e6 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/HadoopWriter.scala @@ -42,7 +42,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl setConfParams() val jCtxt = getJobContext() - getOutputCommitter().setupJob(jCtxt) + getOutputCommitter().setupJob(jCtxt) } diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala index 39d554b6a5..863d00eeb5 100644 --- a/core/src/main/scala/spark/JavaSerializer.scala +++ b/core/src/main/scala/spark/JavaSerializer.scala @@ -57,6 +57,6 @@ private[spark] class JavaSerializerInstance extends SerializerInstance { } } -private[spark] class JavaSerializer extends Serializer { +class JavaSerializer extends Serializer { def newInstance(): SerializerInstance = new JavaSerializerInstance } diff --git a/core/src/main/scala/spark/Logging.scala b/core/src/main/scala/spark/Logging.scala index 69935b86de..90bae26202 100644 --- a/core/src/main/scala/spark/Logging.scala +++ b/core/src/main/scala/spark/Logging.scala @@ -15,7 +15,7 @@ trait Logging { private var log_ : Logger = null // Method to get or create the logger for this object - def log: Logger = { + protected def log: Logger = { if (log_ == null) { var className = this.getClass.getName // Ignore trailing $'s in the class names for Scala objects @@ -28,48 +28,48 @@ trait Logging { } // Log methods that take only a String - def logInfo(msg: => String) { + protected def logInfo(msg: => String) { if (log.isInfoEnabled) log.info(msg) } - def logDebug(msg: => String) { + protected def logDebug(msg: => String) { if (log.isDebugEnabled) log.debug(msg) } - def logTrace(msg: => String) { + protected def logTrace(msg: => String) { if (log.isTraceEnabled) log.trace(msg) } - def logWarning(msg: => String) { + protected def logWarning(msg: => String) { if (log.isWarnEnabled) log.warn(msg) } - def logError(msg: => String) { + protected def logError(msg: => String) { if (log.isErrorEnabled) log.error(msg) } // Log methods that take Throwables (Exceptions/Errors) too - def logInfo(msg: => String, throwable: Throwable) { + protected def logInfo(msg: => String, throwable: Throwable) { if (log.isInfoEnabled) log.info(msg, throwable) } - def logDebug(msg: => String, throwable: Throwable) { + protected def logDebug(msg: => String, throwable: Throwable) { if (log.isDebugEnabled) log.debug(msg, throwable) } - def logTrace(msg: => String, throwable: Throwable) { + protected def logTrace(msg: => String, throwable: Throwable) { if (log.isTraceEnabled) log.trace(msg, throwable) } - def logWarning(msg: => String, throwable: Throwable) { + protected def logWarning(msg: => String, throwable: Throwable) { if (log.isWarnEnabled) log.warn(msg, throwable) } - def logError(msg: => String, throwable: Throwable) { + protected def logError(msg: => String, throwable: Throwable) { if (log.isErrorEnabled) log.error(msg, throwable) } // Method for ensuring that logging is initialized, to avoid having multiple // threads do it concurrently (as SLF4J initialization is not thread safe). - def initLogging() { log } + protected def initLogging() { log } } diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index 116d526854..45441aa5e5 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -1,6 +1,6 @@ package spark -import java.io.{DataInputStream, DataOutputStream, ByteArrayOutputStream, ByteArrayInputStream} +import java.io._ import java.util.concurrent.ConcurrentHashMap import akka.actor._ @@ -14,16 +14,19 @@ import akka.util.duration._ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet +import scheduler.MapStatus import spark.storage.BlockManagerId +import java.util.zip.{GZIPInputStream, GZIPOutputStream} private[spark] sealed trait MapOutputTrackerMessage -private[spark] case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage +private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String) + extends MapOutputTrackerMessage private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging { def receive = { - case GetMapOutputLocations(shuffleId: Int) => - logInfo("Asked to get map output locations for shuffle " + shuffleId) + case GetMapOutputStatuses(shuffleId: Int, requester: String) => + logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester) sender ! tracker.getSerializedLocations(shuffleId) case StopMapOutputTracker => @@ -40,16 +43,16 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea val timeout = 10.seconds - var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]] + var mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]] // Incremented every time a fetch fails so that client nodes know to clear // their cache of map output locations if this happens. private var generation: Long = 0 - private var generationLock = new java.lang.Object + private val generationLock = new java.lang.Object - // Cache a serialized version of the output locations for each shuffle to send them out faster + // Cache a serialized version of the output statuses for each shuffle to send them out faster var cacheGeneration = generation - val cachedSerializedLocs = new HashMap[Int, Array[Byte]] + val cachedSerializedStatuses = new HashMap[Int, Array[Byte]] var trackerActor: ActorRef = if (isMaster) { val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName) @@ -80,31 +83,34 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea } def registerShuffle(shuffleId: Int, numMaps: Int) { - if (bmAddresses.get(shuffleId) != null) { + if (mapStatuses.get(shuffleId) != null) { throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice") } - bmAddresses.put(shuffleId, new Array[BlockManagerId](numMaps)) + mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)) } - def registerMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) { - var array = bmAddresses.get(shuffleId) + def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) { + var array = mapStatuses.get(shuffleId) array.synchronized { - array(mapId) = bmAddress + array(mapId) = status } } - def registerMapOutputs(shuffleId: Int, locs: Array[BlockManagerId], changeGeneration: Boolean = false) { - bmAddresses.put(shuffleId, Array[BlockManagerId]() ++ locs) + def registerMapOutputs( + shuffleId: Int, + statuses: Array[MapStatus], + changeGeneration: Boolean = false) { + mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses) if (changeGeneration) { incrementGeneration() } } def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) { - var array = bmAddresses.get(shuffleId) + var array = mapStatuses.get(shuffleId) if (array != null) { array.synchronized { - if (array(mapId) == bmAddress) { + if (array(mapId).address == bmAddress) { array(mapId) = null } } @@ -117,10 +123,10 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea // Remembers which map output locations are currently being fetched on a worker val fetching = new HashSet[Int] - // Called on possibly remote nodes to get the server URIs for a given shuffle - def getServerAddresses(shuffleId: Int): Array[BlockManagerId] = { - val locs = bmAddresses.get(shuffleId) - if (locs == null) { + // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle + def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = { + val statuses = mapStatuses.get(shuffleId) + if (statuses == null) { logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") fetching.synchronized { if (fetching.contains(shuffleId)) { @@ -129,34 +135,38 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea try { fetching.wait() } catch { - case _ => + case e: InterruptedException => } } - return bmAddresses.get(shuffleId) + return mapStatuses.get(shuffleId).map(status => + (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId)))) } else { fetching += shuffleId } } // We won the race to fetch the output locs; do so logInfo("Doing the fetch; tracker actor = " + trackerActor) - val fetchedBytes = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[Byte]] - val fetchedLocs = deserializeLocations(fetchedBytes) + val host = System.getProperty("spark.hostname", Utils.localHostName) + val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]] + val fetchedStatuses = deserializeStatuses(fetchedBytes) logInfo("Got the output locations") - bmAddresses.put(shuffleId, fetchedLocs) + mapStatuses.put(shuffleId, fetchedStatuses) fetching.synchronized { fetching -= shuffleId fetching.notifyAll() } - return fetchedLocs + return fetchedStatuses.map(s => + (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId)))) } else { - return locs + return statuses.map(s => + (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId)))) } } def stop() { communicate(StopMapOutputTracker) - bmAddresses.clear() + mapStatuses.clear() trackerActor = null } @@ -182,75 +192,83 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea generationLock.synchronized { if (newGen > generation) { logInfo("Updating generation to " + newGen + " and clearing cache") - bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]] + mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]] generation = newGen } } } def getSerializedLocations(shuffleId: Int): Array[Byte] = { - var locs: Array[BlockManagerId] = null + var statuses: Array[MapStatus] = null var generationGotten: Long = -1 generationLock.synchronized { if (generation > cacheGeneration) { - cachedSerializedLocs.clear() + cachedSerializedStatuses.clear() cacheGeneration = generation } - cachedSerializedLocs.get(shuffleId) match { + cachedSerializedStatuses.get(shuffleId) match { case Some(bytes) => return bytes case None => - locs = bmAddresses.get(shuffleId) + statuses = mapStatuses.get(shuffleId) generationGotten = generation } } // If we got here, we failed to find the serialized locations in the cache, so we pulled // out a snapshot of the locations as "locs"; let's serialize and return that - val bytes = serializeLocations(locs) + val bytes = serializeStatuses(statuses) + logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length)) // Add them into the table only if the generation hasn't changed while we were working generationLock.synchronized { if (generation == generationGotten) { - cachedSerializedLocs(shuffleId) = bytes + cachedSerializedStatuses(shuffleId) = bytes } } return bytes } // Serialize an array of map output locations into an efficient byte format so that we can send - // it to reduce tasks. We do this by grouping together the locations by block manager ID. - def serializeLocations(locs: Array[BlockManagerId]): Array[Byte] = { + // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will + // generally be pretty compressible because many map outputs will be on the same hostname. + def serializeStatuses(statuses: Array[MapStatus]): Array[Byte] = { val out = new ByteArrayOutputStream - val dataOut = new DataOutputStream(out) - dataOut.writeInt(locs.length) - val grouped = locs.zipWithIndex.groupBy(_._1) - dataOut.writeInt(grouped.size) - for ((id, pairs) <- grouped if id != null) { - dataOut.writeUTF(id.ip) - dataOut.writeInt(id.port) - dataOut.writeInt(pairs.length) - for ((_, blockIndex) <- pairs) { - dataOut.writeInt(blockIndex) - } - } - dataOut.close() + val objOut = new ObjectOutputStream(new GZIPOutputStream(out)) + objOut.writeObject(statuses) + objOut.close() out.toByteArray } - // Opposite of serializeLocations. - def deserializeLocations(bytes: Array[Byte]): Array[BlockManagerId] = { - val dataIn = new DataInputStream(new ByteArrayInputStream(bytes)) - val length = dataIn.readInt() - val array = new Array[BlockManagerId](length) - val numGroups = dataIn.readInt() - for (i <- 0 until numGroups) { - val ip = dataIn.readUTF() - val port = dataIn.readInt() - val id = new BlockManagerId(ip, port) - val numBlocks = dataIn.readInt() - for (j <- 0 until numBlocks) { - array(dataIn.readInt()) = id - } + // Opposite of serializeStatuses. + def deserializeStatuses(bytes: Array[Byte]): Array[MapStatus] = { + val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes))) + objIn.readObject().asInstanceOf[Array[MapStatus]] + } +} + +private[spark] object MapOutputTracker { + private val LOG_BASE = 1.1 + + /** + * Compress a size in bytes to 8 bits for efficient reporting of map output sizes. + * We do this by encoding the log base 1.1 of the size as an integer, which can support + * sizes up to 35 GB with at most 10% error. + */ + def compressSize(size: Long): Byte = { + if (size <= 1L) { + 0 + } else { + math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte + } + } + + /** + * Decompress an 8-bit encoded block size, using the reverse operation of compressSize. + */ + def decompressSize(compressedSize: Byte): Long = { + if (compressedSize == 0) { + 1 + } else { + math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong } - array } } diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 351c3d9d0b..f32ff475da 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -31,56 +31,86 @@ import spark.partial.BoundedDouble import spark.partial.CountEvaluator import spark.partial.GroupedCountEvaluator import spark.partial.PartialResult +import spark.rdd.BlockRDD +import spark.rdd.CartesianRDD +import spark.rdd.FilteredRDD +import spark.rdd.FlatMappedRDD +import spark.rdd.GlommedRDD +import spark.rdd.MappedRDD +import spark.rdd.MapPartitionsRDD +import spark.rdd.MapPartitionsWithSplitRDD +import spark.rdd.PipedRDD +import spark.rdd.SampledRDD +import spark.rdd.UnionRDD import spark.storage.StorageLevel import SparkContext._ /** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, - * partitioned collection of elements that can be operated on in parallel. + * partitioned collection of elements that can be operated on in parallel. This class contains the + * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition, + * [[spark.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such + * as `groupByKey` and `join`; [[spark.DoubleRDDFunctions]] contains operations available only on + * RDDs of Doubles; and [[spark.SequenceFileRDDFunctions]] contains operations available on RDDs + * that can be saved as SequenceFiles. These operations are automatically available on any RDD of + * the right type (e.g. RDD[(Int, Int)] through implicit conversions when you + * `import spark.SparkContext._`. * - * Each RDD is characterized by five main properties: - * - A list of splits (partitions) - * - A function for computing each split - * - A list of dependencies on other RDDs - * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) - * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for - * HDFS) + * Internally, each RDD is characterized by five main properties: * - * All the scheduling and execution in Spark is done based on these methods, allowing each RDD to - * implement its own way of computing itself. + * - A list of splits (partitions) + * - A function for computing each split + * - A list of dependencies on other RDDs + * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) + * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for + * an HDFS file) * - * This class also contains transformation methods available on all RDDs (e.g. map and filter). In - * addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs, and - * SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles. + * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD + * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for + * reading data from a new storage system) by overriding these functions. Please refer to the + * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details + * on RDD internals. */ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serializable { - // Methods that must be implemented by subclasses + // Methods that must be implemented by subclasses: + + /** Set of partitions in this RDD. */ def splits: Array[Split] + + /** Function for computing a given partition. */ def compute(split: Split): Iterator[T] + + /** How this RDD depends on any parent RDDs. */ @transient val dependencies: List[Dependency[_]] + + // Methods available on all RDDs: - // Record user function generating this RDD - val origin = Utils.getSparkCallSite + /** Record user function generating this RDD. */ + private[spark] val origin = Utils.getSparkCallSite - // Optionally overridden by subclasses to specify how they are partitioned + /** Optionally overridden by subclasses to specify how they are partitioned. */ val partitioner: Option[Partitioner] = None - // Optionally overridden by subclasses to specify placement preferences + /** Optionally overridden by subclasses to specify placement preferences. */ def preferredLocations(split: Split): Seq[String] = Nil + /** The [[spark.SparkContext]] that this RDD was created on. */ def context = sc - def elementClassManifest: ClassManifest[T] = classManifest[T] + private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T] - // Get a unique ID for this RDD + /** A unique ID for this RDD (within its SparkContext). */ val id = sc.newRddId() // Variables relating to persistence private var storageLevel: StorageLevel = StorageLevel.NONE - // Change this RDD's storage level + /** + * Set this RDD's storage level to persist its values across operations after the first time + * it is computed. Can only be called once on each RDD. + */ def persist(newLevel: StorageLevel): RDD[T] = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { @@ -91,12 +121,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial this } - // Turn on the default caching level for this RDD + /** Persist this RDD with the default storage level (MEMORY_ONLY). */ def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY) - // Turn on the default caching level for this RDD + /** Persist this RDD with the default storage level (MEMORY_ONLY). */ def cache(): RDD[T] = persist() + /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */ def getStorageLevel = storageLevel private[spark] def checkpoint(level: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): RDD[T] = { @@ -118,7 +149,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } } - // Read this RDD; will read from cache if applicable, or otherwise compute + /** + * Internal method to this RDD; will read from cache if applicable, or otherwise compute it. + * This should ''not'' be called by users directly, but is available for implementors of custom + * subclasses of RDD. + */ final def iterator(split: Split): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheTracker.getOrCompute[T](this, split, storageLevel) @@ -175,8 +210,16 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial Utils.randomizeInPlace(samples, rand).take(total) } + /** + * Return the union of this RDD and another one. Any identical elements will appear multiple + * times (use `.distinct()` to eliminate them). + */ def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other)) + /** + * Return the union of this RDD and another one. Any identical elements will appear multiple + * times (use `.distinct()` to eliminate them). + */ def ++(other: RDD[T]): RDD[T] = this.union(other) def glom(): RDD[Array[T]] = new GlommedRDD(this) @@ -372,7 +415,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } def saveAsObjectFile(path: String) { - this.glom + this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) .saveAsSequenceFile(path) } @@ -382,60 +425,3 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial sc.runJob(this, (iter: Iterator[T]) => iter.toArray) } } - -class MappedRDD[U: ClassManifest, T: ClassManifest]( - prev: RDD[T], - f: T => U) - extends RDD[U](prev.context) { - - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split) = prev.iterator(split).map(f) -} - -class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( - prev: RDD[T], - f: T => TraversableOnce[U]) - extends RDD[U](prev.context) { - - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split) = prev.iterator(split).flatMap(f) -} - -class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) { - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split) = prev.iterator(split).filter(f) -} - -class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) { - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator -} - -class MapPartitionsRDD[U: ClassManifest, T: ClassManifest]( - prev: RDD[T], - f: Iterator[T] => Iterator[U]) - extends RDD[U](prev.context) { - - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split) = f(prev.iterator(split)) -} - -/** - * A variant of the MapPartitionsRDD that passes the split index into the - * closure. This can be used to generate or collect partition specific - * information such as the number of tuples in a partition. - */ -class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest]( - prev: RDD[T], - f: (Int, Iterator[T]) => Iterator[U]) - extends RDD[U](prev.context) { - - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split) = f(split.index, prev.iterator(split)) -} diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala index c0e08289d8..d8bcf6326a 100644 --- a/core/src/main/scala/spark/Serializer.scala +++ b/core/src/main/scala/spark/Serializer.scala @@ -9,10 +9,10 @@ import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import spark.util.ByteBufferInputStream /** - * A serializer. Because some serialization libraries are not thread safe, this class is used to + * A serializer. Because some serialization libraries are not thread safe, this class is used to * create SerializerInstances that do the actual serialization. */ -private[spark] trait Serializer { +trait Serializer { def newInstance(): SerializerInstance } @@ -88,7 +88,7 @@ private[spark] trait DeserializationStream { } gotNext = true } - + override def hasNext: Boolean = { if (!gotNext) { getNext() diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 83c1b49203..84fc541f82 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -4,12 +4,11 @@ import java.io._ import java.util.concurrent.atomic.AtomicInteger import java.net.{URI, URLClassLoader} -import akka.actor.Actor -import akka.actor.Actor._ - import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.collection.generic.Growable +import akka.actor.Actor +import akka.actor.Actor._ import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.InputFormat @@ -27,20 +26,22 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.FileInputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.TextInputFormat - import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.hadoop.mapreduce.{Job => NewHadoopJob} - import org.apache.mesos.{Scheduler, MesosNativeLibrary} import spark.broadcast._ - import spark.deploy.LocalSparkCluster - import spark.partial.ApproximateEvaluator import spark.partial.PartialResult - +import spark.rdd.DoubleRDDFunctions +import spark.rdd.HadoopRDD +import spark.rdd.NewHadoopRDD +import spark.rdd.OrderedRDDFunctions +import spark.rdd.PairRDDFunctions +import spark.rdd.SequenceFileRDDFunctions +import spark.rdd.UnionRDD import spark.scheduler.ShuffleMapTask import spark.scheduler.DAGScheduler import spark.scheduler.TaskScheduler @@ -49,14 +50,20 @@ import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, C import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import spark.storage.BlockManagerMaster -class SparkContext( - master: String, - frameworkName: String, - val sparkHome: String, - val jars: Seq[String]) +/** + * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark + * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. + * + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param jobName A name for your job, to display on the cluster web UI + * @param sparkHome Location where Spark is instaled on cluster nodes + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + */ +class SparkContext(master: String, jobName: String, val sparkHome: String, val jars: Seq[String]) extends Logging { - def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil) + def this(master: String, jobName: String) = this(master, jobName, null, Nil) // Ensure logging is initialized before we spawn any threads initLogging() @@ -72,7 +79,7 @@ class SparkContext( private val isLocal = (master == "local" || master.startsWith("local[")) // Create the Spark execution environment (cache, map output tracker, etc) - val env = SparkEnv.createFromSystemProperties( + private[spark] val env = SparkEnv.createFromSystemProperties( System.getProperty("spark.master.host"), System.getProperty("spark.master.port").toInt, true, @@ -80,8 +87,8 @@ class SparkContext( SparkEnv.set(env) // Used to store a URL for each static file/jar together with the file's local timestamp - val addedFiles = HashMap[String, Long]() - val addedJars = HashMap[String, Long]() + private[spark] val addedFiles = HashMap[String, Long]() + private[spark] val addedJars = HashMap[String, Long]() // Add each JAR given through the constructor jars.foreach { addJar(_) } @@ -109,7 +116,7 @@ class SparkContext( case SPARK_REGEX(sparkUrl) => val scheduler = new ClusterScheduler(this) - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName) scheduler.initialize(backend) scheduler @@ -128,7 +135,7 @@ class SparkContext( val localCluster = new LocalSparkCluster( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) val sparkUrl = localCluster.start() - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName) scheduler.initialize(backend) backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { localCluster.stop() @@ -140,9 +147,9 @@ class SparkContext( val scheduler = new ClusterScheduler(this) val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean val backend = if (coarseGrained) { - new CoarseMesosSchedulerBackend(scheduler, this, master, frameworkName) + new CoarseMesosSchedulerBackend(scheduler, this, master, jobName) } else { - new MesosSchedulerBackend(scheduler, this, master, frameworkName) + new MesosSchedulerBackend(scheduler, this, master, jobName) } scheduler.initialize(backend) scheduler @@ -154,14 +161,20 @@ class SparkContext( // Methods for creating RDDs - def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = { + /** Distribute a local Scala collection to form an RDD. */ + def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { new ParallelCollection[T](this, seq, numSlices) } - def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = { + /** Distribute a local Scala collection to form an RDD. */ + def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { parallelize(seq, numSlices) } + /** + * Read a text file from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI, and return it as an RDD of Strings. + */ def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) @@ -199,7 +212,11 @@ class SparkContext( /** * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, - * values and the InputFormat so that users don't need to pass them directly. + * values and the InputFormat so that users don't need to pass them directly. Instead, callers + * can just write, for example, + * {{{ + * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits) + * }}} */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int) (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]) @@ -211,6 +228,14 @@ class SparkContext( minSplits) } + /** + * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, + * values and the InputFormat so that users don't need to pass them directly. Instead, callers + * can just write, for example, + * {{{ + * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path) + * }}} + */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = hadoopFile[K, V, F](path, defaultMinSplits) @@ -254,7 +279,7 @@ class SparkContext( new NewHadoopRDD(this, fClass, kClass, vClass, conf) } - /** Get an RDD for a Hadoop SequenceFile with given key and value types */ + /** Get an RDD for a Hadoop SequenceFile with given key and value types. */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], @@ -264,12 +289,17 @@ class SparkContext( hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits) } + /** Get an RDD for a Hadoop SequenceFile with given key and value types. */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = sequenceFile(path, keyClass, valueClass, defaultMinSplits) /** * Version of sequenceFile() for types implicitly convertible to Writables through a - * WritableConverter. + * WritableConverter. For example, to access a SequenceFile where the keys are Text and the + * values are IntWritable, you could simply write + * {{{ + * sparkContext.sequenceFile[String, Int](path, ...) + * }}} * * WritableConverters are provided in a somewhat strange way (by an implicit function) to support * both subclasses of Writable and types for which we define a converter (e.g. Int to @@ -310,17 +340,21 @@ class SparkContext( /** Build the union of a list of RDDs. */ def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds) - /** Build the union of a list of RDDs. */ + /** Build the union of a list of RDDs passed as variable-length arguments. */ def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] = new UnionRDD(this, Seq(first) ++ rest) // Methods for creating shared variables + /** + * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values + * to using the `+=` method. Only the master can access the accumulator's `value`. + */ def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) = new Accumulator(initialValue, param) /** - * Create an accumulable shared variable, with a `+=` method + * Create an [[spark.Accumulable]] shared variable, with a `+=` method * @tparam T accumulator type * @tparam R type that can be added to the accumulator */ @@ -338,10 +372,17 @@ class SparkContext( new Accumulable(initialValue, param) } - // Keep around a weak hash map of values to Cached versions? - def broadcast[T](value: T) = SparkEnv.get.broadcastManager.newBroadcast[T] (value, isLocal) + /** + * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for + * reading it in distributed functions. The variable will be sent to each cluster only once. + */ + def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T] (value, isLocal) - // Adds a file dependency to all Tasks executed in the future. + /** + * Add a file to be downloaded into the working directory of this Spark job on every node. + * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. + */ def addFile(path: String) { val uri = new URI(path) val key = uri.getScheme match { @@ -357,12 +398,20 @@ class SparkContext( logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) } + /** + * Clear the job's list of files added by `addFile` so that they do not get donwloaded to + * any new nodes. + */ def clearFiles() { addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() } addedFiles.clear() } - // Adds a jar dependency to all Tasks executed in the future. + /** + * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. + * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. + */ def addJar(path: String) { val uri = new URI(path) val key = uri.getScheme match { @@ -373,12 +422,16 @@ class SparkContext( logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) } + /** + * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to + * any new nodes. + */ def clearJars() { addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() } addedJars.clear() } - // Stop the SparkContext + /** Shut down the SparkContext. */ def stop() { dagScheduler.stop() dagScheduler = null @@ -393,10 +446,12 @@ class SparkContext( logInfo("Successfully stopped SparkContext") } - // Get Spark's home location from either a value set through the constructor, - // or the spark.home Java property, or the SPARK_HOME environment variable - // (in that order of preference). If neither of these is set, return None. - def getSparkHome(): Option[String] = { + /** + * Get Spark's home location from either a value set through the constructor, + * or the spark.home Java property, or the SPARK_HOME environment variable + * (in that order of preference). If neither of these is set, return None. + */ + private[spark] def getSparkHome(): Option[String] = { if (sparkHome != null) { Some(sparkHome) } else if (System.getProperty("spark.home") != null) { @@ -428,6 +483,10 @@ class SparkContext( result } + /** + * Run a job on a given set of partitions of an RDD, but take a function of type + * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`. + */ def runJob[T, U: ClassManifest]( rdd: RDD[T], func: Iterator[T] => U, @@ -444,6 +503,9 @@ class SparkContext( runJob(rdd, func, 0 until rdd.splits.size, false) } + /** + * Run a job on all partitions in an RDD and return the results in an array. + */ def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { runJob(rdd, func, 0 until rdd.splits.size, false) } @@ -465,17 +527,19 @@ class SparkContext( result } - // Clean a closure to make it ready to serialized and send to tasks - // (removes unreferenced variables in $outer's, updates REPL variables) + /** + * Clean a closure to make it ready to serialized and send to tasks + * (removes unreferenced variables in $outer's, updates REPL variables) + */ private[spark] def clean[F <: AnyRef](f: F): F = { ClosureCleaner.clean(f) return f } - // Default level of parallelism to use when not given by user (e.g. for reduce tasks) + /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */ def defaultParallelism: Int = taskScheduler.defaultParallelism - // Default min number of splits for Hadoop RDDs when not given by user + /** Default min number of splits for Hadoop RDDs when not given by user */ def defaultMinSplits: Int = math.min(defaultParallelism, 2) private var nextShuffleId = new AtomicInteger(0) @@ -486,7 +550,7 @@ class SparkContext( private var nextRddId = new AtomicInteger(0) - // Register a new RDD, returning its RDD ID + /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = { nextRddId.getAndIncrement() } diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index f2a52ab356..6a006e0697 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -44,15 +44,13 @@ class SparkEnv ( blockManager.stop() blockManager.master.stop() actorSystem.shutdown() - // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit - Thread.sleep(100) + // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut + // down, but let's call it anyway in case it gets fixed in a later release actorSystem.awaitTermination() - // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit - Thread.sleep(100) } } -object SparkEnv { +object SparkEnv extends Logging { private val env = new ThreadLocal[SparkEnv] def set(e: SparkEnv) { @@ -111,6 +109,12 @@ object SparkEnv { httpFileServer.initialize() System.setProperty("spark.fileserver.uri", httpFileServer.serverUri) + // Warn about deprecated spark.cache.class property + if (System.getProperty("spark.cache.class") != null) { + logWarning("The spark.cache.class property is no longer being used! Specify storage " + + "levels using the RDD.persist() method instead.") + } + new SparkEnv( actorSystem, serializer, diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index a480fe046d..567c4b1475 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -71,7 +71,7 @@ private object Utils extends Logging { while (dir == null) { attempts += 1 if (attempts > maxAttempts) { - throw new IOException("Failed to create a temp directory after " + maxAttempts + + throw new IOException("Failed to create a temp directory after " + maxAttempts + " attempts!") } try { @@ -122,7 +122,7 @@ private object Utils extends Logging { val out = new FileOutputStream(localPath) Utils.copyStream(in, out, true) } - + /** * Download a file requested by the executor. Supports fetching the file in a variety of ways, * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter. @@ -140,9 +140,18 @@ private object Utils extends Logging { case "file" | null => // Remove the file if it already exists targetFile.delete() - // Symlink the file locally - logInfo("Symlinking " + url + " to " + targetFile) - FileUtil.symLink(url, targetFile.toString) + // Symlink the file locally. + if (uri.isAbsolute) { + // url is absolute, i.e. it starts with "file:///". Extract the source + // file's absolute path from the url. + val sourceFile = new File(uri) + logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath) + FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath) + } else { + // url is not absolute, i.e. itself is the path to the source file. + logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath) + FileUtil.symLink(url, targetFile.getAbsolutePath) + } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others val uri = new URI(url) @@ -208,7 +217,7 @@ private object Utils extends Logging { def localHostName(): String = { customHostname.getOrElse(InetAddress.getLocalHost.getHostName) } - + /** * Returns a standard ThreadFactory except all threads are daemons. */ @@ -232,10 +241,10 @@ private object Utils extends Logging { return threadPool } - + /** - * Return the string to tell how long has passed in seconds. The passing parameter should be in - * millisecond. + * Return the string to tell how long has passed in seconds. The passing parameter should be in + * millisecond. */ def getUsedTimeMs(startTimeMs: Long): String = { return " " + (System.currentTimeMillis - startTimeMs) + " ms " diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 84ec386ce4..3c4399493c 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -1,13 +1,5 @@ package spark.api.java -import spark.SparkContext.rddToPairRDDFunctions -import spark.api.java.function.{Function2 => JFunction2} -import spark.api.java.function.{Function => JFunction} -import spark.partial.BoundedDouble -import spark.partial.PartialResult -import spark.storage.StorageLevel -import spark._ - import java.util.{List => JList} import java.util.Comparator @@ -19,6 +11,17 @@ import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.conf.Configuration +import spark.api.java.function.{Function2 => JFunction2} +import spark.api.java.function.{Function => JFunction} +import spark.partial.BoundedDouble +import spark.partial.PartialResult +import spark.rdd.OrderedRDDFunctions +import spark.storage.StorageLevel +import spark.HashPartitioner +import spark.Partitioner +import spark.RDD +import spark.SparkContext.rddToPairRDDFunctions + class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K], implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index b1b72a3a1f..e51b0c5c15 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -4,6 +4,7 @@ import spark.deploy._ import akka.actor._ import akka.pattern.ask import akka.util.duration._ +import akka.pattern.AskTimeoutException import spark.{SparkException, Logging} import akka.remote.RemoteClientLifeCycleEvent import akka.remote.RemoteClientShutdown @@ -100,9 +101,13 @@ private[spark] class Client( def stop() { if (actor != null) { - val timeout = 1.seconds - val future = actor.ask(StopClient)(timeout) - Await.result(future, timeout) + try { + val timeout = 1.seconds + val future = actor.ask(StopClient)(timeout) + Await.result(future, timeout) + } catch { + case e: AskTimeoutException => // Ignore it, maybe master went away + } actor = null } } diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index dec0df25b4..da39108164 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -113,7 +113,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { val selectedKeysCount = selector.select() if (selectedKeysCount == 0) { - logInfo("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys") + logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys") } if (selectorThread.isInterrupted) { logInfo("Selector thread was interrupted!") @@ -167,7 +167,6 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } def removeConnection(connection: Connection) { - /*logInfo("Removing connection")*/ connectionsByKey -= connection.key if (connection.isInstanceOf[SendingConnection]) { val sendingConnection = connection.asInstanceOf[SendingConnection] @@ -235,7 +234,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { def receiveMessage(connection: Connection, message: Message) { val connectionManagerId = ConnectionManagerId.fromSocketAddress(message.senderAddress) - logInfo("Received [" + message + "] from [" + connectionManagerId + "]") + logDebug("Received [" + message + "] from [" + connectionManagerId + "]") val runnable = new Runnable() { val creationTime = System.currentTimeMillis def run() { @@ -276,15 +275,15 @@ private[spark] class ConnectionManager(port: Int) extends Logging { logDebug("Calling back") onReceiveCallback(bufferMessage, connectionManagerId) } else { - logWarning("Not calling back as callback is null") + logDebug("Not calling back as callback is null") None } if (ackMessage.isDefined) { if (!ackMessage.get.isInstanceOf[BufferMessage]) { - logWarning("Response to " + bufferMessage + " is not a buffer message, it is of type " + ackMessage.get.getClass()) + logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type " + ackMessage.get.getClass()) } else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) { - logWarning("Response to " + bufferMessage + " does not have ack id set") + logDebug("Response to " + bufferMessage + " does not have ack id set") ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id } } diff --git a/core/src/main/scala/spark/package.scala b/core/src/main/scala/spark/package.scala new file mode 100644 index 0000000000..389ec4da3e --- /dev/null +++ b/core/src/main/scala/spark/package.scala @@ -0,0 +1,15 @@ +/** + * Core Spark functionality. [[spark.SparkContext]] serves as the main entry point to Spark, while + * [[spark.RDD]] is the data type representing a distributed collection, and provides most + * parallel operations. + * + * In addition, [[spark.PairRDDFunctions]] contains operations available only on RDDs of key-value + * pairs, such as `groupByKey` and `join`; [[spark.DoubleRDDFunctions]] contains operations + * available only on RDDs of Doubles; and [[spark.SequenceFileRDDFunctions]] contains operations + * available on RDDs that can be saved as SequenceFiles. These operations are automatically + * available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit conversions when + * you `import spark.SparkContext._`. + */ +package object spark { + // For package docs only +} diff --git a/core/src/main/scala/spark/partial/BoundedDouble.scala b/core/src/main/scala/spark/partial/BoundedDouble.scala index 8bedd75182..463c33d6e2 100644 --- a/core/src/main/scala/spark/partial/BoundedDouble.scala +++ b/core/src/main/scala/spark/partial/BoundedDouble.scala @@ -3,7 +3,6 @@ package spark.partial /** * A Double with error bars on it. */ -private[spark] class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) { override def toString(): String = "[%.3f, %.3f]".format(low, high) } diff --git a/core/src/main/scala/spark/partial/PartialResult.scala b/core/src/main/scala/spark/partial/PartialResult.scala index beafbf67c3..200ed4ea1e 100644 --- a/core/src/main/scala/spark/partial/PartialResult.scala +++ b/core/src/main/scala/spark/partial/PartialResult.scala @@ -1,6 +1,6 @@ package spark.partial -private[spark] class PartialResult[R](initialVal: R, isFinal: Boolean) { +class PartialResult[R](initialVal: R, isFinal: Boolean) { private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None private var failure: Option[Exception] = None private var completionHandler: Option[R => Unit] = None diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala index faa99fe3e9..cb73976aed 100644 --- a/core/src/main/scala/spark/BlockRDD.scala +++ b/core/src/main/scala/spark/rdd/BlockRDD.scala @@ -1,12 +1,18 @@ -package spark +package spark.rdd import scala.collection.mutable.HashMap +import spark.Dependency +import spark.RDD +import spark.SparkContext +import spark.SparkEnv +import spark.Split + private[spark] class BlockRDDSplit(val blockId: String, idx: Int) extends Split { val index = idx } - +private[spark] class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String]) extends RDD[T](sc) { diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala index 83db2d2934..7c354b6b2e 100644 --- a/core/src/main/scala/spark/CartesianRDD.scala +++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala @@ -1,10 +1,16 @@ -package spark +package spark.rdd + +import spark.NarrowDependency +import spark.RDD +import spark.SparkContext +import spark.Split private[spark] class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable { override val index: Int = idx } +private[spark] class CartesianRDD[T: ClassManifest, U:ClassManifest]( sc: SparkContext, rdd1: RDD[T], @@ -45,4 +51,4 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( def getParents(id: Int): Seq[Int] = List(id % numSplitsInRdd2) } ) -}
\ No newline at end of file +} diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index daba719b14..f1defbe492 100644 --- a/core/src/main/scala/spark/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -1,11 +1,22 @@ -package spark +package spark.rdd import java.net.URL import java.io.EOFException import java.io.ObjectInputStream + import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap +import spark.Aggregator +import spark.Dependency +import spark.Logging +import spark.OneToOneDependency +import spark.Partitioner +import spark.RDD +import spark.ShuffleDependency +import spark.SparkEnv +import spark.Split + private[spark] sealed trait CoGroupSplitDep extends Serializable private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep @@ -38,7 +49,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner) } else { logInfo("Adding shuffle dependency with " + rdd) deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]]( - context.newShuffleId, rdd, aggr, part) + context.newShuffleId, rdd, Some(aggr), part) } } deps.toList diff --git a/core/src/main/scala/spark/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index f1ae346a44..0967f4f5df 100644 --- a/core/src/main/scala/spark/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -1,4 +1,8 @@ -package spark +package spark.rdd + +import spark.NarrowDependency +import spark.RDD +import spark.Split private class CoalescedRDDSplit(val index: Int, val parents: Array[Split]) extends Split diff --git a/core/src/main/scala/spark/DoubleRDDFunctions.scala b/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala index 1fbf66b7de..d232ddeb7c 100644 --- a/core/src/main/scala/spark/DoubleRDDFunctions.scala +++ b/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala @@ -1,10 +1,13 @@ -package spark +package spark.rdd import spark.partial.BoundedDouble import spark.partial.MeanEvaluator import spark.partial.PartialResult import spark.partial.SumEvaluator +import spark.Logging +import spark.RDD +import spark.TaskContext import spark.util.StatCounter /** diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala new file mode 100644 index 0000000000..dfe9dc73f3 --- /dev/null +++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala @@ -0,0 +1,12 @@ +package spark.rdd + +import spark.OneToOneDependency +import spark.RDD +import spark.Split + +private[spark] +class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) { + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = prev.iterator(split).filter(f) +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala new file mode 100644 index 0000000000..3534dc8057 --- /dev/null +++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala @@ -0,0 +1,16 @@ +package spark.rdd + +import spark.OneToOneDependency +import spark.RDD +import spark.Split + +private[spark] +class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( + prev: RDD[T], + f: T => TraversableOnce[U]) + extends RDD[U](prev.context) { + + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = prev.iterator(split).flatMap(f) +} diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala new file mode 100644 index 0000000000..e30564f2da --- /dev/null +++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala @@ -0,0 +1,12 @@ +package spark.rdd + +import spark.OneToOneDependency +import spark.RDD +import spark.Split + +private[spark] +class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) { + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index 6d448116a9..bf29a1f075 100644 --- a/core/src/main/scala/spark/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -1,4 +1,4 @@ -package spark +package spark.rdd import java.io.EOFException import java.util.NoSuchElementException @@ -15,6 +15,12 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils +import spark.Dependency +import spark.RDD +import spark.SerializableWritable +import spark.SparkContext +import spark.Split + /** * A Spark split class that wraps around a Hadoop InputSplit. */ diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala new file mode 100644 index 0000000000..b2c7a1cb9e --- /dev/null +++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala @@ -0,0 +1,16 @@ +package spark.rdd + +import spark.OneToOneDependency +import spark.RDD +import spark.Split + +private[spark] +class MapPartitionsRDD[U: ClassManifest, T: ClassManifest]( + prev: RDD[T], + f: Iterator[T] => Iterator[U]) + extends RDD[U](prev.context) { + + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = f(prev.iterator(split)) +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala new file mode 100644 index 0000000000..adc541694e --- /dev/null +++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala @@ -0,0 +1,21 @@ +package spark.rdd + +import spark.OneToOneDependency +import spark.RDD +import spark.Split + +/** + * A variant of the MapPartitionsRDD that passes the split index into the + * closure. This can be used to generate or collect partition specific + * information such as the number of tuples in a partition. + */ +private[spark] +class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest]( + prev: RDD[T], + f: (Int, Iterator[T]) => Iterator[U]) + extends RDD[U](prev.context) { + + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = f(split.index, prev.iterator(split)) +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala new file mode 100644 index 0000000000..59bedad8ef --- /dev/null +++ b/core/src/main/scala/spark/rdd/MappedRDD.scala @@ -0,0 +1,16 @@ +package spark.rdd + +import spark.OneToOneDependency +import spark.RDD +import spark.Split + +private[spark] +class MappedRDD[U: ClassManifest, T: ClassManifest]( + prev: RDD[T], + f: T => U) + extends RDD[U](prev.context) { + + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = prev.iterator(split).map(f) +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index 9072698357..dcbceab246 100644 --- a/core/src/main/scala/spark/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -1,4 +1,4 @@ -package spark +package spark.rdd import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Writable @@ -13,6 +13,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptID import java.util.Date import java.text.SimpleDateFormat +import spark.Dependency +import spark.RDD +import spark.SerializableWritable +import spark.SparkContext +import spark.Split + private[spark] class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) extends Split { diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/rdd/PairRDDFunctions.scala index 80d62caf25..2a94ea263a 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/rdd/PairRDDFunctions.scala @@ -1,4 +1,4 @@ -package spark +package spark.rdd import java.io.EOFException import java.io.ObjectInputStream @@ -34,9 +34,20 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} import org.apache.hadoop.mapreduce.TaskAttemptID import org.apache.hadoop.mapreduce.TaskAttemptContext -import spark.SparkContext._ import spark.partial.BoundedDouble import spark.partial.PartialResult +import spark.Aggregator +import spark.HashPartitioner +import spark.Logging +import spark.OneToOneDependency +import spark.Partitioner +import spark.RangePartitioner +import spark.RDD +import spark.SerializableWritable +import spark.SparkContext._ +import spark.SparkException +import spark.Split +import spark.TaskContext /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. diff --git a/core/src/main/scala/spark/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala index 3103d7889b..98ea0c92d6 100644 --- a/core/src/main/scala/spark/PipedRDD.scala +++ b/core/src/main/scala/spark/rdd/PipedRDD.scala @@ -1,4 +1,4 @@ -package spark +package spark.rdd import java.io.PrintWriter import java.util.StringTokenizer @@ -8,6 +8,12 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.io.Source +import spark.OneToOneDependency +import spark.RDD +import spark.SparkEnv +import spark.Split + + /** * An RDD that pipes the contents of each parent partition through an external command * (printing them one per line) and returns the output as a collection of strings. diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala index ac10aed477..87a5268f27 100644 --- a/core/src/main/scala/spark/SampledRDD.scala +++ b/core/src/main/scala/spark/rdd/SampledRDD.scala @@ -1,9 +1,13 @@ -package spark +package spark.rdd import java.util.Random import cern.jet.random.Poisson import cern.jet.random.engine.DRand +import spark.RDD +import spark.OneToOneDependency +import spark.Split + private[spark] class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable { override val index: Int = prev.index diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala index ea7171d3a1..24c731fa92 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala @@ -1,4 +1,4 @@ -package spark +package spark.rdd import java.io.EOFException import java.net.URL @@ -23,7 +23,9 @@ import org.apache.hadoop.io.NullWritable import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.Text -import SparkContext._ +import spark.Logging +import spark.RDD +import spark.SparkContext._ /** * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile, diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 1a9f4cfec3..7577909b83 100644 --- a/core/src/main/scala/spark/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -1,8 +1,15 @@ -package spark +package spark.rdd import scala.collection.mutable.ArrayBuffer import java.util.{HashMap => JHashMap} +import spark.Aggregator +import spark.Partitioner +import spark.RangePartitioner +import spark.RDD +import spark.ShuffleDependency +import spark.SparkEnv +import spark.Split private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { override val index = idx @@ -15,7 +22,7 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { */ abstract class ShuffledRDD[K, V, C]( @transient parent: RDD[(K, V)], - aggregator: Aggregator[K, V, C], + aggregator: Option[Aggregator[K, V, C]], part: Partitioner) extends RDD[(K, C)](parent.context) { @@ -41,7 +48,7 @@ class RepartitionShuffledRDD[K, V]( part: Partitioner) extends ShuffledRDD[K, V, V]( parent, - Aggregator[K, V, V](null, null, null, false), + None, part) { override def compute(split: Split): Iterator[(K, V)] = { @@ -88,7 +95,7 @@ class ShuffledAggregatedRDD[K, V, C]( @transient parent: RDD[(K, V)], aggregator: Aggregator[K, V, C], part : Partitioner) - extends ShuffledRDD[K, V, C](parent, aggregator, part) { + extends ShuffledRDD[K, V, C](parent, Some(aggregator), part) { override def compute(split: Split): Iterator[(K, C)] = { val combiners = new JHashMap[K, C] diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala index 3e795ea2a2..f0b9225f7c 100644 --- a/core/src/main/scala/spark/UnionRDD.scala +++ b/core/src/main/scala/spark/rdd/UnionRDD.scala @@ -1,7 +1,13 @@ -package spark +package spark.rdd import scala.collection.mutable.ArrayBuffer +import spark.Dependency +import spark.RangeDependency +import spark.RDD +import spark.SparkContext +import spark.Split + private[spark] class UnionSplit[T: ClassManifest]( idx: Int, rdd: RDD[T], @@ -37,7 +43,7 @@ class UnionRDD[T: ClassManifest]( override val dependencies = { val deps = new ArrayBuffer[Dependency[_]] var pos = 0 - for ((rdd, index) <- rdds.zipWithIndex) { + for (rdd <- rdds) { deps += new RangeDependency(rdd, 0, pos, rdd.splits.size) pos += rdd.splits.size } diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 9b666ed181..6f4c6bffd7 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -422,11 +422,11 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with case smt: ShuffleMapTask => val stage = idToStage(smt.stageId) - val bmAddress = event.result.asInstanceOf[BlockManagerId] - val host = bmAddress.ip + val status = event.result.asInstanceOf[MapStatus] + val host = status.address.ip logInfo("ShuffleMapTask finished with host " + host) if (!deadHosts.contains(host)) { // TODO: Make sure hostnames are consistent with Mesos - stage.addOutputLoc(smt.partition, bmAddress) + stage.addOutputLoc(smt.partition, status) } if (running.contains(stage) && pendingTasks(stage).isEmpty) { logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages") diff --git a/core/src/main/scala/spark/scheduler/MapStatus.scala b/core/src/main/scala/spark/scheduler/MapStatus.scala new file mode 100644 index 0000000000..4532d9497f --- /dev/null +++ b/core/src/main/scala/spark/scheduler/MapStatus.scala @@ -0,0 +1,27 @@ +package spark.scheduler + +import spark.storage.BlockManagerId +import java.io.{ObjectOutput, ObjectInput, Externalizable} + +/** + * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the + * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks. + * The map output sizes are compressed using MapOutputTracker.compressSize. + */ +private[spark] class MapStatus(var address: BlockManagerId, var compressedSizes: Array[Byte]) + extends Externalizable { + + def this() = this(null, null) // For deserialization only + + def writeExternal(out: ObjectOutput) { + address.writeExternal(out) + out.writeInt(compressedSizes.length) + out.write(compressedSizes) + } + + def readExternal(in: ObjectInput) { + address = new BlockManagerId(in) + compressedSizes = new Array[Byte](in.readInt()) + in.readFully(compressedSizes) + } +} diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 966a5e173a..86796d3677 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -74,7 +74,7 @@ private[spark] class ShuffleMapTask( var dep: ShuffleDependency[_,_,_], var partition: Int, @transient var locs: Seq[String]) - extends Task[BlockManagerId](stageId) + extends Task[MapStatus](stageId) with Externalizable with Logging { @@ -109,13 +109,13 @@ private[spark] class ShuffleMapTask( split = in.readObject().asInstanceOf[Split] } - override def run(attemptId: Long): BlockManagerId = { + override def run(attemptId: Long): MapStatus = { val numOutputSplits = dep.partitioner.numPartitions - val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]] val partitioner = dep.partitioner val bucketIterators = - if (aggregator.mapSideCombine) { + if (dep.aggregator.isDefined && dep.aggregator.get.mapSideCombine) { + val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]] // Apply combiners (map-side aggregation) to the map output. val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any]) for (elem <- rdd.iterator(split)) { @@ -141,17 +141,18 @@ private[spark] class ShuffleMapTask( buckets.map(_.iterator) } - val ser = SparkEnv.get.serializer.newInstance() + val compressedSizes = new Array[Byte](numOutputSplits) + val blockManager = SparkEnv.get.blockManager for (i <- 0 until numOutputSplits) { val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i - // Get a scala iterator from java map + // Get a Scala iterator from Java map val iter: Iterator[(Any, Any)] = bucketIterators(i) - // TODO: This should probably be DISK_ONLY - blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false) + val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false) + compressedSizes(i) = MapOutputTracker.compressSize(size) } - return SparkEnv.get.blockManager.blockManagerId + return new MapStatus(blockManager.blockManagerId, compressedSizes) } override def preferredLocations: Seq[String] = locs diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index 803dd1b97d..1149c00a23 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -29,29 +29,29 @@ private[spark] class Stage( val isShuffleMap = shuffleDep != None val numPartitions = rdd.splits.size - val outputLocs = Array.fill[List[BlockManagerId]](numPartitions)(Nil) + val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 private var nextAttemptId = 0 def isAvailable: Boolean = { - if (/*parents.size == 0 &&*/ !isShuffleMap) { + if (!isShuffleMap) { true } else { numAvailableOutputs == numPartitions } } - def addOutputLoc(partition: Int, bmAddress: BlockManagerId) { + def addOutputLoc(partition: Int, status: MapStatus) { val prevList = outputLocs(partition) - outputLocs(partition) = bmAddress :: prevList + outputLocs(partition) = status :: prevList if (prevList == Nil) numAvailableOutputs += 1 } def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) { val prevList = outputLocs(partition) - val newList = prevList.filterNot(_ == bmAddress) + val newList = prevList.filterNot(_.address == bmAddress) outputLocs(partition) = newList if (prevList != Nil && newList == Nil) { numAvailableOutputs -= 1 @@ -62,7 +62,7 @@ private[spark] class Stage( var becameUnavailable = false for (partition <- 0 until numPartitions) { val prevList = outputLocs(partition) - val newList = prevList.filterNot(_.ip == host) + val newList = prevList.filterNot(_.address.ip == host) outputLocs(partition) = newList if (prevList != Nil && newList == Nil) { becameUnavailable = true diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 0043dbeb10..88cb114544 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -25,7 +25,8 @@ private[spark] class SparkDeploySchedulerBackend( "SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", - "SPARK_JAVA_OPTS" + "SPARK_JAVA_OPTS", + "SPARK_TESTING" ) // Memory used by each executor (in megabytes) diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 9bb88ad6a1..cf4aae03a7 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -341,7 +341,7 @@ private[spark] class TaskSetManager( def error(message: String) { // Save the error message - abort("Mesos error: " + message) + abort("Error: " + message) } def abort(message: String) { diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index 9737c6b63e..e6d8b9d822 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -38,7 +38,8 @@ private[spark] class CoarseMesosSchedulerBackend( "SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", - "SPARK_JAVA_OPTS" + "SPARK_JAVA_OPTS", + "SPARK_TESTING" ) val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index e85e4ef318..6f01c8c09d 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -34,7 +34,8 @@ private[spark] class MesosSchedulerBackend( "SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", - "SPARK_JAVA_OPTS" + "SPARK_JAVA_OPTS", + "SPARK_TESTING" ) // Memory used by each executor (in megabytes) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 21a2901548..91b7bebfb3 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -20,7 +20,9 @@ import sun.nio.ch.DirectBuffer private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable { - def this() = this(null, 0) + def this() = this(null, 0) // For deserialization only + + def this(in: ObjectInput) = this(in.readUTF(), in.readInt()) override def writeExternal(out: ObjectOutput) { out.writeUTF(ip) @@ -61,7 +63,11 @@ private[spark] class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) extends Logging { - class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, var pending: Boolean = true) { + class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { + var pending: Boolean = true + var size: Long = -1L + + /** Wait for this BlockInfo to be marked as ready (i.e. block is finished writing) */ def waitForReady() { if (pending) { synchronized { @@ -70,8 +76,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - def markReady() { + /** Mark this BlockInfo as ready (i.e. block is finished writing) */ + def markReady(sizeInBytes: Long) { pending = false + size = sizeInBytes synchronized { this.notifyAll() } @@ -96,8 +104,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // TODO: This will be removed after cacheTracker is removed from the code base. var cacheTracker: CacheTracker = null - val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties - val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean + // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory + // for receiving shuffle outputs) + val maxBytesInFlight = + System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 + + val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean + val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean + // Whether to compress RDD partitions that are stored serialized + val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean + + val host = System.getProperty("spark.hostname", Utils.localHostName()) initialize() @@ -183,6 +200,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m */ def getLocal(blockId: String): Option[Iterator[Any]] = { logDebug("Getting local block " + blockId) + + // As an optimization for map output fetches, if the block is for a shuffle, return it + // without acquiring a lock; the disk store never deletes (recent) items so this should work + if (blockId.startsWith("shuffle_")) { + return diskStore.getValues(blockId) match { + case Some(iterator) => + Some(iterator) + case None => + throw new Exception("Block " + blockId + " not found on disk, though it should be") + } + } + locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId) if (info != null) { @@ -208,7 +237,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m diskStore.getValues(blockId) match { case Some(iterator) => // Put the block back in memory before returning it - memoryStore.putValues(blockId, iterator, level, true) match { + memoryStore.putValues(blockId, iterator, level, true).data match { case Left(iterator2) => return Some(iterator2) case _ => @@ -227,7 +256,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m copyForMemory.put(bytes) memoryStore.putBytes(blockId, copyForMemory, level) bytes.rewind() - return Some(dataDeserialize(bytes)) + return Some(dataDeserialize(blockId, bytes)) case None => throw new Exception("Block " + blockId + " not found on disk, though it should be") } @@ -253,6 +282,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m def getLocalBytes(blockId: String): Option[ByteBuffer] = { // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow logDebug("Getting local block " + blockId + " as bytes") + + // As an optimization for map output fetches, if the block is for a shuffle, return it + // without acquiring a lock; the disk store never deletes (recent) items so this should work + if (blockId.startsWith("shuffle_")) { + return diskStore.getBytes(blockId) match { + case Some(bytes) => + Some(bytes) + case None => + throw new Exception("Block " + blockId + " not found on disk, though it should be") + } + } + locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId) if (info != null) { @@ -318,7 +359,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m GetBlock(blockId), ConnectionManagerId(loc.ip, loc.port)) if (data != null) { logDebug("Data is not null: " + data) - return Some(dataDeserialize(data)) + return Some(dataDeserialize(blockId, data)) } logDebug("Data is null") } @@ -336,9 +377,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m /** * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined - * fashion as they're received. + * fashion as they're received. Expects a size in bytes to be provided for each block fetched, + * so that we can control the maxMegabytesInFlight for the fetch. */ - def getMultiple(blocksByAddress: Seq[(BlockManagerId, Seq[String])]) + def getMultiple(blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])]) : Iterator[(String, Option[Iterator[Any]])] = { if (blocksByAddress == null) { @@ -350,17 +392,37 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val localBlockIds = new ArrayBuffer[String]() val remoteBlockIds = new HashSet[String]() - // A queue to hold our results. Because we want all the deserializing the happen in the - // caller's thread, this will actually hold functions to produce the Iterator for each block. - // For local blocks we'll have an iterator already, while for remote ones we'll deserialize. - val results = new LinkedBlockingQueue[(String, Option[() => Iterator[Any]])] + // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize + // the block (since we want all deserializaton to happen in the calling thread); can also + // represent a fetch failure if size == -1. + class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) { + def failed: Boolean = size == -1 + } + + // A queue to hold our results. + val results = new LinkedBlockingQueue[FetchResult] + + // A request to fetch one or more blocks, complete with their sizes + class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) { + val size = blocks.map(_._2).sum + } + + // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that + // the number of bytes in flight is limited to maxBytesInFlight + val fetchRequests = new Queue[FetchRequest] - // Bound the number and memory usage of fetched remote blocks. - val blocksToRequest = new Queue[(BlockManagerId, BlockMessage)] + // Current bytes in flight from our requests + var bytesInFlight = 0L - def sendRequest(bmId: BlockManagerId, blockMessages: Seq[BlockMessage]) { - val cmId = new ConnectionManagerId(bmId.ip, bmId.port) - val blockMessageArray = new BlockMessageArray(blockMessages) + def sendRequest(req: FetchRequest) { + logDebug("Sending request for %d blocks (%s) from %s".format( + req.blocks.size, Utils.memoryBytesToString(req.size), req.address.ip)) + val cmId = new ConnectionManagerId(req.address.ip, req.address.port) + val blockMessageArray = new BlockMessageArray(req.blocks.map { + case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId)) + }) + bytesInFlight += req.size + val sizeMap = req.blocks.toMap // so we can look up the size of each blockID val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage) future.onSuccess { case Some(message) => { @@ -372,58 +434,73 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m "Unexpected message " + blockMessage.getType + " received from " + cmId) } val blockId = blockMessage.getId - results.put((blockId, Some(() => dataDeserialize(blockMessage.getData)))) + results.put(new FetchResult( + blockId, sizeMap(blockId), () => dataDeserialize(blockId, blockMessage.getData))) logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } } case None => { logError("Could not get block(s) from " + cmId) - for (blockMessage <- blockMessages) { - results.put((blockMessage.getId, None)) + for ((blockId, size) <- req.blocks) { + results.put(new FetchResult(blockId, -1, null)) } } } } - // Split local and remote blocks. Remote blocks are further split into ones that will - // be requested initially and ones that will be added to a queue of blocks to request. - val initialRequestBlocks = new HashMap[BlockManagerId, ArrayBuffer[BlockMessage]]() - var initialRequests = 0 - val blocksToGetLater = new ArrayBuffer[(BlockManagerId, BlockMessage)] - for ((address, blockIds) <- Utils.randomize(blocksByAddress)) { + // Split local and remote blocks. Remote blocks are further split into FetchRequests of size + // at most maxBytesInFlight in order to limit the amount of data in flight. + val remoteRequests = new ArrayBuffer[FetchRequest] + for ((address, blockInfos) <- blocksByAddress) { if (address == blockManagerId) { - localBlockIds ++= blockIds + localBlockIds ++= blockInfos.map(_._1) } else { - remoteBlockIds ++= blockIds - for (blockId <- blockIds) { - val blockMessage = BlockMessage.fromGetBlock(GetBlock(blockId)) - if (initialRequests < numParallelFetches) { - initialRequestBlocks.getOrElseUpdate(address, new ArrayBuffer[BlockMessage]) - .append(blockMessage) - initialRequests += 1 - } else { - blocksToGetLater.append((address, blockMessage)) + remoteBlockIds ++= blockInfos.map(_._1) + // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them + // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 + // nodes, rather than blocking on reading output from one node. + val minRequestSize = math.max(maxBytesInFlight / 5, 1L) + logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize) + val iterator = blockInfos.iterator + var curRequestSize = 0L + var curBlocks = new ArrayBuffer[(String, Long)] + while (iterator.hasNext) { + val (blockId, size) = iterator.next() + curBlocks += ((blockId, size)) + curRequestSize += size + if (curRequestSize >= minRequestSize) { + // Add this FetchRequest + remoteRequests += new FetchRequest(address, curBlocks) + curRequestSize = 0 + curBlocks = new ArrayBuffer[(String, Long)] } } + // Add in the final request + if (!curBlocks.isEmpty) { + remoteRequests += new FetchRequest(address, curBlocks) + } } } - // Add the remaining blocks into a queue to pull later in a random order - blocksToRequest ++= Utils.randomize(blocksToGetLater) + // Add the remote requests into our queue in a random order + fetchRequests ++= Utils.randomize(remoteRequests) - // Send out initial request(s) for 'numParallelFetches' blocks. - for ((bmId, blockMessages) <- initialRequestBlocks) { - sendRequest(bmId, blockMessages) + // Send out initial requests for blocks, up to our maxBytesInFlight + while (!fetchRequests.isEmpty && + (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { + sendRequest(fetchRequests.dequeue()) } - logDebug("Started remote gets for " + numParallelFetches + " blocks in " + - Utils.getUsedTimeMs(startTime) + " ms") + val numGets = remoteBlockIds.size - fetchRequests.size + logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime)) - // Get the local blocks while remote blocks are being fetched. + // Get the local blocks while remote blocks are being fetched. Note that it's okay to do + // these all at once because they will just memory-map some files, so they won't consume + // any memory that might exceed our maxBytesInFlight startTime = System.currentTimeMillis for (id <- localBlockIds) { getLocal(id) match { - case Some(block) => { - results.put((id, Some(() => block))) + case Some(iter) => { + results.put(new FetchResult(id, 0, () => iter)) // Pass 0 as size since it's not in flight logDebug("Got local block " + id) } case None => { @@ -441,20 +518,23 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m def next(): (String, Option[Iterator[Any]]) = { resultsGotten += 1 - val (blockId, functionOption) = results.take() - if (remoteBlockIds.contains(blockId) && !blocksToRequest.isEmpty) { - val (bmId, blockMessage) = blocksToRequest.dequeue() - sendRequest(bmId, Seq(blockMessage)) + val result = results.take() + bytesInFlight -= result.size + if (!fetchRequests.isEmpty && + (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { + sendRequest(fetchRequests.dequeue()) } - (blockId, functionOption.map(_.apply())) + (result.blockId, if (result.failed) None else Some(result.deserialize())) } } } /** - * Put a new block of values to the block manager. + * Put a new block of values to the block manager. Returns its (estimated) size in bytes. */ - def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true) { + def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true) + : Long = { + if (blockId == null) { throw new IllegalArgumentException("Block Id is null") } @@ -465,9 +545,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new IllegalArgumentException("Storage level is null or invalid") } - if (blockInfo.containsKey(blockId)) { + val oldBlock = blockInfo.get(blockId) + if (oldBlock != null) { logWarning("Block " + blockId + " already exists on this machine; not re-adding it") - return + oldBlock.waitForReady() + return oldBlock.size } // Remember the block's storage level so that we can correctly drop it to disk if it needs @@ -477,14 +559,19 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m blockInfo.put(blockId, myInfo) val startTimeMs = System.currentTimeMillis - var bytes: ByteBuffer = null // If we need to replicate the data, we'll want access to the values, but because our // put will read the whole iterator, there will be no values left. For the case where - // the put serializes data, we'll remember the bytes, above; but for the case where - // it doesn't, such as MEMORY_ONLY_DESER, let's rely on the put returning an Iterator. + // the put serializes data, we'll remember the bytes, above; but for the case where it + // doesn't, such as deserialized storage, let's rely on the put returning an Iterator. var valuesAfterPut: Iterator[Any] = null + // Ditto for the bytes after the put + var bytesAfterPut: ByteBuffer = null + + // Size of the block in bytes (to return to caller) + var size = 0L + locker.getLock(blockId).synchronized { logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") @@ -492,22 +579,26 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m if (level.useMemory) { // Save it just to memory first, even if it also has useDisk set to true; we will later // drop it to disk if the memory store can't hold it. - memoryStore.putValues(blockId, values, level, true) match { - case Right(newBytes) => bytes = newBytes + val res = memoryStore.putValues(blockId, values, level, true) + size = res.size + res.data match { + case Right(newBytes) => bytesAfterPut = newBytes case Left(newIterator) => valuesAfterPut = newIterator } } else { // Save directly to disk. val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them - diskStore.putValues(blockId, values, level, askForBytes) match { - case Right(newBytes) => bytes = newBytes + val res = diskStore.putValues(blockId, values, level, askForBytes) + size = res.size + res.data match { + case Right(newBytes) => bytesAfterPut = newBytes case _ => } } // Now that the block is in either the memory or disk store, let other threads read it, // and tell the master about it. - myInfo.markReady() + myInfo.markReady(size) if (tellMaster) { reportBlockStatus(blockId) } @@ -517,23 +608,25 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // Replicate block if required if (level.replication > 1) { // Serialize the block if not already done - if (bytes == null) { + if (bytesAfterPut == null) { if (valuesAfterPut == null) { throw new SparkException( "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.") } - bytes = dataSerialize(valuesAfterPut) + bytesAfterPut = dataSerialize(blockId, valuesAfterPut) } - replicate(blockId, bytes, level) + replicate(blockId, bytesAfterPut, level) } - BlockManager.dispose(bytes) + BlockManager.dispose(bytesAfterPut) // TODO: This code will be removed when CacheTracker is gone. if (blockId.startsWith("rdd")) { - notifyTheCacheTracker(blockId) + notifyCacheTracker(blockId) } logDebug("Put block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)) + + return size } @@ -592,7 +685,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // Now that the block is in either the memory or disk store, let other threads read it, // and tell the master about it. - myInfo.markReady() + myInfo.markReady(bytes.limit) if (tellMaster) { reportBlockStatus(blockId) } @@ -600,7 +693,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // TODO: This code will be removed when CacheTracker is gone. if (blockId.startsWith("rdd")) { - notifyTheCacheTracker(blockId) + notifyCacheTracker(blockId) } // If replication had started, then wait for it to finish @@ -646,13 +739,12 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } // TODO: This code will be removed when CacheTracker is gone. - private def notifyTheCacheTracker(key: String) { + private def notifyCacheTracker(key: String) { if (cacheTracker != null) { val rddInfo = key.split("_") val rddId: Int = rddInfo(1).toInt val partition: Int = rddInfo(2).toInt - val host = System.getProperty("spark.hostname", Utils.localHostName()) - cacheTracker.notifyTheCacheTrackerFromBlockManager(spark.AddedToCache(rddId, partition, host)) + cacheTracker.notifyFromBlockManager(spark.AddedToCache(rddId, partition, host)) } } @@ -699,24 +791,36 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } + def shouldCompress(blockId: String): Boolean = { + if (blockId.startsWith("shuffle_")) { + compressShuffle + } else if (blockId.startsWith("broadcast_")) { + compressBroadcast + } else if (blockId.startsWith("rdd_")) { + compressRdds + } else { + false // Won't happen in a real cluster, but it can in tests + } + } + /** - * Wrap an output stream for compression if block compression is enabled + * Wrap an output stream for compression if block compression is enabled for its block type */ - def wrapForCompression(s: OutputStream): OutputStream = { - if (compress) new LZFOutputStream(s) else s + def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { + if (shouldCompress(blockId)) new LZFOutputStream(s) else s } /** - * Wrap an input stream for compression if block compression is enabled + * Wrap an input stream for compression if block compression is enabled for its block type */ - def wrapForCompression(s: InputStream): InputStream = { - if (compress) new LZFInputStream(s) else s + def wrapForCompression(blockId: String, s: InputStream): InputStream = { + if (shouldCompress(blockId)) new LZFInputStream(s) else s } - def dataSerialize(values: Iterator[Any]): ByteBuffer = { + def dataSerialize(blockId: String, values: Iterator[Any]): ByteBuffer = { val byteStream = new FastByteArrayOutputStream(4096) val ser = serializer.newInstance() - ser.serializeStream(wrapForCompression(byteStream)).writeAll(values).close() + ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() byteStream.trim() ByteBuffer.wrap(byteStream.array) } @@ -725,10 +829,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of * the iterator is reached. */ - def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = { + def dataDeserialize(blockId: String, bytes: ByteBuffer): Iterator[Any] = { bytes.rewind() - val ser = serializer.newInstance() - ser.deserializeStream(wrapForCompression(new ByteBufferInputStream(bytes, true))).asIterator + val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) + serializer.newInstance().deserializeStream(stream).asIterator } def stop() { @@ -742,10 +846,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m private[spark] object BlockManager extends Logging { - def getNumParallelFetchesFromSystemProperties: Int = { - System.getProperty("spark.blockManager.parallelFetches", "4").toInt - } - def getMaxMemoryFromSystemProperties: Long = { val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble (Runtime.getRuntime.maxMemory * memoryFraction).toLong diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala index f72079e267..d2985559c1 100644 --- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala @@ -31,7 +31,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage) logDebug("Parsed as a block message array") val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get) - /*logDebug("Processed block messages")*/ return Some(new BlockMessageArray(responseMessages).toBufferMessage) } catch { case e: Exception => logError("Exception handling buffer message", e) @@ -49,13 +48,13 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends blockMessage.getType match { case BlockMessage.TYPE_PUT_BLOCK => { val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel) - logInfo("Received [" + pB + "]") + logDebug("Received [" + pB + "]") putBlock(pB.id, pB.data, pB.level) return None } case BlockMessage.TYPE_GET_BLOCK => { val gB = new GetBlock(blockMessage.getId) - logInfo("Received [" + gB + "]") + logDebug("Received [" + gB + "]") val buffer = getBlock(gB.id) if (buffer == null) { return None diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index ff482ff66b..1286600cd1 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -15,13 +15,14 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging { * Put in a block and, possibly, also return its content as either bytes or another Iterator. * This is used to efficiently write the values to multiple locations (e.g. for replication). * - * @return the values put if returnValues is true, or null otherwise + * @return a PutResult that contains the size of the data, as well as the values put if + * returnValues is true (if not, the result's data field can be null) */ def putValues(blockId: String, values: Iterator[Any], level: StorageLevel, returnValues: Boolean) - : Either[Iterator[Any], ByteBuffer] + : PutResult /** - * Return the size of a block. + * Return the size of a block in bytes. */ def getSize(blockId: String): Long diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index d0c592ccb1..fd92a3dc67 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -48,28 +48,28 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) values: Iterator[Any], level: StorageLevel, returnValues: Boolean) - : Either[Iterator[Any], ByteBuffer] = { + : PutResult = { logDebug("Attempting to write values for block " + blockId) val startTime = System.currentTimeMillis val file = createFile(blockId) - val fileOut = blockManager.wrapForCompression( + val fileOut = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(new FileOutputStream(file))) val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) objOut.writeAll(values) objOut.close() - val finishTime = System.currentTimeMillis + val length = file.length() logDebug("Block %s stored as %s file on disk in %d ms".format( - blockId, Utils.memoryBytesToString(file.length()), (finishTime - startTime))) + blockId, Utils.memoryBytesToString(length), (System.currentTimeMillis - startTime))) if (returnValues) { // Return a byte buffer for the contents of the file val channel = new RandomAccessFile(file, "r").getChannel() - val buffer = channel.map(MapMode.READ_ONLY, 0, channel.size()) + val buffer = channel.map(MapMode.READ_ONLY, 0, length) channel.close() - Right(buffer) + PutResult(length, Right(buffer)) } else { - null + PutResult(length, null) } } @@ -83,7 +83,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } override def getValues(blockId: String): Option[Iterator[Any]] = { - getBytes(blockId).map(blockManager.dataDeserialize(_)) + getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes)) } override def remove(blockId: String) { diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index 74ef326038..e9288fdf43 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -31,7 +31,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { if (level.deserialized) { bytes.rewind() - val values = blockManager.dataDeserialize(bytes) + val values = blockManager.dataDeserialize(blockId, bytes) val elements = new ArrayBuffer[Any] elements ++= values val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) @@ -49,18 +49,18 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) values: Iterator[Any], level: StorageLevel, returnValues: Boolean) - : Either[Iterator[Any], ByteBuffer] = { + : PutResult = { if (level.deserialized) { val elements = new ArrayBuffer[Any] elements ++= values val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) tryToPut(blockId, elements, sizeEstimate, true) - Left(elements.iterator) + PutResult(sizeEstimate, Left(elements.iterator)) } else { - val bytes = blockManager.dataSerialize(values) + val bytes = blockManager.dataSerialize(blockId, values) tryToPut(blockId, bytes, bytes.limit, false) - Right(bytes) + PutResult(bytes.limit(), Right(bytes)) } } @@ -71,7 +71,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (entry == null) { None } else if (entry.deserialized) { - Some(blockManager.dataSerialize(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)) + Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)) } else { Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data } @@ -87,7 +87,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) } else { val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data - Some(blockManager.dataDeserialize(buffer)) + Some(blockManager.dataDeserialize(blockId, buffer)) } } @@ -162,7 +162,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * don't fit into memory that we want to avoid). * - * Assumes that a lock on entries is held by the caller. + * Assumes that a lock on the MemoryStore is held by the caller. (Otherwise, the freed space + * might fill up before the caller puts in their new value.) */ private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = { logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( @@ -172,7 +173,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit") return false } - + + // TODO: This should relinquish the lock on the MemoryStore while flushing out old blocks + // in order to allow parallelism in writing to disk if (maxMemory - currentMemory < space) { val rddToAdd = getRddId(blockIdToAdd) val selectedBlocks = new ArrayBuffer[String]() diff --git a/core/src/main/scala/spark/storage/PutResult.scala b/core/src/main/scala/spark/storage/PutResult.scala new file mode 100644 index 0000000000..76f236057b --- /dev/null +++ b/core/src/main/scala/spark/storage/PutResult.scala @@ -0,0 +1,9 @@ +package spark.storage + +import java.nio.ByteBuffer + +/** + * Result of adding a block into a BlockStore. Contains its estimated size, and possibly the + * values put if the caller asked for them to be returned (e.g. for chaining replication) + */ +private[spark] case class PutResult(size: Long, data: Either[Iterator[_], ByteBuffer]) diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala index 2237ce92b3..2d52fac1ef 100644 --- a/core/src/main/scala/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/spark/storage/StorageLevel.scala @@ -2,7 +2,7 @@ package spark.storage import java.io.{Externalizable, ObjectInput, ObjectOutput} -private[spark] class StorageLevel( +class StorageLevel( var useDisk: Boolean, var useMemory: Boolean, var deserialized: Boolean, @@ -63,7 +63,7 @@ private[spark] class StorageLevel( "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication) } -private[spark] object StorageLevel { +object StorageLevel { val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index f670ccb709..b466b5239c 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -23,6 +23,8 @@ private[spark] object AkkaUtils { * ActorSystem itself and its port (which is hard to get from Akka). */ def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { + val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt + val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt val akkaConf = ConfigFactory.parseString(""" akka.daemonic = on akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] @@ -31,9 +33,9 @@ private[spark] object AkkaUtils { akka.remote.netty.hostname = "%s" akka.remote.netty.port = %d akka.remote.netty.connection-timeout = 1s - akka.remote.netty.execution-pool-size = 8 - akka.actor.default-dispatcher.throughput = 30 - """.format(host, port)) + akka.remote.netty.execution-pool-size = %d + akka.actor.default-dispatcher.throughput = %d + """.format(host, port, akkaThreads, akkaBatchSize)) val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala index 71df5941e5..d8be99dde7 100644 --- a/core/src/test/scala/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/spark/AccumulatorSuite.scala @@ -18,6 +18,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test ("basic accumulation"){ @@ -53,6 +55,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter } sc.stop() sc = null + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } } @@ -84,6 +88,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter } should produce [SparkException] sc.stop() sc = null + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } } @@ -91,7 +97,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter val maxI = 1000 for (nThreads <- List(1, 10)) { // test single & multi-threaded - val sc = new SparkContext("local[" + nThreads + "]", "test") + sc = new SparkContext("local[" + nThreads + "]", "test") val setAcc = sc.accumulableCollection(mutable.HashSet[Int]()) val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]()) val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]()) @@ -110,6 +116,27 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter mapAcc.value should contain (i -> i.toString) } sc.stop() + sc = null + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") + } + } + + test ("localValue readable in tasks") { + import SetAccum._ + val maxI = 1000 + for (nThreads <- List(1, 10)) { //test single & multi-threaded + sc = new SparkContext("local[" + nThreads + "]", "test") + val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]()) + val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet} + val d = sc.parallelize(groupedInts) + d.foreach { + x => acc.localValue ++= x + } + acc.value should be ( (0 to maxI).toSet) + sc.stop() + sc = null } } + } diff --git a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala index e00e8c4123..37cafd1e8e 100644 --- a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala +++ b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala @@ -2,9 +2,10 @@ package spark import org.scalatest.FunSuite import org.scalatest.PrivateMethodTester +import org.scalatest.matchers.ShouldMatchers // TODO: Replace this with a test of MemoryStore -class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester { +class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester with ShouldMatchers { test("constructor test") { val cache = new BoundedMemoryCache(60) expect(60)(cache.getCapacity) @@ -23,15 +24,21 @@ class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester { logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size)) } } + + // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length + // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6. + // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html + // Work around to check for either. + //should be OK - expect(CachePutSuccess(56))(cache.put("1", 0, "Meh")) + cache.put("1", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48))) //we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from //cache because it's from the same dataset expect(CachePutFailure())(cache.put("1", 1, "Meh")) //should be OK, dataset '1' can be evicted from cache - expect(CachePutSuccess(56))(cache.put("2", 0, "Meh")) + cache.put("2", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48))) //should fail, cache should obey it's capacity expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string")) diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala index 0738a2725b..2d3302f0aa 100644 --- a/core/src/test/scala/spark/BroadcastSuite.scala +++ b/core/src/test/scala/spark/BroadcastSuite.scala @@ -12,6 +12,8 @@ class BroadcastSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("basic broadcast") { diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 48c0a830e0..433d2fdc19 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -26,21 +26,28 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter sc.stop() sc = null } + System.clearProperty("spark.reducer.maxMbInFlight") + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("local-cluster format") { sc = new SparkContext("local-cluster[2,1,512]", "test") - assert(sc.parallelize(1 to 2, 2).count == 2) + assert(sc.parallelize(1 to 2, 2).count() == 2) sc.stop() + System.clearProperty("spark.master.port") sc = new SparkContext("local-cluster[2 , 1 , 512]", "test") - assert(sc.parallelize(1 to 2, 2).count == 2) + assert(sc.parallelize(1 to 2, 2).count() == 2) sc.stop() + System.clearProperty("spark.master.port") sc = new SparkContext("local-cluster[2, 1, 512]", "test") - assert(sc.parallelize(1 to 2, 2).count == 2) + assert(sc.parallelize(1 to 2, 2).count() == 2) sc.stop() + System.clearProperty("spark.master.port") sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test") - assert(sc.parallelize(1 to 2, 2).count == 2) + assert(sc.parallelize(1 to 2, 2).count() == 2) sc.stop() + System.clearProperty("spark.master.port") sc = null } @@ -55,6 +62,18 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(valuesFor2.toList.sorted === List(1)) } + test("groupByKey where map output sizes exceed maxMbInFlight") { + System.setProperty("spark.reducer.maxMbInFlight", "1") + sc = new SparkContext(clusterUrl, "test") + // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output + // file should be about 2.5 MB + val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000))) + val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect() + assert(groups.length === 16) + assert(groups.map(_._2).sum === 2000) + // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block + } + test("accumulators") { sc = new SparkContext(clusterUrl, "test") val accum = sc.accumulator(0) diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala index 4405829161..a3454f25f6 100644 --- a/core/src/test/scala/spark/FailureSuite.scala +++ b/core/src/test/scala/spark/FailureSuite.scala @@ -32,6 +32,8 @@ class FailureSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } // Run a 3-task map job in which task 1 deterministically fails once, and check diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala index fd7a7bd589..b4283d9604 100644 --- a/core/src/test/scala/spark/FileServerSuite.scala +++ b/core/src/test/scala/spark/FileServerSuite.scala @@ -7,11 +7,11 @@ import java.io.{File, PrintWriter, FileReader, BufferedReader} import SparkContext._ class FileServerSuite extends FunSuite with BeforeAndAfter { - + @transient var sc: SparkContext = _ @transient var tmpFile : File = _ @transient var testJarFile : File = _ - + before { // Create a sample text file val tmpdir = new File(Files.createTempDir(), "test") @@ -21,7 +21,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { pw.println("100") pw.close() } - + after { if (sc != null) { sc.stop() @@ -31,8 +31,10 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { if (tmpFile.exists) { tmpFile.delete() } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } - + test("Distributing files locally") { sc = new SparkContext("local[4]", "test") sc.addFile(tmpFile.toString) @@ -43,16 +45,29 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { in.close() _ * fileVal + _ * fileVal }.collect - println(result) + assert(result.toSet === Set((1,200), (2,300), (3,500))) + } + + test("Distributing files locally using URL as input") { + // addFile("file:///....") + sc = new SparkContext("local[4]", "test") + sc.addFile((new File(tmpFile.toString)).toURL.toString) + val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) + val result = sc.parallelize(testData).reduceByKey { + val in = new BufferedReader(new FileReader("FileServerSuite.txt")) + val fileVal = in.readLine().toInt + in.close() + _ * fileVal + _ * fileVal + }.collect assert(result.toSet === Set((1,200), (2,300), (3,500))) } test ("Dynamically adding JARS locally") { sc = new SparkContext("local[4]", "test") - val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile() + val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() sc.addJar(sampleJarFile) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => + val result = sc.parallelize(testData).reduceByKey { (x,y) => val fac = Thread.currentThread.getContextClassLoader() .loadClass("org.uncommons.maths.Maths") .getDeclaredMethod("factorial", classOf[Int]) @@ -79,10 +94,10 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { test ("Dynamically adding JARS on a standalone cluster") { sc = new SparkContext("local-cluster[1,1,512]", "test") - val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile() + val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() sc.addJar(sampleJarFile) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => + val result = sc.parallelize(testData).reduceByKey { (x,y) => val fac = Thread.currentThread.getContextClassLoader() .loadClass("org.uncommons.maths.Maths") .getDeclaredMethod("factorial", classOf[Int]) diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala index 5c1577ed0b..554bea53a9 100644 --- a/core/src/test/scala/spark/FileSuite.scala +++ b/core/src/test/scala/spark/FileSuite.scala @@ -20,6 +20,8 @@ class FileSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("text files") { diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala new file mode 100644 index 0000000000..4e9717d871 --- /dev/null +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -0,0 +1,25 @@ +package spark + +import org.scalatest.FunSuite + +class MapOutputTrackerSuite extends FunSuite { + test("compressSize") { + assert(MapOutputTracker.compressSize(0L) === 0) + assert(MapOutputTracker.compressSize(1L) === 0) + assert(MapOutputTracker.compressSize(2L) === 8) + assert(MapOutputTracker.compressSize(10L) === 25) + assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145) + assert((MapOutputTracker.compressSize(1000000000L) & 0xFF) === 218) + // This last size is bigger than we can encode in a byte, so check that we just return 255 + assert((MapOutputTracker.compressSize(1000000000000000000L) & 0xFF) === 255) + } + + test("decompressSize") { + assert(MapOutputTracker.decompressSize(0) === 1) + for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) { + val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size)) + assert(size2 >= 0.99 * size && size2 <= 1.11 * size, + "size " + size + " decompressed to " + size2 + ", which is out of range") + } + } +} diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index 5000fa7307..3dadc7acec 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -16,6 +16,8 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index add5221e30..9b84b29227 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -13,6 +13,8 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("basic pipe") { diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index ade457c0f9..37a0ff0947 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -3,6 +3,8 @@ package spark import scala.collection.mutable.HashMap import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter + +import spark.rdd.CoalescedRDD import SparkContext._ class RDDSuite extends FunSuite with BeforeAndAfter { @@ -14,6 +16,8 @@ class RDDSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("basic operations") { diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 90760b8a85..7f8ec5d48f 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -1,5 +1,7 @@ package spark +import scala.collection.mutable.ArrayBuffer + import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter import org.scalatest.matchers.ShouldMatchers @@ -10,8 +12,7 @@ import org.scalacheck.Prop._ import com.google.common.io.Files -import scala.collection.mutable.ArrayBuffer - +import spark.rdd.ShuffledAggregatedRDD import SparkContext._ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { @@ -23,6 +24,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("groupByKey") { diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala index a2015644ee..17f366212b 100644 --- a/core/src/test/scala/spark/SizeEstimatorSuite.scala +++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala @@ -3,6 +3,7 @@ package spark import org.scalatest.FunSuite import org.scalatest.BeforeAndAfterAll import org.scalatest.PrivateMethodTester +import org.scalatest.matchers.ShouldMatchers class DummyClass1 {} @@ -19,7 +20,9 @@ class DummyClass4(val d: DummyClass3) { val x: Int = 0 } -class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMethodTester { +class SizeEstimatorSuite + extends FunSuite with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers { + var oldArch: String = _ var oldOops: String = _ @@ -42,11 +45,15 @@ class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMet expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3))) } + // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length. + // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6. + // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html + // Work around to check for either. test("strings") { - expect(48)(SizeEstimator.estimate("")) - expect(56)(SizeEstimator.estimate("a")) - expect(56)(SizeEstimator.estimate("ab")) - expect(64)(SizeEstimator.estimate("abcdefgh")) + SizeEstimator.estimate("") should (equal (48) or equal (40)) + SizeEstimator.estimate("a") should (equal (56) or equal (48)) + SizeEstimator.estimate("ab") should (equal (56) or equal (48)) + SizeEstimator.estimate("abcdefgh") should (equal(64) or equal(56)) } test("primitive arrays") { @@ -106,6 +113,10 @@ class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMet resetOrClear("os.arch", arch) } + // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length. + // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6. + // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html + // Work around to check for either. test("64-bit arch with no compressed oops") { val arch = System.setProperty("os.arch", "amd64") val oops = System.setProperty("spark.test.useCompressedOops", "false") @@ -113,10 +124,10 @@ class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMet val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - expect(64)(SizeEstimator.estimate("")) - expect(72)(SizeEstimator.estimate("a")) - expect(72)(SizeEstimator.estimate("ab")) - expect(80)(SizeEstimator.estimate("abcdefgh")) + SizeEstimator.estimate("") should (equal (64) or equal (56)) + SizeEstimator.estimate("a") should (equal (72) or equal (64)) + SizeEstimator.estimate("ab") should (equal (72) or equal (64)) + SizeEstimator.estimate("abcdefgh") should (equal (80) or equal (72)) resetOrClear("os.arch", arch) resetOrClear("spark.test.useCompressedOops", oops) diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala index c87595ecb3..1ad11ff4c3 100644 --- a/core/src/test/scala/spark/SortingSuite.scala +++ b/core/src/test/scala/spark/SortingSuite.scala @@ -12,7 +12,10 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with after { if (sc != null) { sc.stop() + sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("sortByKey") { diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala index 302f731187..e9b1837d89 100644 --- a/core/src/test/scala/spark/ThreadingSuite.scala +++ b/core/src/test/scala/spark/ThreadingSuite.scala @@ -31,6 +31,8 @@ class ThreadingSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index f61fd45ed3..31b33eae09 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -13,10 +13,14 @@ import spark.SizeEstimator import spark.util.ByteBufferInputStream class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { + var store: BlockManager = null var actorSystem: ActorSystem = null var master: BlockManagerMaster = null - var oldArch: String = _ - var oldOops: String = _ + var oldArch: String = null + var oldOops: String = null + + // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test + val serializer = new KryoSerializer before { actorSystem = ActorSystem("test") @@ -30,6 +34,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } after { + if (store != null) { + store.stop() + } actorSystem.shutdown() actorSystem.awaitTermination() actorSystem = null @@ -49,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("manager-master interaction") { - val store = new BlockManager(master, new KryoSerializer, 2000) + store = new BlockManager(master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -79,7 +86,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -88,47 +95,42 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") - Thread.sleep(100) assert(store.getSingle("a1") === None, "a1 was in store") assert(store.getSingle("a2") != None, "a2 was not in store") // At this point a2 was gotten last, so LRU will getSingle rid of a3 store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") != None, "a1 was not in store") assert(store.getSingle("a2") != None, "a2 was not in store") - Thread.sleep(100) assert(store.getSingle("a3") === None, "a3 was in store") } test("in-memory LRU storage with serialization") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER) - Thread.sleep(100) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") assert(store.getSingle("a1") === None, "a1 was in store") assert(store.getSingle("a2") != None, "a2 was not in store") // At this point a2 was gotten last, so LRU will getSingle rid of a3 store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) - Thread.sleep(100) assert(store.getSingle("a1") != None, "a1 was not in store") assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") === None, "a3 was in store") } test("in-memory LRU for partitions of same RDD") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY) - Thread.sleep(100) // Even though we accessed rdd_0_3 last, it should not have replaced partitiosn 1 and 2 // from the same RDD assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") @@ -141,11 +143,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of multiple RDDs") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) - Thread.sleep(100) // At this point rdd_1_1 should've replaced rdd_0_1 assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store") assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store") @@ -155,7 +156,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT // Put in more partitions from RDD 0; they should replace rdd_1_1 store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY) - Thread.sleep(100) // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped // when we try to add rdd_0_4. assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store") @@ -166,7 +166,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("on-disk storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -179,14 +179,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) - Thread.sleep(100) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") @@ -195,14 +194,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with getLocalBytes") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) - Thread.sleep(100) assert(store.getLocalBytes("a2") != None, "a2 was not in store") assert(store.getLocalBytes("a3") != None, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") @@ -211,14 +209,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) - Thread.sleep(100) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") @@ -227,14 +224,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization and getLocalBytes") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) - Thread.sleep(100) assert(store.getLocalBytes("a2") != None, "a2 was not in store") assert(store.getLocalBytes("a3") != None, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") @@ -243,7 +239,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -261,7 +257,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a3") != None, "a1 was not in store") // Now let's add in a4, which uses both disk and memory; a1 should drop out store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER) - Thread.sleep(100) assert(store.getSingle("a1") == None, "a1 was in store") assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") @@ -269,14 +264,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU with streams") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY) store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY) store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY) - Thread.sleep(100) assert(store.get("list2") != None, "list2 was not in store") assert(store.get("list2").get.size == 2) assert(store.get("list3") != None, "list3 was not in store") @@ -286,7 +280,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list2").get.size == 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY) - Thread.sleep(100) assert(store.get("list1") != None, "list1 was not in store") assert(store.get("list1").get.size == 2) assert(store.get("list2") != None, "list2 was not in store") @@ -295,7 +288,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels and streams") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -304,7 +297,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER) store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER) store.put("list3", list3.iterator, StorageLevel.DISK_ONLY) - Thread.sleep(100) // At this point LRU should not kick in because list3 is only on disk assert(store.get("list1") != None, "list2 was not in store") assert(store.get("list1").get.size === 2) @@ -320,7 +312,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list3").get.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should drop out store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER) - Thread.sleep(100) assert(store.get("list1") === None, "list1 was in store") assert(store.get("list2") != None, "list3 was not in store") assert(store.get("list2").get.size === 2) @@ -343,11 +334,68 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("overly large block") { - val store = new BlockManager(master, new KryoSerializer, 500) + store = new BlockManager(master, serializer, 500) store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") === None, "a1 was in store") store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store") assert(store.getSingle("a2") != None, "a2 was not in store") } + + test("block compression") { + try { + System.setProperty("spark.shuffle.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.shuffle.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed") + store.stop() + store = null + + System.setProperty("spark.broadcast.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.broadcast.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed") + store.stop() + store = null + + System.setProperty("spark.rdd.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.rdd.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed") + store.stop() + store = null + + // Check that any other block types are also kept uncompressed + store = new BlockManager(master, serializer, 2000) + store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) + assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") + store.stop() + store = null + } finally { + System.clearProperty("spark.shuffle.compress") + System.clearProperty("spark.broadcast.compress") + System.clearProperty("spark.rdd.compress") + } + } } diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index 5aeed99a54..dbe9d085a2 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -42,6 +42,7 @@ <li class="dropdown"> <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a> <ul class="dropdown-menu"> + <li><a href="{{HOME_PATH}}quick-start.html">Quick Start</a></li> <li><a href="{{HOME_PATH}}scala-programming-guide.html">Scala</a></li> <li><a href="{{HOME_PATH}}java-programming-guide.html">Java</a></li> </ul> diff --git a/docs/configuration.md b/docs/configuration.md index fa7123af1b..db90b5bc16 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -113,36 +113,43 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td>spark.blockManager.compress</td> - <td>false</td> + <td>spark.storage.memoryFraction</td> + <td>0.66</td> + <td> + Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" + generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase + it if you configure your own old generation size. + </td> +</tr> +<tr> + <td>spark.shuffle.compress</td> + <td>true</td> <td> - Set to "true" to have Spark compress map output files, RDDs that get cached on disk, - and RDDs that get cached in serialized form. Generally a good idea when dealing with - large datasets, but might add some CPU overhead. + Whether to compress map output files. Generally a good idea. </td> </tr> <tr> <td>spark.broadcast.compress</td> - <td>false</td> + <td>true</td> <td> - Set to "true" to have Spark compress broadcast variables before sending them. - Generally a good idea when broadcasting large values. + Whether to compress broadcast variables before sending them. Generally a good idea. </td> </tr> <tr> - <td>spark.storage.memoryFraction</td> - <td>0.66</td> + <td>spark.rdd.compress</td> + <td>false</td> <td> - Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" - generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase - it if you configure your own old generation size. + Whether to compress serialized RDD partitions (e.g. for <code>StorageLevel.MEMORY_ONLY_SER</code>). + Can save substantial space at the cost of some extra CPU time. </td> </tr> <tr> - <td>spark.blockManager.parallelFetches</td> - <td>4</td> + <td>spark.reducer.maxMbInFlight</td> + <td>48</td> <td> - Number of map output files to fetch concurrently from each reduce task. + Maximum size (in megabytes) of map outputs to fetch simultaneously from each reduce task. Since + each output requires us to create a buffer to receive it, this represents a fixed memory overhead + per reduce task, so keep it small unless you have a large amount of memory. </td> </tr> <tr> @@ -179,10 +186,18 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> + <td>spark.akka.threads</td> + <td>4</td> + <td> + Number of actor threads to use for communication. Can be useful to increase on large clusters + when the master has a lot of CPU cores. + </td> +</tr> +<tr> <td>spark.master.host</td> <td>(local hostname)</td> <td> - Hostname for the master to listen on (it will bind to this hostname's IP address). + Hostname or IP address for the master to listen on. </td> </tr> <tr> diff --git a/docs/css/main.css b/docs/css/main.css index 84fe1d44ed..13fe0b8195 100755 --- a/docs/css/main.css +++ b/docs/css/main.css @@ -73,8 +73,13 @@ a:hover code { * using solution at http://stackoverflow.com/questions/8878033/how- * to-make-twitter-bootstrap-menu-dropdown-on-hover-rather-than-click **/ +.dropdown-menu { + /* Remove the default 2px top margin which causes a small + gap between the hover trigger area and the popup menu */ + margin-top: 0; +} ul.nav li.dropdown:hover ul.dropdown-menu{ - display: block; + display: block; } a.menu:after, .dropdown-toggle:after { content: none; diff --git a/docs/index.md b/docs/index.md index ec6ecf3cc5..5a53da7024 100644 --- a/docs/index.md +++ b/docs/index.md @@ -54,6 +54,10 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`). # Where to Go from Here +**Quick start:** + +* [Spark Quick Start]({{HOME_PATH}}quick-start.html): a quick intro to the Spark API + **Programming guides:** * [Spark Programming Guide]({{HOME_PATH}}scala-programming-guide.html): how to get started using Spark, and details on the Scala API diff --git a/docs/quick-start.md b/docs/quick-start.md new file mode 100644 index 0000000000..f9356afe9a --- /dev/null +++ b/docs/quick-start.md @@ -0,0 +1,239 @@ +--- +layout: global +title: Spark Quick Start +--- + +* This will become a table of contents (this text will be scraped). +{:toc} + +# Introduction + +This document provides a quick-and-dirty look at Spark's API. See the [programming guide]({{HOME_PATH}}/scala-programming-guide.html) for a complete reference. To follow along with this guide, you only need to have successfully [built spark]({{HOME_PATH}}) on one machine. Building Spark is as simple as running + +{% highlight bash %} +$ sbt/sbt package +{% endhighlight %} + +from within the Spark directory. + +# Interactive Data Analysis with the Spark Shell + +## Shell basics + +Start the Spark shell by executing `./spark-shell` in the Spark directory. + +Spark's primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDD's can be created from Hadoop InputFormat's (such as HDFS files) or by transforming other RDD's. Let's make a new RDD derived from the text of the README file in the Spark source directory: + +{% highlight scala %} +scala> val textFile = sc.textFile("README.md") +textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 +{% endhighlight %} + +RDD's have _[actions]({{HOME_PATH}}/scala-programming-guide.html#actions)_, which return values, and _[transformations]({{HOME_PATH}}/scala-programming-guide.html#transformations)_, which return pointers to new RDD's. Let's start with a few actions: + +{% highlight scala %} +scala> textFile.count() // Number of items in this RDD +res0: Long = 74 + +scala> textFile.first() // First item in this RDD +res1: String = # Spark +{% endhighlight %} + +Now let's use a transformation. We will use the [filter]({{HOME_PATH}}/scala-programming-guide.html#transformations)() transformation to return a new RDD with a subset of the items in the file. + +{% highlight scala %} +scala> val sparkLinesOnly = textFile.filter(line => line.contains("Spark")) +sparkLinesOnly: spark.RDD[String] = spark.FilteredRDD@7dd4af09 +{% endhighlight %} + +We can chain together transformations and actions: + +{% highlight scala %} +scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? +res3: Long = 15 +{% endhighlight %} + +## Data flow +RDD transformations can be used for more complex computations. Let's say we want to find the line with the most words: + +{% highlight scala %} +scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a < b) {b} else {a}) +res4: Long = 16 +{% endhighlight %} + +This first maps a line to an integer value, creating a new RDD. `reduce` is called on that RDD to find the largest line count. The arguments to [map]({{HOME_PATH}}/scala-programming-guide.html#transformations)() and [reduce]({{HOME_PATH}}/scala-programming-guide.html#actions)() are scala closures. We can easily include functions declared elsewhere, or include existing functions in our anonymous closures. For instance, we can use `Math.max()` to make this code easier to understand. + +{% highlight scala %} +scala> import java.lang.Math; +import java.lang.Math + +scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) +res5: Int = 16 +{% endhighlight %} + +One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily: + +{% highlight scala %} +scala> val wordCountRDD = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((c1, c2) => c1 + c2) +wordCountRDD: spark.RDD[(java.lang.String, Int)] = spark.ShuffledAggregatedRDD@71f027b8 +{% endhighlight %} + +Here, we combined the [flatMap]({{HOME_PATH}}/scala-programming-guide.html#transformations)(), [map]({{HOME_PATH}}/scala-programming-guide.html#transformations)() and [reduceByKey]({{HOME_PATH}}/scala-programming-guide.html#transformations)() transformations to create per-word counts in the file. To collect the word counts in our shell, we can use the [collect]({{HOME_PATH}}/scala-programming-guide.html#actions)() action: + +{% highlight scala %} +scala> wordCountRDD.collect() +res6: Array[(java.lang.String, Int)] = Array((need,2), ("",43), (Extra,3), (using,1), (passed,1), (etc.,1), (its,1), (`/usr/local/lib/libmesos.so`,1), (`SCALA_HOME`,1), (option,1), (these,1), (#,1), (`PATH`,,2), (200,1), (To,3),... +{% endhighlight %} + +## Caching +Spark also supports pulling data sets into a cluster-wide cache. This is very useful when data is accessed iteratively, such as in machine learning jobs, or repeatedly, such as when small "hot data" is queried repeatedly. As a simple example, let's pull part of our file into memory: + + +{% highlight scala %} +scala> val linesWithSparkCached = linesWithSpark.cache() +linesWithSparkCached: spark.RDD[String] = spark.FilteredRDD@17e51082 + +scala> linesWithSparkCached.count() +res7: Long = 15 + +scala> linesWithSparkCached.count() +res8: Long = 15 +{% endhighlight %} + +It may seem silly to use a Spark to explore and cache a 30 line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. + +# A Spark Job in Scala +Now say we wanted to write custom job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you using other build systems, please reference the Spark assembly jar in the developer guide. The first step is to publish Spark to our local Ivy/Maven repositories. From the Spark directory: + +{% highlight bash %} +$ sbt/sbt publish-local +{% endhighlight %} + +Next, we'll create a very simple Spark job in Scala. So simple, in fact, that it's named `SimpleJob.scala`: + +{% highlight scala %} +/*** SimpleJob.scala ***/ +import spark.SparkContext +import SparkContext._ + +object SimpleJob extends Application { + val logFile = "/var/log/syslog" // Should be some log file on your system + val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME", + "target/scala-2.9.2/simple-project_2.9.2-1.0.jar") + val logData = sc.textFile(logFile, 2).cache() + val numAs = logData.filter(line => line.contains("a")).count() + val numBs = logData.filter(line => line.contains("b")).count() + println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) +} +{% endhighlight %} + +This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Unlike the earlier examples with the Spark Shell, which initializes its own SparkContext, we initialize a SparkContext as part of the job. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the job, the directory where Spark is installed, and a name for the jar file containing the job's sources. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes. + +This file depends on the Spark API, so we'll also include an sbt configuration file, `simple.sbt` which explains that Spark is a dependency: + +{% highlight scala %} +name := "Simple Project" + +version := "1.0" + +scalaVersion := "2.9.2" + +libraryDependencies += "org.spark-project" %% "spark-core" % "0.6.0-SNAPSHOT" +{% endhighlight %} + +Of course, for sbt to work correctly, we'll need to layout `SimpleJob.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a jar package containing the job's code, then use `sbt run` to execute our example job. + +{% highlight bash %} +$ find . +. +./simple.sbt +./src +./src/main +./src/main/scala +./src/main/scala/SimpleJob.scala + +$ sbt clean package +$ sbt run +... +Lines with a: 8422, Lines with b: 1836 +{% endhighlight %} + +This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode]({{HOME_PATH}}/spark-standalone.html) documentation and consider using a distributed input source, such as HDFS. + +# A Spark Job In Java +Now say we wanted to write custom job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you using other build systems, please reference the Spark assembly jar in the developer guide. The first step is to publish Spark to our local Ivy/Maven repositories. From the Spark directory: + +{% highlight bash %} +$ sbt/sbt publish-local +{% endhighlight %} +Next, we'll create a very simple Spark job in Scala. So simple, in fact, that it's named `SimpleJob.java`: + +{% highlight java %} +/*** SimpleJob.java ***/ +import spark.api.java.*; +import spark.api.java.function.Function; + +public class SimpleJob { + public static void main(String[] args) { + String logFile = "/var/log/syslog"; // Should be some log file on your system + JavaSparkContext sc = new JavaSparkContext("local", "Simple Job", + "$YOUR_SPARK_HOME", "target/simple-project-1.0.jar"); + JavaRDD<String> logData = sc.textFile(logFile).cache(); + + long numAs = logData.filter(new Function<String, Boolean>() { + public Boolean call(String s) { return s.contains("a"); } + }).count(); + + long numBs = logData.filter(new Function<String, Boolean>() { + public Boolean call(String s) { return s.contains("b"); } + }).count(); + + System.out.println(String.format( + "Lines with a: %s, Lines with b: %s", numAs, numBs)); + } +} +{% endhighlight %} + +This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Unlike the earlier examples with the Spark Shell, which initializes its own SparkContext, we initialize a SparkContext as part of the job. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the job, the directory where Spark is installed, and a name for the jar file containing the job's sources. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes. + +Our Maven `pom.xml` file will list Spark as a dependency. Note that Spark artifacts are tagged with a Scala version. + +{% highlight xml %} +<project> + <groupId>edu.berkeley</groupId> + <artifactId>simple-project</artifactId> + <modelVersion>4.0.0</modelVersion> + <name>Simple Project</name> + <packaging>jar</packaging> + <version>1.0</version> + <dependencies> + <dependency> <!-- Spark dependency --> + <groupId>org.spark-project</groupId> + <artifactId>spark-core_2.9.2</artifactId> + <version>0.6.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project> +{% endhighlight %} + +To make Maven happy, we lay out these files according to the canonical directory structure: +{% highlight bash %} +$ find . +./pom.xml +./src +./src/main +./src/main/java +./src/main/java/SimpleJob.java +{% endhighlight %} + +Now, we can execute the job using Maven: + +{% highlight bash %} +$ mvn clean package +$ mvn exec:java -Dexec.mainClass="SimpleJob" +... +Lines with a: 8422, Lines with b: 1836 +{% endhighlight %} + +This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode]({{HOME_PATH}}/spark-standalone.html) documentation and consider using a distributed input source, such as HDFS. + diff --git a/docs/tuning.md b/docs/tuning.md index c90b3156bd..9ce9d4d2ef 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -7,14 +7,19 @@ Because of the in-memory nature of most Spark computations, Spark programs can b by any resource in the cluster: CPU, network bandwidth, or memory. Most often, if the data fits in memory, the bottleneck is network bandwidth, but sometimes, you also need to do some tuning, such as -[storing RDDs in serialized form]({{HOME_PATH}}scala-programming-guide#rdd-persistence), to +[storing RDDs in serialized form]({{HOME_PATH}}/scala-programming-guide.html#rdd-persistence), to make the memory usage smaller. This guide will cover two main topics: data serialization, which is crucial for good network performance, and memory tuning. We also sketch several smaller topics. -# Data Serialization +This document assumes that you have familiarity with the Spark API and have already read the [Scala]({{HOME_PATH}}/scala-programming-guide.html) or [Java]({{HOME_PATH}}/java-programming-guide.html) programming guides. After reading this guide, do not hesitate to reach out to the [Spark mailing list](http://groups.google.com/group/spark-users) with performance related concerns. -One of the most important concerns in any distributed program is the format of data sent across +# The Spark Storage Model +Spark's key abstraction is a distributed dataset, or RDD. RDD's consist of partitions. RDD partitions are stored either in memory or on disk, with replication or without replication, depending on the chosen [persistence options]({{HOME_PATH}}/scala-programming-guide.html#rdd-persistence). When RDD's are stored in memory, they can be stored as deserialized Java objects, or in a serialized form, again depending on the persistence option chosen. When RDD's are transferred over the network, or spilled to disk, they are always serialized. Spark can use different serializers, configurable with the `spark.serializer` option. + +# Serialization Options + +Serialization plays an important role in the performance of Spark applications, especially those which are network-bound. The format of data sent across the network -- formats that are slow to serialize objects into, or consume a large number of bytes, will greatly slow down the computation. Often, this will be the first thing you should tune to optimize a Spark application. @@ -70,9 +75,12 @@ full class name with each object, which is wasteful. # Memory Tuning There are three considerations in tuning memory usage: the *amount* of memory used by your objects -(you likely want your entire dataset to fit in memory), the *cost* of accessing those objects, and the +(you may want your entire dataset to fit in memory), the *cost* of accessing those objects, and the overhead of *garbage collection* (if you have high turnover in terms of objects). +## Determining memory consumption +The best way to size the amount of memory consumption your dataset will require is to create an RDD, put it into cache, and look at the master logs. The logs will tell you how much memory each partition is consuming, which you can aggregate to get the total size of the RDD. Depending on the object complexity in your dataset, and whether you are storing serialized data, memory overhead can range from 1X (e.g. no overhead vs on-disk storage) to 5X. This guide covers ways to keep memory overhead low, in cases where memory is a contended resource. + ## Efficient Data Structures By default, Java objects are fast to access, but can easily consume a factor of 2-5x more space @@ -101,9 +109,6 @@ There are several ways to reduce this cost and still make Java objects efficient ASCII strings as just 8 bits per character. You can add these options in [`spark-env.sh`]({{HOME_PATH}}configuration.html#environment-variables-in-spark-envsh). -You can get a sense of the memory usage of each object by looking at the logs of your Spark worker -nodes -- they will print the size of each RDD partition cached. - When your objects are still too large to efficiently store despite this tuning, a much simpler way to reduce memory usage is to store them in *serialized* form, using the serialized StorageLevels in the [RDD persistence API]({{HOME_PATH}}scala-programming-guide#rdd-persistence). diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index d007d8519e..4184008506 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -80,7 +80,8 @@ object SparkBuild extends Build { "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", "cc.spray" % "spray-can" % "1.0-M2.1", - "cc.spray" % "spray-server" % "1.0-M2.1" + "cc.spray" % "spray-server" % "1.0-M2.1", + "org.apache.mesos" % "mesos" % "0.9.0-incubating" ) ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index 0b5d439ca4..db78d06d4f 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -30,6 +30,8 @@ class ReplSuite extends FunSuite { spark.repl.Main.interp = null if (interp.sparkContext != null) interp.sparkContext.stop() + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") return out.toString } @@ -21,7 +21,7 @@ fi # If the user specifies a Mesos JAR, put it before our included one on the classpath MESOS_CLASSPATH="" -if [ -z "$MESOS_JAR" ] ; then +if [ -n "$MESOS_JAR" ] ; then MESOS_CLASSPATH="$MESOS_JAR" fi @@ -52,13 +52,12 @@ CLASSPATH="$SPARK_CLASSPATH" CLASSPATH+=":$MESOS_CLASSPATH" CLASSPATH+=":$FWDIR/conf" CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes" -CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/test-classes" +if [ -n "$SPARK_TESTING" ] ; then + CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/test-classes" +fi CLASSPATH+=":$CORE_DIR/src/main/resources" CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes" -for jar in `find $CORE_DIR/lib -name '*jar'`; do - CLASSPATH+=":$jar" -done for jar in `find $FWDIR/lib_managed/jars -name '*jar'`; do CLASSPATH+=":$jar" done @@ -39,7 +39,6 @@ rem Build up classpath set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes -for /R "%CORE_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j for /R "%FWDIR%\lib_managed\jars" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j for /R "%FWDIR%\lib_managed\bundles" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j for /R "%REPL_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j @@ -65,4 +64,4 @@ if "%SPARK_LAUNCH_WITH_SCALA%" NEQ 1 goto java_runner :run_spark %RUNNER% -cp "%CLASSPATH%" %EXTRA_ARGS% %* -:exit
\ No newline at end of file +:exit @@ -4,4 +4,5 @@ if [ "$MESOS_HOME" != "" ]; then EXTRA_ARGS="-Djava.library.path=$MESOS_HOME/lib/java" fi export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd) +export SPARK_TESTING=1 # To put test classes on classpath java -Xmx1200M -XX:MaxPermSize=200m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@" |