diff options
12 files changed, 1019 insertions, 90 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 1a2ec55876..c408d5f145 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -17,7 +17,9 @@ package org.apache.spark -import org.apache.spark.util.AppendOnlyMap +import scala.reflect.ClassTag + +import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap} /** * A set of functions used to aggregate data. @@ -26,35 +28,55 @@ import org.apache.spark.util.AppendOnlyMap * @param mergeValue function to merge a new value into the aggregation result. * @param mergeCombiners function to merge outputs from multiple mergeValue function. */ -case class Aggregator[K, V, C] ( +case class Aggregator[K, V, C: ClassTag] ( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = { - val combiners = new AppendOnlyMap[K, C] - var kv: Product2[K, V] = null - val update = (hadValue: Boolean, oldValue: C) => { - if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) - } - while (iter.hasNext) { - kv = iter.next() - combiners.changeValue(kv._1, update) + val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean + if (!externalSorting) { + val combiners = new AppendOnlyMap[K,C] + var kv: Product2[K, V] = null + val update = (hadValue: Boolean, oldValue: C) => { + if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) + } + while (iter.hasNext) { + kv = iter.next() + combiners.changeValue(kv._1, update) + } + combiners.iterator + } else { + val combiners = + new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) + while (iter.hasNext) { + val kv = iter.next() + combiners.insert(kv._1, kv._2) + } + combiners.iterator } - combiners.iterator } def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = { - val combiners = new AppendOnlyMap[K, C] - var kc: (K, C) = null - val update = (hadValue: Boolean, oldValue: C) => { - if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2 + val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean + if (!externalSorting) { + val combiners = new AppendOnlyMap[K,C] + var kc: Product2[K, C] = null + val update = (hadValue: Boolean, oldValue: C) => { + if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2 + } + while (iter.hasNext) { + kc = iter.next() + combiners.changeValue(kc._1, update) + } + combiners.iterator + } else { + val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners) + while (iter.hasNext) { + val kc = iter.next() + combiners.insert(kc._1, kc._2) + } + combiners.iterator } - while (iter.hasNext) { - kc = iter.next() - combiners.changeValue(kc._1, update) - } - combiners.iterator } } - diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 4ba4696fef..44494c7e0a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -23,8 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext} import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency} -import org.apache.spark.util.AppendOnlyMap - +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap} private[spark] sealed trait CoGroupSplitDep extends Serializable @@ -44,14 +43,12 @@ private[spark] case class NarrowCoGroupSplitDep( private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep -private[spark] -class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) +private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) extends Partition with Serializable { override val index: Int = idx override def hashCode(): Int = idx } - /** * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a * tuple with the list of values for that key. @@ -62,6 +59,13 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { + // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs). + // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner. + // CoGroupValue is the intermediate state of each value before being merged in compute. + private type CoGroup = ArrayBuffer[Any] + private type CoGroupValue = (Any, Int) // Int is dependency number + private type CoGroupCombiner = Seq[CoGroup] + private var serializerClass: String = null def setSerializer(cls: String): CoGroupedRDD[K] = { @@ -100,37 +104,75 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: override val partitioner = Some(part) - override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = { + override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = { + + val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean val split = s.asInstanceOf[CoGroupPartition] val numRdds = split.deps.size - // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs) - val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]] - - val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => { - if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any]) - } - val getSeq = (k: K) => { - map.changeValue(k, update) - } - - val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf) + // A list of (rdd iterator, dependency number) pairs + val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)] for ((dep, depNum) <- split.deps.zipWithIndex) dep match { case NarrowCoGroupSplitDep(rdd, _, itsSplit) => { // Read them from the parent - rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv => - getSeq(kv._1)(depNum) += kv._2 - } + val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]] + rddIterators += ((it, depNum)) } case ShuffleCoGroupSplitDep(shuffleId) => { // Read map outputs of shuffle val fetcher = SparkEnv.get.shuffleFetcher - fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach { - kv => getSeq(kv._1)(depNum) += kv._2 + val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf) + val it = fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser) + rddIterators += ((it, depNum)) + } + } + + if (!externalSorting) { + val map = new AppendOnlyMap[K, CoGroupCombiner] + val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => { + if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup) + } + val getCombiner: K => CoGroupCombiner = key => { + map.changeValue(key, update) + } + rddIterators.foreach { case (it, depNum) => + while (it.hasNext) { + val kv = it.next() + getCombiner(kv._1)(depNum) += kv._2 } } + new InterruptibleIterator(context, map.iterator) + } else { + val map = createExternalMap(numRdds) + rddIterators.foreach { case (it, depNum) => + while (it.hasNext) { + val kv = it.next() + map.insert(kv._1, new CoGroupValue(kv._2, depNum)) + } + } + new InterruptibleIterator(context, map.iterator) } - new InterruptibleIterator(context, map.iterator) + } + + private def createExternalMap(numRdds: Int) + : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = { + + val createCombiner: (CoGroupValue => CoGroupCombiner) = value => { + val newCombiner = Array.fill(numRdds)(new CoGroup) + value match { case (v, depNum) => newCombiner(depNum) += v } + newCombiner + } + val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = + (combiner, value) => { + value match { case (v, depNum) => combiner(depNum) += v } + combiner + } + val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = + (combiner1, combiner2) => { + combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 } + } + new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner]( + createCombiner, mergeValue, mergeCombiners) } override def clearDependencies() { diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 04a8d05988..7b1759ebbc 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -72,7 +72,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */ - def combineByKey[C](createCombiner: V => C, + def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, @@ -100,8 +100,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) }, preservesPartitioning = true) } else { // Don't apply map-side combiner. - // A sanity check to make sure mergeCombiners is not defined. - assert(mergeCombiners == null) val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass) values.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter)) @@ -112,7 +110,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) /** * Simplified version of combineByKey that hash-partitions the output RDD. */ - def combineByKey[C](createCombiner: V => C, + def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] = { @@ -268,8 +266,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) // into a hash table, leading to more objects in the old gen. def createCombiner(v: V) = ArrayBuffer(v) def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v + def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2 val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false) + createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false) bufs.asInstanceOf[RDD[(K, Seq[V])]] } @@ -339,8 +338,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * Simplified version of combineByKey that hash-partitions the resulting RDD using the * existing partitioner/parallelism level. */ - def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) - : RDD[(K, C)] = { + def combineByKey[C: ClassTag]( + createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) + : RDD[(K, C)] = { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 7156d855d8..301d784b35 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -17,12 +17,14 @@ package org.apache.spark.storage +import java.util.UUID + /** * Identifies a particular Block of data, usually associated with a single file. * A Block can be uniquely identified by its filename, but each type of Block has a different * set of keys which produce its unique name. * - * If your BlockId should be serializable, be sure to add it to the BlockId.fromString() method. + * If your BlockId should be serializable, be sure to add it to the BlockId.apply() method. */ private[spark] sealed abstract class BlockId { /** A globally unique identifier for this Block. Can be used for ser/de. */ @@ -55,7 +57,8 @@ private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId { def name = "broadcast_" + broadcastId } -private[spark] case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId { +private[spark] +case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId { def name = broadcastId.name + "_" + hType } @@ -67,6 +70,11 @@ private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends B def name = "input-" + streamId + "-" + uniqueId } +/** Id associated with temporary data managed as blocks. Not serializable. */ +private[spark] case class TempBlockId(id: UUID) extends BlockId { + def name = "temp_" + id +} + // Intended only for testing purposes private[spark] case class TestBlockId(id: String) extends BlockId { def name = "test_" + id diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 55dcb3742c..b98067b8be 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import java.io.File import java.text.SimpleDateFormat -import java.util.{Date, Random} +import java.util.{Date, Random, UUID} import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode @@ -90,6 +90,15 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD def getFile(blockId: BlockId): File = getFile(blockId.name) + /** Produces a unique block id and File suitable for intermediate results. */ + def createTempBlock(): (TempBlockId, File) = { + var blockId = new TempBlockId(UUID.randomUUID()) + while (getFile(blockId).exists()) { + blockId = new TempBlockId(UUID.randomUUID()) + } + (blockId, getFile(blockId)) + } + private def createLocalDirs(): Array[File] = { logDebug("Creating local directories at root dirs '" + rootDirs + "'") val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index 8bb4ee3bfa..d2a9574a71 100644 --- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -15,7 +15,10 @@ * limitations under the License. */ -package org.apache.spark.util +package org.apache.spark.util.collection + +import java.util +import java.util.Comparator /** * A simple open hash table optimized for the append-only use case, where keys @@ -45,10 +48,14 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi private var haveNullValue = false private var nullValue: V = null.asInstanceOf[V] + // Triggered by destructiveSortedIterator; the underlying data array may no longer be used + private var destroyed = false + private val LOAD_FACTOR = 0.7 /** Get the value for a given key */ def apply(key: K): V = { + checkValidityOrThrowException() val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { return nullValue @@ -72,6 +79,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi /** Set the value for a key */ def update(key: K, value: V): Unit = { + checkValidityOrThrowException() val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { if (!haveNullValue) { @@ -106,6 +114,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi * for key, if any, or null otherwise. Returns the newly updated value. */ def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { + checkValidityOrThrowException() val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { if (!haveNullValue) { @@ -139,35 +148,38 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi } /** Iterator method from Iterable */ - override def iterator: Iterator[(K, V)] = new Iterator[(K, V)] { - var pos = -1 - - /** Get the next value we should return from next(), or null if we're finished iterating */ - def nextValue(): (K, V) = { - if (pos == -1) { // Treat position -1 as looking at the null value - if (haveNullValue) { - return (null.asInstanceOf[K], nullValue) + override def iterator: Iterator[(K, V)] = { + checkValidityOrThrowException() + new Iterator[(K, V)] { + var pos = -1 + + /** Get the next value we should return from next(), or null if we're finished iterating */ + def nextValue(): (K, V) = { + if (pos == -1) { // Treat position -1 as looking at the null value + if (haveNullValue) { + return (null.asInstanceOf[K], nullValue) + } + pos += 1 } - pos += 1 - } - while (pos < capacity) { - if (!data(2 * pos).eq(null)) { - return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V]) + while (pos < capacity) { + if (!data(2 * pos).eq(null)) { + return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V]) + } + pos += 1 } - pos += 1 + null } - null - } - override def hasNext: Boolean = nextValue() != null + override def hasNext: Boolean = nextValue() != null - override def next(): (K, V) = { - val value = nextValue() - if (value == null) { - throw new NoSuchElementException("End of iterator") + override def next(): (K, V) = { + val value = nextValue() + if (value == null) { + throw new NoSuchElementException("End of iterator") + } + pos += 1 + value } - pos += 1 - value } } @@ -190,7 +202,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi } /** Double the table's size and re-hash everything */ - private def growTable() { + protected def growTable() { val newCapacity = capacity * 2 if (newCapacity >= (1 << 30)) { // We can't make the table this big because we want an array of 2x @@ -234,4 +246,52 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi val highBit = Integer.highestOneBit(n) if (highBit == n) n else highBit << 1 } + + /** + * Return an iterator of the map in sorted order. This provides a way to sort the map without + * using additional memory, at the expense of destroying the validity of the map. + */ + def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = { + destroyed = true + // Pack KV pairs into the front of the underlying array + var keyIndex, newIndex = 0 + while (keyIndex < capacity) { + if (data(2 * keyIndex) != null) { + data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1)) + newIndex += 1 + } + keyIndex += 1 + } + assert(curSize == newIndex + (if (haveNullValue) 1 else 0)) + + // Sort by the given ordering + val rawOrdering = new Comparator[AnyRef] { + def compare(x: AnyRef, y: AnyRef): Int = { + cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)]) + } + } + util.Arrays.sort(data, 0, newIndex, rawOrdering) + + new Iterator[(K, V)] { + var i = 0 + var nullValueReady = haveNullValue + def hasNext: Boolean = (i < newIndex || nullValueReady) + def next(): (K, V) = { + if (nullValueReady) { + nullValueReady = false + (null.asInstanceOf[K], nullValue) + } else { + val item = data(i).asInstanceOf[(K, V)] + i += 1 + item + } + } + } + } + + private def checkValidityOrThrowException(): Unit = { + if (destroyed) { + throw new IllegalStateException("Map state is invalid from destructive sorting!") + } + } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala new file mode 100644 index 0000000000..9e147feec4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import it.unimi.dsi.fastutil.io.FastBufferedInputStream + +import scala.collection.mutable.{ArrayBuffer, PriorityQueue} + +import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter} + +/** + * An append-only map that spills sorted content to disk when the memory threshold is exceeded. + * + * This map takes two passes over the data: + * (1) Values are merged into combiners, which are sorted and spilled to disk in as necessary. + * (2) Combiners are read from disk and merged together + * + * Two parameters control the memory threshold: `spark.shuffle.buffer.mb` specifies the maximum + * size of the in-memory map before a spill, and `spark.shuffle.buffer.fraction` specifies an + * additional margin of safety. The second parameter is important for the following reason: + * + * If the spill threshold is set too high, the in-memory map may occupy more memory than is + * available, resulting in OOM. However, if the spill threshold is set too low, we spill + * frequently and incur unnecessary disk writes. This may lead to a performance regression + * compared to the normal case of using the non-spilling AppendOnlyMap. + */ + +private[spark] class ExternalAppendOnlyMap[K, V, C]( + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C, + serializer: Serializer = SparkEnv.get.serializerManager.default, + diskBlockManager: DiskBlockManager = SparkEnv.get.blockManager.diskBlockManager) + extends Iterable[(K, C)] with Serializable with Logging { + + import ExternalAppendOnlyMap._ + + private var currentMap = new SizeTrackingAppendOnlyMap[K, C] + private val spilledMaps = new ArrayBuffer[DiskIterator] + private val memoryThresholdMB = { + // TODO: Turn this into a fraction of memory per reducer + val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong + val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat + bufferSize * bufferPercent + } + private val fileBufferSize = + System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 + private val comparator = new KCComparator[K, C] + private val ser = serializer.newInstance() + private var spillCount = 0 + + def insert(key: K, value: V): Unit = { + val update: (Boolean, C) => C = (hadVal, oldVal) => { + if (hadVal) mergeValue(oldVal, value) else createCombiner(value) + } + currentMap.changeValue(key, update) + if (currentMap.estimateSize() > memoryThresholdMB * 1024 * 1024) { + spill() + } + } + + private def spill(): Unit = { + spillCount += 1 + logWarning(s"In-memory KV map exceeded threshold of $memoryThresholdMB MB!") + logWarning(s"Spilling to disk ($spillCount time"+(if (spillCount > 1) "s" else "")+" so far)") + val (blockId, file) = diskBlockManager.createTempBlock() + val writer = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity) + try { + val it = currentMap.destructiveSortedIterator(comparator) + while (it.hasNext) { + val kv = it.next() + writer.write(kv) + } + writer.commit() + } finally { + // Partial failures cannot be tolerated; do not revert partial writes + writer.close() + } + currentMap = new SizeTrackingAppendOnlyMap[K, C] + spilledMaps.append(new DiskIterator(file)) + } + + override def iterator: Iterator[(K, C)] = { + if (spilledMaps.isEmpty) { + currentMap.iterator + } else { + new ExternalIterator() + } + } + + /** An iterator that sort-merges (K, C) pairs from the in-memory and on-disk maps */ + private class ExternalIterator extends Iterator[(K, C)] { + + // A fixed-size queue that maintains a buffer for each stream we are currently merging + val mergeHeap = new PriorityQueue[StreamBuffer] + + // Input streams are derived both from the in-memory map and spilled maps on disk + // The in-memory map is sorted in place, while the spilled maps are already in sorted order + val inputStreams = Seq(currentMap.destructiveSortedIterator(comparator)) ++ spilledMaps + + inputStreams.foreach{ it => + val kcPairs = getMorePairs(it) + mergeHeap.enqueue(StreamBuffer(it, kcPairs)) + } + + /** + * Fetch from the given iterator until a key of different hash is retrieved. In the + * event of key hash collisions, this ensures no pairs are hidden from being merged. + */ + def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = { + val kcPairs = new ArrayBuffer[(K, C)] + if (it.hasNext) { + var kc = it.next() + kcPairs += kc + val minHash = kc._1.hashCode() + while (it.hasNext && kc._1.hashCode() == minHash) { + kc = it.next() + kcPairs += kc + } + } + kcPairs + } + + /** + * If the given buffer contains a value for the given key, merge that value into + * baseCombiner and remove the corresponding (K, C) pair from the buffer + */ + def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = { + var i = 0 + while (i < buffer.pairs.size) { + val (k, c) = buffer.pairs(i) + if (k == key) { + buffer.pairs.remove(i) + return mergeCombiners(baseCombiner, c) + } + i += 1 + } + baseCombiner + } + + override def hasNext: Boolean = { + mergeHeap.foreach{ buffer => + if (!buffer.pairs.isEmpty) { + return true + } + } + false + } + + override def next(): (K, C) = { + // Select a return key from the StreamBuffer that holds the lowest key hash + val minBuffer = mergeHeap.dequeue() + val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash) + if (minPairs.length == 0) { + // Should only happen when no other stream buffers have any pairs left + throw new NoSuchElementException + } + var (minKey, minCombiner) = minPairs.remove(0) + assert(minKey.hashCode() == minHash) + + // For all other streams that may have this key (i.e. have the same minimum key hash), + // merge in the corresponding value (if any) from that stream + val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer) + while (!mergeHeap.isEmpty && mergeHeap.head.minKeyHash == minHash) { + val newBuffer = mergeHeap.dequeue() + minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer) + mergedBuffers += newBuffer + } + + // Repopulate each visited stream buffer and add it back to the merge heap + mergedBuffers.foreach { buffer => + if (buffer.pairs.length == 0) { + buffer.pairs ++= getMorePairs(buffer.iterator) + } + mergeHeap.enqueue(buffer) + } + + (minKey, minCombiner) + } + + /** + * A buffer for streaming from a map iterator (in-memory or on-disk) sorted by key hash. + * Each buffer maintains the lowest-ordered keys in the corresponding iterator. Due to + * hash collisions, it is possible for multiple keys to be "tied" for being the lowest. + * + * StreamBuffers are ordered by the minimum key hash found across all of their own pairs. + */ + case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)]) + extends Comparable[StreamBuffer] { + + def minKeyHash: Int = { + if (pairs.length > 0){ + // pairs are already sorted by key hash + pairs(0)._1.hashCode() + } else { + Int.MaxValue + } + } + + override def compareTo(other: StreamBuffer): Int = { + // minus sign because mutable.PriorityQueue dequeues the max, not the min + -minKeyHash.compareTo(other.minKeyHash) + } + } + } + + // Iterate through (K, C) pairs in sorted order from an on-disk map + private class DiskIterator(file: File) extends Iterator[(K, C)] { + val fileStream = new FileInputStream(file) + val bufferedStream = new FastBufferedInputStream(fileStream) + val deserializeStream = ser.deserializeStream(bufferedStream) + var nextItem: Option[(K, C)] = None + var eof = false + + def readNextItem(): Option[(K, C)] = { + if (!eof) { + try { + return Some(deserializeStream.readObject().asInstanceOf[(K, C)]) + } catch { + case e: EOFException => + eof = true + cleanup() + } + } + None + } + + override def hasNext: Boolean = { + nextItem match { + case Some(item) => true + case None => + nextItem = readNextItem() + nextItem.isDefined + } + } + + override def next(): (K, C) = { + nextItem match { + case Some(item) => + nextItem = None + item + case None => + val item = readNextItem() + item.getOrElse(throw new NoSuchElementException) + } + } + + // TODO: Ensure this gets called even if the iterator isn't drained. + def cleanup() { + deserializeStream.close() + file.delete() + } + } +} + +private[spark] object ExternalAppendOnlyMap { + private class KCComparator[K, C] extends Comparator[(K, C)] { + def compare(kc1: (K, C), kc2: (K, C)): Int = { + kc1._1.hashCode().compareTo(kc2._1.hashCode()) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala new file mode 100644 index 0000000000..204330dad4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.util.SizeEstimator +import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample + +/** + * Append-only map that keeps track of its estimated size in bytes. + * We sample with a slow exponential back-off using the SizeEstimator to amortize the time, + * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds). + */ +private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] { + + /** + * Controls the base of the exponential which governs the rate of sampling. + * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements. + */ + private val SAMPLE_GROWTH_RATE = 1.1 + + /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */ + private val samples = new ArrayBuffer[Sample]() + + /** Total number of insertions and updates into the map since the last resetSamples(). */ + private var numUpdates: Long = _ + + /** The value of 'numUpdates' at which we will take our next sample. */ + private var nextSampleNum: Long = _ + + /** The average number of bytes per update between our last two samples. */ + private var bytesPerUpdate: Double = _ + + resetSamples() + + /** Called after the map grows in size, as this can be a dramatic change for small objects. */ + def resetSamples() { + numUpdates = 1 + nextSampleNum = 1 + samples.clear() + takeSample() + } + + override def update(key: K, value: V): Unit = { + super.update(key, value) + numUpdates += 1 + if (nextSampleNum == numUpdates) { takeSample() } + } + + override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { + val newValue = super.changeValue(key, updateFunc) + numUpdates += 1 + if (nextSampleNum == numUpdates) { takeSample() } + newValue + } + + /** Takes a new sample of the current map's size. */ + def takeSample() { + samples += Sample(SizeEstimator.estimate(this), numUpdates) + // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change. + bytesPerUpdate = math.max(0, samples.toSeq.reverse match { + case latest :: previous :: tail => + (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates) + case _ => + 0 + }) + nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong + } + + override protected def growTable() { + super.growTable() + resetSamples() + } + + /** Estimates the current size of the map in bytes. O(1) time. */ + def estimateSize(): Long = { + assert(samples.nonEmpty) + val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates) + (samples.last.size + extrapolatedDelta).toLong + } +} + +private object SizeTrackingAppendOnlyMap { + case class Sample(size: Long, numUpdates: Long) +} diff --git a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala new file mode 100644 index 0000000000..93f0c6a8e6 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import scala.util.Random + +import org.scalatest.{BeforeAndAfterAll, FunSuite} + +import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass +import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap} + +class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll { + val NORMAL_ERROR = 0.20 + val HIGH_ERROR = 0.30 + + test("fixed size insertions") { + testWith[Int, Long](10000, i => (i, i.toLong)) + testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong))) + testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass())) + } + + test("variable size insertions") { + val rand = new Random(123456789) + def randString(minLen: Int, maxLen: Int): String = { + "a" * (rand.nextInt(maxLen - minLen) + minLen) + } + testWith[Int, String](10000, i => (i, randString(0, 10))) + testWith[Int, String](10000, i => (i, randString(0, 100))) + testWith[Int, String](10000, i => (i, randString(90, 100))) + } + + test("updates") { + val rand = new Random(123456789) + def randString(minLen: Int, maxLen: Int): String = { + "a" * (rand.nextInt(maxLen - minLen) + minLen) + } + testWith[String, Int](10000, i => (randString(0, 10000), i)) + } + + def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) { + val map = new SizeTrackingAppendOnlyMap[K, V]() + for (i <- 0 until numElements) { + val (k, v) = makeElement(i) + map(k) = v + expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR) + } + } + + def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) { + val betterEstimatedSize = SizeEstimator.estimate(obj) + assert(betterEstimatedSize * (1 - error) < estimatedSize, + s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize") + assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize, + s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize") + } +} + +object SizeTrackingAppendOnlyMapSuite { + // Speed test, for reproducibility of results. + // These could be highly non-deterministic in general, however. + // Results: + // AppendOnlyMap: 31 ms + // SizeTracker: 54 ms + // SizeEstimator: 1500 ms + def main(args: Array[String]) { + val numElements = 100000 + + val baseTimes = for (i <- 0 until 10) yield time { + val map = new AppendOnlyMap[Int, LargeDummyClass]() + for (i <- 0 until numElements) { + map(i) = new LargeDummyClass() + } + } + + val sampledTimes = for (i <- 0 until 10) yield time { + val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]() + for (i <- 0 until numElements) { + map(i) = new LargeDummyClass() + map.estimateSize() + } + } + + val unsampledTimes = for (i <- 0 until 3) yield time { + val map = new AppendOnlyMap[Int, LargeDummyClass]() + for (i <- 0 until numElements) { + map(i) = new LargeDummyClass() + SizeEstimator.estimate(map) + } + } + + println("Base: " + baseTimes) + println("SizeTracker (sampled): " + sampledTimes) + println("SizeEstimator (unsampled): " + unsampledTimes) + } + + def time(f: => Unit): Long = { + val start = System.currentTimeMillis() + f + System.currentTimeMillis() - start + } + + private class LargeDummyClass { + val arr = new Array[Int](100) + } +} diff --git a/core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala index 7177919a58..71b936b0df 100644 --- a/core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.spark.util +package org.apache.spark.util.collection import scala.collection.mutable.HashSet import org.scalatest.FunSuite +import java.util.Comparator class AppendOnlyMapSuite extends FunSuite { test("initialization") { @@ -151,4 +152,47 @@ class AppendOnlyMapSuite extends FunSuite { assert(map("" + i) === "" + i) } } + + test("destructive sort") { + val map = new AppendOnlyMap[String, String]() + for (i <- 1 to 100) { + map("" + i) = "" + i + } + map.update(null, "happy new year!") + + try { + map.apply("1") + map.update("1", "2013") + map.changeValue("1", (hadValue, oldValue) => "2014") + map.iterator + } catch { + case e: IllegalStateException => fail() + } + + val it = map.destructiveSortedIterator(new Comparator[(String, String)] { + def compare(kv1: (String, String), kv2: (String, String)): Int = { + val x = if (kv1 != null && kv1._1 != null) kv1._1.toInt else Int.MinValue + val y = if (kv2 != null && kv2._1 != null) kv2._1.toInt else Int.MinValue + x.compareTo(y) + } + }) + + // Should be sorted by key + assert(it.hasNext) + var previous = it.next() + assert(previous == (null, "happy new year!")) + previous = it.next() + assert(previous == ("1", "2014")) + while (it.hasNext) { + val kv = it.next() + assert(kv._1.toInt > previous._1.toInt) + previous = kv + } + + // All subsequent calls to apply, update, changeValue and iterator should throw exception + intercept[IllegalStateException] { map.apply("1") } + intercept[IllegalStateException] { map.update("1", "2013") } + intercept[IllegalStateException] { map.changeValue("1", (hadValue, oldValue) => "2014") } + intercept[IllegalStateException] { map.iterator } + } } diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala new file mode 100644 index 0000000000..a18d466baa --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -0,0 +1,232 @@ +package org.apache.spark.util.collection + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.{BeforeAndAfter, FunSuite} + +import org.apache.spark.{HashPartitioner, SparkContext, SparkEnv, LocalSparkContext} +import org.apache.spark.SparkContext.rddToPairRDDFunctions + +class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { + + override def beforeEach() { + sc = new SparkContext("local", "test") + System.setProperty("spark.shuffle.externalSorting", "true") + } + + after { + System.setProperty("spark.shuffle.externalSorting", "false") + System.setProperty("spark.shuffle.buffer.mb", "1024") + System.setProperty("spark.shuffle.buffer.fraction", "0.8") + } + + val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i) + val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => { + buffer += i + } + val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] = + (buf1, buf2) => { + buf1 ++= buf2 + } + + test("simple insert") { + var map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, + mergeValue, mergeCombiners) + + // Single insert + map.insert(1, 10) + var it = map.iterator + assert(it.hasNext) + var kv = it.next() + assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10)) + assert(!it.hasNext) + + // Multiple insert + map.insert(2, 20) + map.insert(3, 30) + it = map.iterator + assert(it.hasNext) + assert(it.toSet == Set[(Int, ArrayBuffer[Int])]( + (1, ArrayBuffer[Int](10)), + (2, ArrayBuffer[Int](20)), + (3, ArrayBuffer[Int](30)))) + } + + test("insert with collision") { + val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, + mergeValue, mergeCombiners) + + map.insert(1, 10) + map.insert(2, 20) + map.insert(3, 30) + map.insert(1, 100) + map.insert(2, 200) + map.insert(1, 1000) + var it = map.iterator + assert(it.hasNext) + val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet)) + assert(result == Set[(Int, Set[Int])]( + (1, Set[Int](10, 100, 1000)), + (2, Set[Int](20, 200)), + (3, Set[Int](30)))) + } + + test("ordering") { + val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, + mergeValue, mergeCombiners) + map1.insert(1, 10) + map1.insert(2, 20) + map1.insert(3, 30) + + val map2 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, + mergeValue, mergeCombiners) + map2.insert(2, 20) + map2.insert(3, 30) + map2.insert(1, 10) + + val map3 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, + mergeValue, mergeCombiners) + map3.insert(3, 30) + map3.insert(1, 10) + map3.insert(2, 20) + + val it1 = map1.iterator + val it2 = map2.iterator + val it3 = map3.iterator + + var kv1 = it1.next() + var kv2 = it2.next() + var kv3 = it3.next() + assert(kv1._1 == kv2._1 && kv2._1 == kv3._1) + assert(kv1._2 == kv2._2 && kv2._2 == kv3._2) + + kv1 = it1.next() + kv2 = it2.next() + kv3 = it3.next() + assert(kv1._1 == kv2._1 && kv2._1 == kv3._1) + assert(kv1._2 == kv2._2 && kv2._2 == kv3._2) + + kv1 = it1.next() + kv2 = it2.next() + kv3 = it3.next() + assert(kv1._1 == kv2._1 && kv2._1 == kv3._1) + assert(kv1._2 == kv2._2 && kv2._2 == kv3._2) + } + + test("null keys and values") { + val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, + mergeValue, mergeCombiners) + map.insert(1, 5) + map.insert(2, 6) + map.insert(3, 7) + assert(map.size === 3) + assert(map.iterator.toSet == Set[(Int, Seq[Int])]( + (1, Seq[Int](5)), + (2, Seq[Int](6)), + (3, Seq[Int](7)) + )) + + // Null keys + val nullInt = null.asInstanceOf[Int] + map.insert(nullInt, 8) + assert(map.size === 4) + assert(map.iterator.toSet == Set[(Int, Seq[Int])]( + (1, Seq[Int](5)), + (2, Seq[Int](6)), + (3, Seq[Int](7)), + (nullInt, Seq[Int](8)) + )) + + // Null values + map.insert(4, nullInt) + map.insert(nullInt, nullInt) + assert(map.size === 5) + val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet)) + assert(result == Set[(Int, Set[Int])]( + (1, Set[Int](5)), + (2, Set[Int](6)), + (3, Set[Int](7)), + (4, Set[Int](nullInt)), + (nullInt, Set[Int](nullInt, 8)) + )) + } + + test("simple aggregator") { + // reduceByKey + val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1)) + val result1 = rdd.reduceByKey(_+_).collect() + assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5))) + + // groupByKey + val result2 = rdd.groupByKey().collect() + assert(result2.toSet == Set[(Int, Seq[Int])] + ((0, ArrayBuffer[Int](1, 1, 1, 1, 1)), (1, ArrayBuffer[Int](1, 1, 1, 1, 1)))) + } + + test("simple cogroup") { + val rdd1 = sc.parallelize(1 to 4).map(i => (i, i)) + val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i)) + val result = rdd1.cogroup(rdd2).collect() + + result.foreach { case (i, (seq1, seq2)) => + i match { + case 0 => assert(seq1.toSet == Set[Int]() && seq2.toSet == Set[Int](2, 4)) + case 1 => assert(seq1.toSet == Set[Int](1) && seq2.toSet == Set[Int](1, 3)) + case 2 => assert(seq1.toSet == Set[Int](2) && seq2.toSet == Set[Int]()) + case 3 => assert(seq1.toSet == Set[Int](3) && seq2.toSet == Set[Int]()) + case 4 => assert(seq1.toSet == Set[Int](4) && seq2.toSet == Set[Int]()) + } + } + } + + test("spilling") { + System.setProperty("spark.shuffle.buffer.mb", "1") + System.setProperty("spark.shuffle.buffer.fraction", "0.05") + + // reduceByKey - should spill exactly 6 times + val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i)) + val resultA = rddA.reduceByKey(math.max(_, _)).collect() + assert(resultA.length == 5000) + resultA.foreach { case(k, v) => + k match { + case 0 => assert(v == 1) + case 2500 => assert(v == 5001) + case 4999 => assert(v == 9999) + case _ => + } + } + + // groupByKey - should spill exactly 11 times + val rddB = sc.parallelize(0 until 10000).map(i => (i/4, i)) + val resultB = rddB.groupByKey().collect() + assert(resultB.length == 2500) + resultB.foreach { case(i, seq) => + i match { + case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3)) + case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003)) + case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999)) + case _ => + } + } + + // cogroup - should spill exactly 7 times + val rddC1 = sc.parallelize(0 until 1000).map(i => (i, i)) + val rddC2 = sc.parallelize(0 until 1000).map(i => (i%100, i)) + val resultC = rddC1.cogroup(rddC2).collect() + assert(resultC.length == 1000) + resultC.foreach { case(i, (seq1, seq2)) => + i match { + case 0 => + assert(seq1.toSet == Set[Int](0)) + assert(seq2.toSet == Set[Int](0, 100, 200, 300, 400, 500, 600, 700, 800, 900)) + case 500 => + assert(seq1.toSet == Set[Int](500)) + assert(seq2.toSet == Set[Int]()) + case 999 => + assert(seq1.toSet == Set[Int](999)) + assert(seq2.toSet == Set[Int]()) + case _ => + } + } + } +} diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index d53d433693..354f04f9a5 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -20,6 +20,7 @@ package org.apache.spark.streaming; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.io.Files; import kafka.serializer.StringDecoder; @@ -477,13 +478,13 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, String>("new york", "islanders"))); - List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( - Arrays.asList( + List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + Sets.newHashSet( new Tuple2<String, Tuple2<String, String>>("california", new Tuple2<String, String>("dodgers", "giants")), new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("yankees", "mets"))), - Arrays.asList( + new Tuple2<String, String>("yankees", "mets"))), + Sets.newHashSet( new Tuple2<String, Tuple2<String, String>>("california", new Tuple2<String, String>("sharks", "ducks")), new Tuple2<String, Tuple2<String, String>>("new york", @@ -518,8 +519,12 @@ public class JavaAPISuite implements Serializable { JavaTestUtils.attachTestOutputStream(joined); List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); + for (List<Tuple2<String, Tuple2<String, String>>> res: result) { + unorderedResult.add(Sets.newHashSet(res)); + } - Assert.assertEquals(expected, result); + Assert.assertEquals(expected, unorderedResult); } @@ -1232,15 +1237,15 @@ public class JavaAPISuite implements Serializable { Arrays.asList("hello", "moon"), Arrays.asList("hello")); - List<List<Tuple2<String, Long>>> expected = Arrays.asList( - Arrays.asList( + List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList( + Sets.newHashSet( new Tuple2<String, Long>("hello", 1L), new Tuple2<String, Long>("world", 1L)), - Arrays.asList( + Sets.newHashSet( new Tuple2<String, Long>("hello", 2L), new Tuple2<String, Long>("world", 1L), new Tuple2<String, Long>("moon", 1L)), - Arrays.asList( + Sets.newHashSet( new Tuple2<String, Long>("hello", 2L), new Tuple2<String, Long>("moon", 1L))); @@ -1250,8 +1255,12 @@ public class JavaAPISuite implements Serializable { stream.countByValueAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(counted); List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + List<HashSet<Tuple2<String, Long>>> unorderedResult = Lists.newArrayList(); + for (List<Tuple2<String, Long>> res: result) { + unorderedResult.add(Sets.newHashSet(res)); + } - Assert.assertEquals(expected, result); + Assert.assertEquals(expected, unorderedResult); } @Test |