diff options
author | Mridul Muralidharan <mridul@gmail.com> | 2013-05-04 20:41:27 +0530 |
---|---|---|
committer | Mridul Muralidharan <mridul@gmail.com> | 2013-05-04 20:41:27 +0530 |
commit | 5b011d18d7fa1b312012574e2fac0e513c33ebd1 (patch) | |
tree | 79e73f0f069b274d615bc7c661d7b8738595c062 /core | |
parent | ea2a6f91d383c958b9bab56858f4ef0c8b6ec847 (diff) | |
parent | 3bf2c868c311f6afd286d7a25adda9fe94561eb0 (diff) | |
download | spark-5b011d18d7fa1b312012574e2fac0e513c33ebd1.tar.gz spark-5b011d18d7fa1b312012574e2fac0e513c33ebd1.tar.bz2 spark-5b011d18d7fa1b312012574e2fac0e513c33ebd1.zip |
Merge from master
Diffstat (limited to 'core')
20 files changed, 452 insertions, 102 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 2987dbbe58..e1fb02157a 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -1,14 +1,19 @@ package spark -import executor.{ShuffleReadMetrics, TaskMetrics} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap +import spark.executor.{ShuffleReadMetrics, TaskMetrics} +import spark.serializer.Serializer import spark.storage.BlockManagerId import spark.util.CompletionIterator + private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { - override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = { + + override def fetch[K, V]( + shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer) = { + logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId)) val blockManager = SparkEnv.get.blockManager @@ -48,8 +53,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin } } - val blockFetcherItr = blockManager.getMultiple(blocksByAddress) + val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) val itr = blockFetcherItr.flatMap(unpackBlock) + CompletionIterator[(K,V), Iterator[(K,V)]](itr, { val shuffleMetrics = new ShuffleReadMetrics shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index 5eea907322..2af44aa383 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -25,10 +25,12 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { * @param shuffleId the shuffle id * @param rdd the parent RDD * @param partitioner partitioner used to partition the shuffle output + * @param serializerClass class name of the serializer to use */ class ShuffleDependency[K, V]( @transient rdd: RDD[(K, V)], - val partitioner: Partitioner) + val partitioner: Partitioner, + val serializerClass: String = null) extends Dependency(rdd) { val shuffleId: Int = rdd.context.newShuffleId() diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 67fd1c1a8f..2b0e697337 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -52,7 +52,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, - mapSideCombine: Boolean = true): RDD[(K, C)] = { + mapSideCombine: Boolean = true, + serializerClass: String = null): RDD[(K, C)] = { if (getKeyClass().isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") @@ -67,13 +68,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( self.mapPartitions(aggregator.combineValuesByKey(_), true) } else if (mapSideCombine) { val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true) - val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner) + val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner, serializerClass) partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true) } else { // Don't apply map-side combiner. // A sanity check to make sure mergeCombiners is not defined. assert(mergeCombiners == null) - val values = new ShuffledRDD[K, V](self, partitioner) + val values = new ShuffledRDD[K, V](self, partitioner, serializerClass) values.mapPartitions(aggregator.combineValuesByKey(_), true) } } @@ -469,7 +470,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( /** * Return an RDD with the pairs from `this` whose keys are not in `other`. - * + * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ @@ -645,7 +646,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * Return an RDD with the keys of each tuple. */ def keys: RDD[K] = self.map(_._1) - + /** * Return an RDD with the values of each tuple. */ diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala index 442e9f0269..9513a00126 100644 --- a/core/src/main/scala/spark/ShuffleFetcher.scala +++ b/core/src/main/scala/spark/ShuffleFetcher.scala @@ -1,13 +1,16 @@ package spark -import executor.TaskMetrics +import spark.executor.TaskMetrics +import spark.serializer.Serializer + private[spark] abstract class ShuffleFetcher { /** * Fetch the shuffle outputs for a given ShuffleDependency. * @return An iterator over the elements of the fetched shuffle outputs. */ - def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) : Iterator[(K,V)] + def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics, + serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[(K,V)] /** Stop the fetcher */ def stop() {} diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 2ee25e547d..be1a04d619 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -3,13 +3,14 @@ package spark import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem} import akka.remote.RemoteActorRefProvider -import serializer.Serializer import spark.broadcast.BroadcastManager import spark.storage.BlockManager import spark.storage.BlockManagerMaster import spark.network.ConnectionManager +import spark.serializer.{Serializer, SerializerManager} import spark.util.AkkaUtils + /** * Holds all the runtime environment objects for a running Spark instance (either master or worker), * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently @@ -20,6 +21,7 @@ import spark.util.AkkaUtils class SparkEnv ( val executorId: String, val actorSystem: ActorSystem, + val serializerManager: SerializerManager, val serializer: Serializer, val closureSerializer: Serializer, val cacheManager: CacheManager, @@ -105,8 +107,14 @@ object SparkEnv extends Logging { Class.forName(name, true, classLoader).newInstance().asInstanceOf[T] } - val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer") - + val serializerManager = new SerializerManager + + val serializer = serializerManager.setDefault( + System.getProperty("spark.serializer", "spark.JavaSerializer")) + + val closureSerializer = serializerManager.get( + System.getProperty("spark.closure.serializer", "spark.JavaSerializer")) + def registerOrLookup(name: String, newActor: => Actor): ActorRef = { if (isDriver) { logInfo("Registering " + name) @@ -130,9 +138,6 @@ object SparkEnv extends Logging { val broadcastManager = new BroadcastManager(isDriver) - val closureSerializer = instantiateClass[Serializer]( - "spark.closure.serializer", "spark.JavaSerializer") - val cacheManager = new CacheManager(blockManager) // Have to assign trackerActor after initialization as MapOutputTrackerActor @@ -167,6 +172,7 @@ object SparkEnv extends Logging { new SparkEnv( executorId, actorSystem, + serializerManager, serializer, closureSerializer, cacheManager, diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index d884529d7a..9b74d1226f 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -182,6 +182,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest) } + /** + * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by + * applying a function to the zipped partitions. Assumes that all the RDDs have the + * *same number of partitions*, but does *not* require them to have the same number + * of elements in each partition. + */ + def zipPartitions[U, V]( + f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V], + other: JavaRDDLike[U, _]): JavaRDD[V] = { + def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator( + f.apply(asJavaIterator(x), asJavaIterator(y)).iterator()) + JavaRDD.fromRDD( + rdd.zipPartitions(fn, other.rdd)(other.classManifest, f.elementType()))(f.elementType()) + } + // Actions (launch a job to return a value to the user program) /** diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala new file mode 100644 index 0000000000..6044043add --- /dev/null +++ b/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala @@ -0,0 +1,11 @@ +package spark.api.java.function + +/** + * A function that takes two inputs and returns zero or more output records. + */ +abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] { + @throws(classOf[Exception]) + def call(a: A, b:B) : java.lang.Iterable[C] + + def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]] +} diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index a6235491ca..7599ba1a02 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -54,7 +54,8 @@ private[spark] class CoGroupAggregator class CoGroupedRDD[K]( @transient var rdds: Seq[RDD[(K, _)]], part: Partitioner, - val mapSideCombine: Boolean = true) + val mapSideCombine: Boolean = true, + val serializerClass: String = null) extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { private val aggr = new CoGroupAggregator @@ -68,9 +69,9 @@ class CoGroupedRDD[K]( logInfo("Adding shuffle dependency with " + rdd) if (mapSideCombine) { val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true) - new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part) + new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part, serializerClass) } else { - new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part) + new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass) } } } @@ -112,6 +113,7 @@ class CoGroupedRDD[K]( } } + val ser = SparkEnv.get.serializerManager.get(serializerClass) for ((dep, depNum) <- split.deps.zipWithIndex) dep match { case NarrowCoGroupSplitDep(rdd, _, itsSplit) => { // Read them from the parent @@ -124,12 +126,12 @@ class CoGroupedRDD[K]( val fetcher = SparkEnv.get.shuffleFetcher if (mapSideCombine) { // With map side combine on, for each key, the shuffle fetcher returns a list of values. - fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics).foreach { + fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics, ser).foreach { case (key, values) => getSeq(key)(depNum) ++= values } } else { // With map side combine off, for each key the shuffle fetcher returns a single value. - fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics).foreach { + fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach { case (key, value) => getSeq(key)(depNum) += value } } diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 4e33b7dd5c..c7d1926b83 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -3,6 +3,7 @@ package spark.rdd import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext} import spark.SparkContext._ + private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { override val index = idx override def hashCode(): Int = idx @@ -12,13 +13,15 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { * The resulting RDD from a shuffle (e.g. repartitioning of data). * @param prev the parent RDD. * @param part the partitioner used to partition the RDD + * @param serializerClass class name of the serializer to use. * @tparam K the key class. * @tparam V the value class. */ class ShuffledRDD[K, V]( @transient prev: RDD[(K, V)], - part: Partitioner) - extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part))) { + part: Partitioner, + serializerClass: String = null) + extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part, serializerClass))) { override val partitioner = Some(part) @@ -28,6 +31,7 @@ class ShuffledRDD[K, V]( override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId - SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index, context.taskMetrics) + SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index, context.taskMetrics, + SparkEnv.get.serializerManager.get(serializerClass)) } } diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala index 481e03b349..8a9efc5da2 100644 --- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala @@ -12,6 +12,7 @@ import spark.SparkEnv import spark.ShuffleDependency import spark.OneToOneDependency + /** * An optimized version of cogroup for set difference/subtraction. * @@ -31,7 +32,9 @@ import spark.OneToOneDependency private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest]( @transient var rdd1: RDD[(K, V)], @transient var rdd2: RDD[(K, W)], - part: Partitioner) extends RDD[(K, V)](rdd1.context, Nil) { + part: Partitioner, + val serializerClass: String = null) + extends RDD[(K, V)](rdd1.context, Nil) { override def getDependencies: Seq[Dependency[_]] = { Seq(rdd1, rdd2).map { rdd => @@ -40,7 +43,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM new OneToOneDependency(rdd) } else { logInfo("Adding shuffle dependency with " + rdd) - new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part) + new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part, serializerClass) } } } @@ -65,6 +68,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = { val partition = p.asInstanceOf[CoGroupPartition] + val serializer = SparkEnv.get.serializerManager.get(serializerClass) val map = new JHashMap[K, ArrayBuffer[V]] def getSeq(k: K): ArrayBuffer[V] = { val seq = map.get(k) @@ -77,12 +81,16 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM } } def integrate(dep: CoGroupSplitDep, op: ((K, V)) => Unit) = dep match { - case NarrowCoGroupSplitDep(rdd, _, itsSplit) => + case NarrowCoGroupSplitDep(rdd, _, itsSplit) => { for (t <- rdd.iterator(itsSplit, context)) op(t.asInstanceOf[(K, V)]) - case ShuffleCoGroupSplitDep(shuffleId) => - for (t <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index, context.taskMetrics)) + } + case ShuffleCoGroupSplitDep(shuffleId) => { + val iter = SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index, + context.taskMetrics, serializer) + for (t <- iter) op(t.asInstanceOf[(K, V)]) + } } // the first dep is rdd1; add all values to the map integrate(partition.deps(0), t => getSeq(t._1) += t._2) diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 4b36e71c32..95647389c3 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -13,9 +13,10 @@ import com.ning.compress.lzf.LZFInputStream import com.ning.compress.lzf.LZFOutputStream import spark._ -import executor.ShuffleWriteMetrics +import spark.executor.ShuffleWriteMetrics import spark.storage._ -import util.{TimeStampedHashMap, MetadataCleaner} +import spark.util.{TimeStampedHashMap, MetadataCleaner} + private[spark] object ShuffleMapTask { @@ -84,7 +85,7 @@ private[spark] class ShuffleMapTask( protected def this() = this(0, null, null, 0, null) - private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq + @transient private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq { // DEBUG code @@ -128,34 +129,52 @@ private[spark] class ShuffleMapTask( val taskContext = new TaskContext(stageId, partition, attemptId) metrics = Some(taskContext.taskMetrics) + + val blockManager = SparkEnv.get.blockManager + var shuffle: ShuffleBlocks = null + var buckets: ShuffleWriterGroup = null + try { - // Partition the map output. - val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)]) + // Obtain all the block writers for shuffle blocks. + val ser = SparkEnv.get.serializerManager.get(dep.serializerClass) + shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser) + buckets = shuffle.acquireWriters(partition) + + // Write the map output to its associated buckets. for (elem <- rdd.iterator(split, taskContext)) { val pair = elem.asInstanceOf[(Any, Any)] val bucketId = dep.partitioner.getPartition(pair._1) - buckets(bucketId) += pair + buckets.writers(bucketId).write(pair) } - val compressedSizes = new Array[Byte](numOutputSplits) - - var totalBytes = 0l - - val blockManager = SparkEnv.get.blockManager - for (i <- 0 until numOutputSplits) { - val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i - // Get a Scala iterator from Java map - val iter: Iterator[(Any, Any)] = buckets(i).iterator - val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false) + // Commit the writes. Get the size of each bucket block (total block size). + var totalBytes = 0L + val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter => + writer.commit() + writer.close() + val size = writer.size() totalBytes += size - compressedSizes(i) = MapOutputTracker.compressSize(size) + MapOutputTracker.compressSize(size) } + + // Update shuffle metrics. val shuffleMetrics = new ShuffleWriteMetrics shuffleMetrics.shuffleBytesWritten = totalBytes metrics.get.shuffleWriteMetrics = Some(shuffleMetrics) return new MapStatus(blockManager.blockManagerId, compressedSizes) + } catch { case e: Exception => + // If there is an exception from running the task, revert the partial writes + // and throw the exception upstream to Spark. + if (buckets != null) { + buckets.writers.foreach(_.revertPartialWrites()) + } + throw e } finally { + // Release the writers back to the shuffle block manager. + if (shuffle != null && buckets != null) { + shuffle.releaseWriters(buckets) + } // Execute the callbacks on task completion. taskContext.executeOnCompleteCallbacks() } diff --git a/core/src/main/scala/spark/serializer/Serializer.scala b/core/src/main/scala/spark/serializer/Serializer.scala index aca86ab6f0..2ad73b711d 100644 --- a/core/src/main/scala/spark/serializer/Serializer.scala +++ b/core/src/main/scala/spark/serializer/Serializer.scala @@ -1,10 +1,13 @@ package spark.serializer -import java.nio.ByteBuffer import java.io.{EOFException, InputStream, OutputStream} +import java.nio.ByteBuffer + import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream + import spark.util.ByteBufferInputStream + /** * A serializer. Because some serialization libraries are not thread safe, this class is used to * create [[spark.serializer.SerializerInstance]] objects that do the actual serialization and are @@ -14,6 +17,7 @@ trait Serializer { def newInstance(): SerializerInstance } + /** * An instance of a serializer, for use by one thread at a time. */ @@ -45,6 +49,7 @@ trait SerializerInstance { } } + /** * A stream for writing serialized objects. */ @@ -61,6 +66,7 @@ trait SerializationStream { } } + /** * A stream for reading serialized objects. */ diff --git a/core/src/main/scala/spark/serializer/SerializerManager.scala b/core/src/main/scala/spark/serializer/SerializerManager.scala new file mode 100644 index 0000000000..60b2aac797 --- /dev/null +++ b/core/src/main/scala/spark/serializer/SerializerManager.scala @@ -0,0 +1,45 @@ +package spark.serializer + +import java.util.concurrent.ConcurrentHashMap + + +/** + * A service that returns a serializer object given the serializer's class name. If a previous + * instance of the serializer object has been created, the get method returns that instead of + * creating a new one. + */ +private[spark] class SerializerManager { + + private val serializers = new ConcurrentHashMap[String, Serializer] + private var _default: Serializer = _ + + def default = _default + + def setDefault(clsName: String): Serializer = { + _default = get(clsName) + _default + } + + def get(clsName: String): Serializer = { + if (clsName == null) { + default + } else { + var serializer = serializers.get(clsName) + if (serializer != null) { + // If the serializer has been created previously, reuse that. + serializer + } else this.synchronized { + // Otherwise, create a new one. But make sure no other thread has attempted + // to create another new one at the same time. + serializer = serializers.get(clsName) + if (serializer == null) { + val clsLoader = Thread.currentThread.getContextClassLoader + serializer = + Class.forName(clsName, true, clsLoader).newInstance().asInstanceOf[Serializer] + serializers.put(clsName, serializer) + } + serializer + } + } + } +} diff --git a/core/src/main/scala/spark/storage/BlockException.scala b/core/src/main/scala/spark/storage/BlockException.scala new file mode 100644 index 0000000000..f275d476df --- /dev/null +++ b/core/src/main/scala/spark/storage/BlockException.scala @@ -0,0 +1,5 @@ +package spark.storage + +private[spark] +case class BlockException(blockId: String, message: String) extends Exception(message) + diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 040082e600..72962a3ceb 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -25,15 +25,11 @@ import sun.nio.ch.DirectBuffer private[spark] -case class BlockException(blockId: String, message: String, ex: Exception = null) -extends Exception(message) - -private[spark] class BlockManager( executorId: String, actorSystem: ActorSystem, val master: BlockManagerMaster, - val serializer: Serializer, + val defaultSerializer: Serializer, maxMemory: Long) extends Logging { @@ -92,10 +88,12 @@ class BlockManager( } } + val shuffleBlockManager = new ShuffleBlockManager(this) + private val blockInfo = new TimeStampedHashMap[String, BlockInfo] private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) - private[storage] val diskStore: BlockStore = + private[storage] val diskStore: DiskStore = new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) val connectionManager = new ConnectionManager(0) @@ -282,22 +280,20 @@ class BlockManager( } /** + * A short-circuited method to get blocks directly from disk. This is used for getting + * shuffle blocks. It is safe to do so without a lock on block info since disk store + * never deletes (recent) items. + */ + def getLocalFromDisk(blockId: String, serializer: Serializer): Option[Iterator[Any]] = { + diskStore.getValues(blockId, serializer).orElse( + sys.error("Block " + blockId + " not found on disk, though it should be")) + } + + /** * Get block from local block manager. */ def getLocal(blockId: String): Option[Iterator[Any]] = { logDebug("Getting local block " + blockId) - - // As an optimization for map output fetches, if the block is for a shuffle, return it - // without acquiring a lock; the disk store never deletes (recent) items so this should work - if (blockId.startsWith("shuffle_")) { - return diskStore.getValues(blockId) match { - case Some(iterator) => - Some(iterator) - case None => - throw new Exception("Block " + blockId + " not found on disk, though it should be") - } - } - val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { @@ -383,7 +379,7 @@ class BlockManager( // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work - if (blockId.startsWith("shuffle_")) { + if (ShuffleBlockManager.isShuffle(blockId)) { return diskStore.getBytes(blockId) match { case Some(bytes) => Some(bytes) @@ -485,9 +481,10 @@ class BlockManager( * fashion as they're received. Expects a size in bytes to be provided for each block fetched, * so that we can control the maxMegabytesInFlight for the fetch. */ - def getMultiple(blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])]) + def getMultiple( + blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])], serializer: Serializer) : BlockFetcherIterator = { - return new BlockFetcherIterator(this, blocksByAddress) + return new BlockFetcherIterator(this, blocksByAddress, serializer) } def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean) @@ -498,6 +495,22 @@ class BlockManager( } /** + * A short circuited method to get a block writer that can write data directly to disk. + * This is currently used for writing shuffle files out. Callers should handle error + * cases. + */ + def getDiskBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int) + : BlockObjectWriter = { + val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize) + writer.registerCloseEventHandler(() => { + val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false) + blockInfo.put(blockId, myInfo) + myInfo.markReady(writer.size()) + }) + writer + } + + /** * Put a new block of values to the block manager. Returns its (estimated) size in bytes. */ def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, @@ -596,7 +609,6 @@ class BlockManager( } logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs)) - // Replicate block if required if (level.replication > 1) { val remoteStartTime = System.currentTimeMillis @@ -849,7 +861,7 @@ class BlockManager( } def shouldCompress(blockId: String): Boolean = { - if (blockId.startsWith("shuffle_")) { + if (ShuffleBlockManager.isShuffle(blockId)) { compressShuffle } else if (blockId.startsWith("broadcast_")) { compressBroadcast @@ -864,7 +876,11 @@ class BlockManager( * Wrap an output stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { - if (shouldCompress(blockId)) new LZFOutputStream(s) else s + if (shouldCompress(blockId)) { + (new LZFOutputStream(s)).setFinishBlockOnFlush(true) + } else { + s + } } /** @@ -874,7 +890,10 @@ class BlockManager( if (shouldCompress(blockId)) new LZFInputStream(s) else s } - def dataSerialize(blockId: String, values: Iterator[Any]): ByteBuffer = { + def dataSerialize( + blockId: String, + values: Iterator[Any], + serializer: Serializer = defaultSerializer): ByteBuffer = { val byteStream = new FastByteArrayOutputStream(4096) val ser = serializer.newInstance() ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() @@ -886,7 +905,10 @@ class BlockManager( * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of * the iterator is reached. */ - def dataDeserialize(blockId: String, bytes: ByteBuffer): Iterator[Any] = { + def dataDeserialize( + blockId: String, + bytes: ByteBuffer, + serializer: Serializer = defaultSerializer): Iterator[Any] = { bytes.rewind() val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) serializer.newInstance().deserializeStream(stream).asIterator @@ -980,7 +1002,8 @@ object BlockManager extends Logging { class BlockFetcherIterator( private val blockManager: BlockManager, - val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] + val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])], + serializer: Serializer ) extends Iterator[(String, Option[Iterator[Any]])] with Logging with BlockFetchTracker { import blockManager._ @@ -1043,8 +1066,8 @@ class BlockFetcherIterator( "Unexpected message " + blockMessage.getType + " received from " + cmId) } val blockId = blockMessage.getId - results.put(new FetchResult( - blockId, sizeMap(blockId), () => dataDeserialize(blockId, blockMessage.getData))) + results.put(new FetchResult(blockId, sizeMap(blockId), + () => dataDeserialize(blockId, blockMessage.getData, serializer))) _remoteBytesRead += req.size logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } @@ -1108,7 +1131,7 @@ class BlockFetcherIterator( // any memory that might exceed our maxBytesInFlight startTime = System.currentTimeMillis for (id <- localBlockIds) { - getLocal(id) match { + getLocalFromDisk(id, serializer) match { case Some(iter) => { results.put(new FetchResult(id, 0, () => iter)) // Pass 0 as size since it's not in flight logDebug("Got local block " + id) diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala index d2985559c1..15225f93a6 100644 --- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala @@ -19,7 +19,7 @@ import spark.network._ */ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging { initLogging() - + blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive) def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = { @@ -51,7 +51,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends logDebug("Received [" + pB + "]") putBlock(pB.id, pB.data, pB.level) return None - } + } case BlockMessage.TYPE_GET_BLOCK => { val gB = new GetBlock(blockMessage.getId) logDebug("Received [" + gB + "]") @@ -90,28 +90,26 @@ private[spark] object BlockManagerWorker extends Logging { private var blockManagerWorker: BlockManagerWorker = null private val DATA_TRANSFER_TIME_OUT_MS: Long = 500 private val REQUEST_RETRY_INTERVAL_MS: Long = 1000 - + initLogging() - + def startBlockManagerWorker(manager: BlockManager) { blockManagerWorker = new BlockManagerWorker(manager) } - + def syncPutBlock(msg: PutBlock, toConnManagerId: ConnectionManagerId): Boolean = { val blockManager = blockManagerWorker.blockManager - val connectionManager = blockManager.connectionManager - val serializer = blockManager.serializer + val connectionManager = blockManager.connectionManager val blockMessage = BlockMessage.fromPutBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) val resultMessage = connectionManager.sendMessageReliablySync( toConnManagerId, blockMessageArray.toBufferMessage) return (resultMessage != None) } - + def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = { val blockManager = blockManagerWorker.blockManager - val connectionManager = blockManager.connectionManager - val serializer = blockManager.serializer + val connectionManager = blockManager.connectionManager val blockMessage = BlockMessage.fromGetBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) val responseMessage = connectionManager.sendMessageReliablySync( diff --git a/core/src/main/scala/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/spark/storage/BlockObjectWriter.scala new file mode 100644 index 0000000000..42e2b07d5c --- /dev/null +++ b/core/src/main/scala/spark/storage/BlockObjectWriter.scala @@ -0,0 +1,50 @@ +package spark.storage + +import java.nio.ByteBuffer + + +/** + * An interface for writing JVM objects to some underlying storage. This interface allows + * appending data to an existing block, and can guarantee atomicity in the case of faults + * as it allows the caller to revert partial writes. + * + * This interface does not support concurrent writes. + */ +abstract class BlockObjectWriter(val blockId: String) { + + var closeEventHandler: () => Unit = _ + + def open(): BlockObjectWriter + + def close() { + closeEventHandler() + } + + def isOpen: Boolean + + def registerCloseEventHandler(handler: () => Unit) { + closeEventHandler = handler + } + + /** + * Flush the partial writes and commit them as a single atomic block. Return the + * number of bytes written for this commit. + */ + def commit(): Long + + /** + * Reverts writes that haven't been flushed yet. Callers should invoke this function + * when there are runtime exceptions. + */ + def revertPartialWrites() + + /** + * Writes an object. + */ + def write(value: Any) + + /** + * Size of the valid writes, in bytes. + */ + def size(): Long +} diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 215c25132b..8154b8ca74 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -1,18 +1,20 @@ package spark.storage +import java.io.{File, FileOutputStream, OutputStream, RandomAccessFile} import java.nio.ByteBuffer -import java.io.{File, FileOutputStream, RandomAccessFile} +import java.nio.channels.FileChannel import java.nio.channels.FileChannel.MapMode import java.util.{Random, Date} import java.text.SimpleDateFormat -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream - import scala.collection.mutable.ArrayBuffer -import spark.executor.ExecutorExitCode +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import spark.Utils +import spark.executor.ExecutorExitCode +import spark.serializer.{Serializer, SerializationStream} + /** * Stores BlockManager blocks on disk. @@ -20,8 +22,59 @@ import spark.Utils private class DiskStore(blockManager: BlockManager, rootDirs: String) extends BlockStore(blockManager) { - private val mapMode = MapMode.READ_ONLY - private var mapOpenMode = "r" + class DiskBlockObjectWriter(blockId: String, serializer: Serializer, bufferSize: Int) + extends BlockObjectWriter(blockId) { + + private val f: File = createFile(blockId /*, allowAppendExisting */) + + // The file channel, used for repositioning / truncating the file. + private var channel: FileChannel = null + private var bs: OutputStream = null + private var objOut: SerializationStream = null + private var lastValidPosition = 0L + + override def open(): DiskBlockObjectWriter = { + val fos = new FileOutputStream(f, true) + channel = fos.getChannel() + bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(fos)) + objOut = serializer.newInstance().serializeStream(bs) + this + } + + override def close() { + objOut.close() + bs.close() + channel = null + bs = null + objOut = null + // Invoke the close callback handler. + super.close() + } + + override def isOpen: Boolean = objOut != null + + // Flush the partial writes, and set valid length to be the length of the entire file. + // Return the number of bytes written for this commit. + override def commit(): Long = { + bs.flush() + val prevPos = lastValidPosition + lastValidPosition = channel.position() + lastValidPosition - prevPos + } + + override def revertPartialWrites() { + // Discard current writes. We do this by flushing the outstanding writes and + // truncate the file to the last valid position. + bs.flush() + channel.truncate(lastValidPosition) + } + + override def write(value: Any) { + objOut.writeObject(value) + } + + override def size(): Long = lastValidPosition + } val MAX_DIR_CREATION_ATTEMPTS: Int = 10 val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt @@ -34,6 +87,11 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) addShutdownHook() + def getBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int) + : BlockObjectWriter = { + new DiskBlockObjectWriter(blockId, serializer, bufferSize) + } + override def getSize(blockId: String): Long = { getFile(blockId).length() } @@ -57,9 +115,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) private def getFileBytes(file: File): ByteBuffer = { val length = file.length() - val channel = new RandomAccessFile(file, mapOpenMode).getChannel() + val channel = new RandomAccessFile(file, "r").getChannel() val buffer = try { - channel.map(mapMode, 0, length) + channel.map(MapMode.READ_ONLY, 0, length) } finally { channel.close() } @@ -79,12 +137,14 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) val file = createFile(blockId) val fileOut = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(new FileOutputStream(file))) - val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) + val objOut = blockManager.defaultSerializer.newInstance().serializeStream(fileOut) objOut.writeAll(values.iterator) objOut.close() val length = file.length() + + val timeTaken = System.currentTimeMillis - startTime logDebug("Block %s stored as %s file on disk in %d ms".format( - blockId, Utils.memoryBytesToString(length), (System.currentTimeMillis - startTime))) + blockId, Utils.memoryBytesToString(length), timeTaken)) if (returnValues) { // Return a byte buffer for the contents of the file @@ -105,6 +165,14 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes)) } + /** + * A version of getValues that allows a custom serializer. This is used as part of the + * shuffle short-circuit code. + */ + def getValues(blockId: String, serializer: Serializer): Option[Iterator[Any]] = { + getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer)) + } + override def remove(blockId: String): Boolean = { val file = getFile(blockId) if (file.exists()) { @@ -118,9 +186,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) getFile(blockId).exists() } - private def createFile(blockId: String): File = { + private def createFile(blockId: String, allowAppendExisting: Boolean = false): File = { val file = getFile(blockId) - if (file.exists()) { + if (!allowAppendExisting && file.exists()) { throw new Exception("File for block " + blockId + " already exists on disk: " + file) } file @@ -186,12 +254,14 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } private def addShutdownHook() { - localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir) ) + localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { override def run() { logDebug("Shutdown hook called") try { - localDirs.foreach(localDir => if (! Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)) + localDirs.foreach { localDir => + if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) + } } catch { case t: Throwable => logError("Exception while deleting local spark dirs", t) } diff --git a/core/src/main/scala/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/spark/storage/ShuffleBlockManager.scala new file mode 100644 index 0000000000..49eabfb0d2 --- /dev/null +++ b/core/src/main/scala/spark/storage/ShuffleBlockManager.scala @@ -0,0 +1,50 @@ +package spark.storage + +import spark.serializer.Serializer + + +private[spark] +class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter]) + + +private[spark] +trait ShuffleBlocks { + def acquireWriters(mapId: Int): ShuffleWriterGroup + def releaseWriters(group: ShuffleWriterGroup) +} + + +private[spark] +class ShuffleBlockManager(blockManager: BlockManager) { + + def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = { + new ShuffleBlocks { + // Get a group of writers for a map task. + override def acquireWriters(mapId: Int): ShuffleWriterGroup = { + val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 + val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => + val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, mapId) + blockManager.getDiskBlockWriter(blockId, serializer, bufferSize).open() + } + new ShuffleWriterGroup(mapId, writers) + } + + override def releaseWriters(group: ShuffleWriterGroup) = { + // Nothing really to release here. + } + } + } +} + + +private[spark] +object ShuffleBlockManager { + + // Returns the block id for a given shuffle block. + def blockId(shuffleId: Int, bucketId: Int, groupId: Int): String = { + "shuffle_" + shuffleId + "_" + groupId + "_" + bucketId + } + + // Returns true if the block is a shuffle block. + def isShuffle(blockId: String): Boolean = blockId.startsWith("shuffle_") +} diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index d3dcd3bbeb..93bb69b41c 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -633,6 +633,32 @@ public class JavaAPISuite implements Serializable { } @Test + public void zipPartitions() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); + JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2); + FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = + new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() { + @Override + public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) { + int sizeI = 0; + int sizeS = 0; + while (i.hasNext()) { + sizeI += 1; + i.next(); + } + while (s.hasNext()) { + sizeS += 1; + s.next(); + } + return Arrays.asList(sizeI, sizeS); + } + }; + + JavaRDD<Integer> sizes = rdd1.zipPartitions(sizesFn, rdd2); + Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); + } + + @Test public void accumulators() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); |