From 1ffe26c7c03b5d128952e7d3ea7f130cd242a468 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 26 Dec 2013 16:19:25 -0800 Subject: Fix streaming JavaAPISuite that depended on order --- .../org/apache/spark/streaming/JavaAPISuite.java | 27 +++++++++++++--------- 1 file changed, 16 insertions(+), 11 deletions(-) (limited to 'streaming/src/test') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index daeb99f5b7..bf23469936 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -20,6 +20,7 @@ package org.apache.spark.streaming; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.io.Files; import kafka.serializer.StringDecoder; @@ -473,13 +474,13 @@ public class JavaAPISuite implements Serializable { new Tuple2("new york", "islanders"))); - List>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2>("california", - new Tuple2("dodgers", "giants")), - new Tuple2>("new york", - new Tuple2("yankees", "mets"))), - Arrays.asList( + List>>> expected = Arrays.asList( + Sets.newHashSet( + new Tuple2>("california", + new Tuple2("dodgers", "giants")), + new Tuple2>("new york", + new Tuple2("yankees", "mets"))), + Sets.newHashSet( new Tuple2>("california", new Tuple2("sharks", "ducks")), new Tuple2>("new york", @@ -514,8 +515,12 @@ public class JavaAPISuite implements Serializable { JavaTestUtils.attachTestOutputStream(joined); List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List>>> unorderedResult = Lists.newArrayList(); + for (List>> res: result) { + unorderedResult.add(Sets.newHashSet(res)); + } - Assert.assertEquals(expected, result); + Assert.assertEquals(expected, unorderedResult); } @@ -1230,11 +1235,11 @@ public class JavaAPISuite implements Serializable { List>> expected = Arrays.asList( Arrays.asList( - new Tuple2("hello", 1L), - new Tuple2("world", 1L)), + new Tuple2("world", 1L), + new Tuple2("hello", 1L)), Arrays.asList( - new Tuple2("hello", 2L), new Tuple2("world", 1L), + new Tuple2("hello", 2L), new Tuple2("moon", 1L)), Arrays.asList( new Tuple2("hello", 2L), -- cgit v1.2.3 From a515706d9cb2c94ed981d9015026331aaf582f36 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 26 Dec 2013 17:42:13 -0800 Subject: Fix streaming JavaAPISuite again --- .../org/apache/spark/streaming/JavaAPISuite.java | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) (limited to 'streaming/src/test') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index bf23469936..947668369f 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1233,15 +1233,15 @@ public class JavaAPISuite implements Serializable { Arrays.asList("hello", "moon"), Arrays.asList("hello")); - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2("world", 1L), - new Tuple2("hello", 1L)), - Arrays.asList( - new Tuple2("world", 1L), + List>> expected = Arrays.asList( + Sets.newHashSet( + new Tuple2("hello", 1L), + new Tuple2("world", 1L)), + Sets.newHashSet( new Tuple2("hello", 2L), + new Tuple2("world", 1L), new Tuple2("moon", 1L)), - Arrays.asList( + 
Sets.newHashSet( new Tuple2("hello", 2L), new Tuple2("moon", 1L))); @@ -1251,8 +1251,12 @@ public class JavaAPISuite implements Serializable { stream.countByValueAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(counted); List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + List>> unorderedResult = Lists.newArrayList(); + for (List> res: result) { + unorderedResult.add(Sets.newHashSet(res)); + } - Assert.assertEquals(expected, result); + Assert.assertEquals(expected, unorderedResult); } @Test -- cgit v1.2.3 From 8fbff9f5d04064b870e372db0e3885e3fbf28222 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sun, 29 Dec 2013 16:22:44 -0800 Subject: Address Aaron's comments --- .../main/scala/org/apache/spark/Aggregator.scala | 2 +- .../scala/org/apache/spark/rdd/CoGroupedRDD.scala | 31 +-- .../spark/util/collection/AppendOnlyMap.scala | 14 +- .../util/collection/ExternalAppendOnlyMap.scala | 221 +++++++++++++++------ .../org/apache/spark/streaming/JavaAPISuite.java | 8 +- 5 files changed, 188 insertions(+), 88 deletions(-) (limited to 'streaming/src/test') diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 470694ed35..784c09ec51 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -74,7 +74,7 @@ case class Aggregator[K, V, C: ClassTag] ( val combiners = new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners) while (iter.hasNext) { - var kc = iter.next() + val kc = iter.next() combiners.insert(kc._1, kc._2) } combiners.iterator diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index dd02c8a3fe..77a594a3e4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -43,8 +43,7 @@ private[spark] case class NarrowCoGroupSplitDep( private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep -private[spark] -class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) +private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) extends Partition with Serializable { override val index: Int = idx override def hashCode(): Int = idx @@ -60,6 +59,9 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { + // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs). + // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner. + // CoGroupValue is the intermediate state of each value before being merged in compute. private type CoGroup = ArrayBuffer[Any] private type CoGroupValue = (Any, Int) // Int is dependency number private type CoGroupCombiner = Seq[CoGroup] @@ -103,7 +105,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: override val partitioner = Some(part) override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = { - // e.g. 
for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs) + val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean val split = s.asInstanceOf[CoGroupPartition] val numRdds = split.deps.size @@ -113,27 +115,30 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: for ((dep, depNum) <- split.deps.zipWithIndex) dep match { case NarrowCoGroupSplitDep(rdd, _, itsSplit) => { // Read them from the parent - val v = (rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]], depNum) - rddIterators += v + val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]] + rddIterators += ((it, depNum)) } case ShuffleCoGroupSplitDep(shuffleId) => { // Read map outputs of shuffle val fetcher = SparkEnv.get.shuffleFetcher val ser = SparkEnv.get.serializerManager.get(serializerClass) - val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum) - rddIterators += v + val it = fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser) + rddIterators += ((it, depNum)) } } if (!externalSorting) { val map = new AppendOnlyMap[K, CoGroupCombiner] val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => { - if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any]) + if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup) + } + val getCombiner: K => CoGroupCombiner = key => { + map.changeValue(key, update) } rddIterators.foreach { case (it, depNum) => while (it.hasNext) { val kv = it.next() - map.changeValue(kv._1, update)(depNum) += kv._2 + getCombiner(kv._1)(depNum) += kv._2 } } new InterruptibleIterator(context, map.iterator) @@ -149,17 +154,17 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: } } - private def createExternalMap(numRdds: Int): - ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = { + private def createExternalMap(numRdds: Int) + : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = { val createCombiner: (CoGroupValue => CoGroupCombiner) = value => { val newCombiner = Array.fill(numRdds)(new CoGroup) - value match { case(v, depNum) => newCombiner(depNum) += v } + value match { case (v, depNum) => newCombiner(depNum) += v } newCombiner } val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (combiner, value) => { - value match { case(v, depNum) => combiner(depNum) += v } + value match { case (v, depNum) => combiner(depNum) += v } combiner } val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index 38f3c556ae..7810119847 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -237,10 +237,12 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi if (highBit == n) n else highBit << 1 } - // Return an iterator of the map in sorted order. - // Note that the validity of the map is no longer preserved. - def destructiveSortedIterator(ord: Ordering[(K, V)]): Iterator[(K, V)] = { + /** Return an iterator of the map in sorted order. This provides a way to sort the map without + * using additional memory, at the expense of destroying the validity of the map. 
+ */ + def destructiveSortedIterator(ordering: Ordering[(K, V)]): Iterator[(K, V)] = { var keyIndex, newIndex = 0 + // Pack KV pairs into the front of the underlying array while (keyIndex < capacity) { if (data(2 * keyIndex) != null) { data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1)) @@ -248,11 +250,11 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi } keyIndex += 1 } - // sort assert(newIndex == curSize) + // Sort by the given ordering val rawOrdering = new Ordering[AnyRef] { - def compare(x: AnyRef, y: AnyRef): Int ={ - ord.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)]) + def compare(x: AnyRef, y: AnyRef): Int = { + ordering.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)]) } } util.Arrays.sort(data, 0, curSize, rawOrdering) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index f5a2e8db59..1de545c05b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -33,6 +33,10 @@ import org.apache.spark.serializer.Serializer * * (2) Otherwise, group values of the same key together before disk spill, and merge them * into combiners only after reading them back from disk. + * + * In the latter case, values occupy much more space because they are not collapsed as soon + * as they are inserted. This in turn leads to more disk spills, degrading performance. + * For this reason, a mergeCombiners function should be specified if possible. */ private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag]( createCombiner: V => C, @@ -78,28 +82,42 @@ private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag]( /** * An append-only map that spills sorted content to disk when the memory threshold is exceeded. - * A group is an intermediate combiner, with type M equal to either C or ArrayBuffer[V]. + * A group is an intermediate combiner, with type G equal to either C or ArrayBuffer[V]. + * + * This map takes two passes over the data: + * (1) Values are merged into groups, which are spilled to disk as necessary. + * (2) Groups are read from disk and merged into combiners, which are returned. + * + * If we never spill to disk, we avoid the second pass provided that groups G are already + * combiners C. + * + * Note that OOM is still possible with the SpillableAppendOnlyMap. This may occur if the + * collective G values do not fit into memory, or if the size estimation is not sufficiently + * accurate. To account for the latter, `spark.shuffle.buffer.fraction` specifies an additional + * margin of safety, while `spark.shuffle.buffer.mb` specifies the raw memory threshold. 
*/ -private[spark] class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag]( - createGroup: V => M, - mergeValue: (M, V) => M, - mergeGroups: (M, M) => M, - createCombiner: M => C, +private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( + createGroup: V => G, + mergeValue: (G, V) => G, + mergeGroups: (G, G) => G, + createCombiner: G => C, serializer: Serializer) extends Iterable[(K, C)] with Serializable { - private var currentMap = new SizeTrackingAppendOnlyMap[K, M] - private val oldMaps = new ArrayBuffer[DiskIterator] + import SpillableAppendOnlyMap._ + + private var currentMap = new SizeTrackingAppendOnlyMap[K, G] + private val oldMaps = new ArrayBuffer[DiskKGIterator] private val memoryThreshold = { val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong * 1024 * 1024 val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat bufferSize * bufferPercent } - private val ordering = new SpillableAppendOnlyMap.KeyHashOrdering[K, M]() + private val ordering = new KeyGroupOrdering[K, G] private val ser = serializer.newInstance() def insert(key: K, value: V): Unit = { - val update: (Boolean, M) => M = (hadVal, oldVal) => { + val update: (Boolean, G) => G = (hadVal, oldVal) => { if (hadVal) mergeValue(oldVal, value) else createGroup(value) } currentMap.changeValue(key, update) @@ -117,98 +135,173 @@ private[spark] class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag]( out.writeObject(kv) } out.close() - currentMap = new SizeTrackingAppendOnlyMap[K, M] - oldMaps.append(new DiskIterator(file)) + currentMap = new SizeTrackingAppendOnlyMap[K, G] + oldMaps.append(new DiskKGIterator(file)) } override def iterator: Iterator[(K, C)] = { - if (oldMaps.isEmpty && implicitly[ClassTag[M]] == implicitly[ClassTag[C]]) { + if (oldMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) { currentMap.iterator.asInstanceOf[Iterator[(K, C)]] } else { new ExternalIterator() } } - // An iterator that sort-merges (K, M) pairs from memory and disk into (K, C) pairs + // An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs private class ExternalIterator extends Iterator[(K, C)] { - val pq = new PriorityQueue[KMITuple] - val inputStreams = Seq(currentMap.destructiveSortedIterator(ordering)) ++ oldMaps - inputStreams.foreach(readFromIterator) - - // Read from the given iterator until a key of different hash is retrieved - def readFromIterator(it: Iterator[(K, M)]): Unit = { - var minHash : Option[Int] = None - while (it.hasNext) { - val (k, m) = it.next() - pq.enqueue(KMITuple(k, m, it)) - minHash match { - case None => minHash = Some(k.hashCode()) - case Some(expectedHash) => - if (k.hashCode() != expectedHash) { - return - } + val mergeHeap = new PriorityQueue[KGITuple] + val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(ordering)) + + // Invariant: size of mergeHeap == number of input streams + inputStreams.foreach{ it => + val kgPairs = readFromIterator(it) + mergeHeap.enqueue(KGITuple(it, kgPairs)) + } + + // Read from the given iterator until a key of different hash is retrieved. + // The resulting ArrayBuffer includes this key, and is ordered by key hash. 
+ def readFromIterator(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = { + val kgPairs = new ArrayBuffer[(K, G)] + if (it.hasNext) { + var kg = it.next() + kgPairs += kg + val minHash = kg._1.hashCode() + while (it.hasNext && kg._1.hashCode() == minHash) { + kg = it.next() + kgPairs += kg + } + } + kgPairs + } + + // Drop and return all (K, G) pairs with K = the given key from the given KGITuple + def dropKey(kgi: KGITuple, key: K): ArrayBuffer[(K, G)] = { + val dropped = new ArrayBuffer[(K, G)] + var i = 0 + while (i < kgi.pairs.length) { + if (kgi.pairs(i)._1 == key) { + dropped += kgi.pairs.remove(i) + } else { + i += 1 } } + dropped } - override def hasNext: Boolean = !pq.isEmpty + // Merge all (K, G) pairs with K = the given key into baseGroup + def mergeIntoGroup(key: K, baseGroup: G, kgPairs: ArrayBuffer[(K, G)]): G = { + var mergedGroup = baseGroup + kgPairs.foreach { case (k, g) => + if (k == key){ + mergedGroup = mergeGroups(mergedGroup, g) + } + } + mergedGroup + } + + override def hasNext: Boolean = { + mergeHeap.foreach{ kgi => + if (!kgi.pairs.isEmpty) { + return true + } + } + false + } override def next(): (K, C) = { - val minKMI = pq.dequeue() - var (minKey, minGroup) = (minKMI.key, minKMI.group) - val minHash = minKey.hashCode() - readFromIterator(minKMI.iterator) - - // Merge groups with the same key into minGroup - var collidedKMI = ArrayBuffer[KMITuple]() - while (!pq.isEmpty && pq.head.key.hashCode() == minHash) { - val newKMI = pq.dequeue() - if (newKMI.key == minKey) { - minGroup = mergeGroups(minGroup, newKMI.group) - readFromIterator(newKMI.iterator) - } else { - // Collision - collidedKMI += newKMI + var minKGI = mergeHeap.dequeue() + val (minPairs, minHash) = (minKGI.pairs, minKGI.minHash) + if (minPairs.length == 0) { + // Should only happen when hasNext is false + throw new NoSuchElementException + } + var (minKey, minGroup) = minPairs(0) + assert(minKey.hashCode() == minHash) + + // Merge the rest of minPairs into minGroup + val minPairsWithKey = dropKey(minKGI, minKey).tail + minGroup = mergeIntoGroup(minKey, minGroup, minPairsWithKey) + if (minPairs.length == 0) { + minPairs ++= readFromIterator(minKGI.iterator) + } + + // Do the same for all other KGITuples with the same minHash + val tuplesToAddBack = ArrayBuffer[KGITuple](minKGI) + while (!mergeHeap.isEmpty && mergeHeap.head.minHash == minHash) { + val newKGI = mergeHeap.dequeue() + val pairsWithKey = dropKey(newKGI, minKey) + minGroup = mergeIntoGroup(minKey, minGroup, pairsWithKey) + if (newKGI.pairs.length == 0) { + newKGI.pairs ++= readFromIterator(newKGI.iterator) } + tuplesToAddBack += newKGI } - collidedKMI.foreach(pq.enqueue(_)) + tuplesToAddBack.foreach(mergeHeap.enqueue(_)) (minKey, createCombiner(minGroup)) } - case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)]) extends Ordered[KMITuple] { - def compare(other: KMITuple): Int = { - other.key.hashCode().compareTo(key.hashCode()) + case class KGITuple(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)]) + extends Ordered[KGITuple] { + + // Invariant: pairs are ordered by key hash + def minHash: Int = { + if (pairs.length > 0){ + pairs(0)._1.hashCode() + } else { + Int.MaxValue + } + } + + def compare(other: KGITuple): Int = { + // mutable.PriorityQueue dequeues the max, not the min + -minHash.compareTo(other.minHash) } } } - // Iterate through (K, M) pairs in sorted order from an on-disk map - private class DiskIterator(file: File) extends Iterator[(K, M)] { + // Iterate through (K, G) pairs in sorted order from an on-disk map + 
private class DiskKGIterator(file: File) extends Iterator[(K, G)] { val in = ser.deserializeStream(new FileInputStream(file)) - var nextItem: Option[(K, M)] = None + var nextItem: Option[(K, G)] = None + var eof = false + + def readNextItem(): Option[(K, G)] = { + if (!eof) { + try { + return Some(in.readObject().asInstanceOf[(K, G)]) + } catch { + case e: EOFException => eof = true + } + } + None + } override def hasNext: Boolean = { - nextItem = try { - Some(in.readObject().asInstanceOf[(K, M)]) - } catch { - case e: EOFException => None + nextItem match { + case Some(item) => true + case None => + nextItem = readNextItem() + nextItem.isDefined } - nextItem.isDefined } - override def next(): (K, M) = { + override def next(): (K, G) = { nextItem match { - case Some(item) => item - case None => throw new NoSuchElementException + case Some(item) => + nextItem = None + item + case None => + val item = readNextItem() + item.getOrElse(throw new NoSuchElementException) } } } } private[spark] object SpillableAppendOnlyMap { - private class KeyHashOrdering[K, M] extends Ordering[(K, M)] { - def compare(x: (K, M), y: (K, M)): Int = { - x._1.hashCode().compareTo(y._1.hashCode()) + private class KeyGroupOrdering[K, G] extends Ordering[(K, G)] { + def compare(kg1: (K, G), kg2: (K, G)): Int = { + kg1._1.hashCode().compareTo(kg2._1.hashCode()) } } } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 947668369f..a0a8129948 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -476,10 +476,10 @@ public class JavaAPISuite implements Serializable { List>>> expected = Arrays.asList( Sets.newHashSet( - new Tuple2>("california", - new Tuple2("dodgers", "giants")), - new Tuple2>("new york", - new Tuple2("yankees", "mets"))), + new Tuple2>("california", + new Tuple2("dodgers", "giants")), + new Tuple2>("new york", + new Tuple2("yankees", "mets"))), Sets.newHashSet( new Tuple2>("california", new Tuple2("sharks", "ducks")), -- cgit v1.2.3 From 372a533a6c091361115f0f0712e93ef3af376b30 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 9 Jan 2014 21:47:49 -0800 Subject: Fix wonky imports from merge --- .../src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) (limited to 'streaming/src/test') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index be93799a2a..8b7d7709bf 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,14 +17,6 @@ package org.apache.spark.streaming; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.io.Files; - -import kafka.serializer.StringDecoder; - import scala.Tuple2; import org.junit.After; @@ -36,6 +28,7 @@ import java.util.*; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.io.Files; +import com.google.common.collect.Sets; import org.apache.spark.SparkConf; import org.apache.spark.HashPartitioner; -- cgit v1.2.3 From 942c80b34c4642de3b0237761bc1aaeb8cbdd546 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Fri, 
10 Jan 2014 16:32:36 -0800
Subject: Fix one unit test that was not setting spark.cleaner.ttl

---
 .../test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'streaming/src/test')

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index ee6b433d1f..5ccef7f461 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -375,7 +375,7 @@ class BasicOperationsSuite extends TestSuiteBase {
   }
 
   test("slice") {
-    val conf2 = new SparkConf()
+    val conf2 = conf.clone()
       .setMaster("local[2]")
       .setAppName("BasicOperationsSuite")
       .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
--
cgit v1.2.3


From 22d4d62420908ba3451409fa7c72cd26a009a858 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sat, 11 Jan 2014 16:07:03 -0800
Subject: Revert "Fix one unit test that was not setting spark.cleaner.ttl"

This reverts commit 942c80b34c4642de3b0237761bc1aaeb8cbdd546.
---
 .../test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'streaming/src/test')

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 5ccef7f461..ee6b433d1f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -375,7 +375,7 @@ class BasicOperationsSuite extends TestSuiteBase {
   }
 
   test("slice") {
-    val conf2 = conf.clone()
+    val conf2 = new SparkConf()
       .setMaster("local[2]")
       .setAppName("BasicOperationsSuite")
       .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
--
cgit v1.2.3
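
The recurring fix in the JavaAPISuite patches above is to stop asserting on the order in which Spark emits (key, value) pairs within a batch: each per-batch List in both the expected and actual output is wrapped in a HashSet before Assert.assertEquals, so only batch membership is compared. Below is a minimal standalone sketch of that comparison pattern; it uses plain JDK collections rather than Guava's Lists/Sets helpers, and the class and method names (UnorderedBatchAssert, toUnordered) are illustrative, not part of the patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UnorderedBatchAssert {

  // Wrap each per-batch list in a set so equality ignores ordering within a
  // batch while still comparing batches positionally.
  public static <T> List<Set<T>> toUnordered(List<List<T>> batches) {
    List<Set<T>> unordered = new ArrayList<Set<T>>();
    for (List<T> batch : batches) {
      unordered.add(new HashSet<T>(batch));
    }
    return unordered;
  }

  public static void main(String[] args) {
    // Two runs that produce the same records per batch, in different orders.
    List<List<String>> expected = Arrays.asList(
        Arrays.asList("hello", "world"),
        Arrays.asList("hello", "moon"));
    List<List<String>> result = Arrays.asList(
        Arrays.asList("world", "hello"),
        Arrays.asList("moon", "hello"));

    if (!toUnordered(expected).equals(toUnordered(result))) {
      throw new AssertionError("batches differ");
    }
    System.out.println("order-independent comparison passed");
  }
}
```

Note that a set collapses duplicate records within a batch; that is safe for these tests because each key appears at most once per batch in the expected output, so only the nondeterministic ordering introduced by the hash-based shuffle path is ignored.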