author     Tathagata Das <tathagata.das1565@gmail.com>  2014-01-11 23:40:57 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-01-11 23:40:57 -0800
commit     18f4889d96b61b59569ec05f64900da1477404d0 (patch)
tree       3dcb95406babf79d7234aa3851366d5b162dde72
parent     4d9b0ab420df383869fa586b229ac00f234b8749 (diff)
parent     288a878999848adb130041d1e40c14bfc879cec6 (diff)
Merge remote-tracking branch 'apache/master' into error-handling
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala | 61
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 100
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 88
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 36
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockId.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Vector.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala (renamed from core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala) | 116
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 350
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala | 101
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala | 120
-rw-r--r--  core/src/test/scala/org/apache/spark/util/VectorSuite.scala | 44
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala) | 46
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala | 230
-rw-r--r--  docs/configuration.md | 23
-rwxr-xr-x  ec2/spark_ec2.py | 12
-rw-r--r--  external/kafka/pom.xml | 4
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala | 15
-rw-r--r--  project/SparkBuild.scala | 18
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 29
30 files changed, 1347 insertions, 174 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 1a2ec55876..8b30cd4bfe 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,7 +17,7 @@
package org.apache.spark
-import org.apache.spark.util.AppendOnlyMap
+import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
/**
* A set of functions used to aggregate data.
@@ -31,30 +31,51 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
+ private val sparkConf = SparkEnv.get.conf
+ private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
+
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
- val combiners = new AppendOnlyMap[K, C]
- var kv: Product2[K, V] = null
- val update = (hadValue: Boolean, oldValue: C) => {
- if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
- }
- while (iter.hasNext) {
- kv = iter.next()
- combiners.changeValue(kv._1, update)
+ if (!externalSorting) {
+ val combiners = new AppendOnlyMap[K,C]
+ var kv: Product2[K, V] = null
+ val update = (hadValue: Boolean, oldValue: C) => {
+ if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
+ }
+ while (iter.hasNext) {
+ kv = iter.next()
+ combiners.changeValue(kv._1, update)
+ }
+ combiners.iterator
+ } else {
+ val combiners =
+ new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ while (iter.hasNext) {
+ val (k, v) = iter.next()
+ combiners.insert(k, v)
+ }
+ combiners.iterator
}
- combiners.iterator
}
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
- val combiners = new AppendOnlyMap[K, C]
- var kc: (K, C) = null
- val update = (hadValue: Boolean, oldValue: C) => {
- if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
+ if (!externalSorting) {
+ val combiners = new AppendOnlyMap[K,C]
+ var kc: Product2[K, C] = null
+ val update = (hadValue: Boolean, oldValue: C) => {
+ if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
+ }
+ while (iter.hasNext) {
+ kc = iter.next()
+ combiners.changeValue(kc._1, update)
+ }
+ combiners.iterator
+ } else {
+ val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
+ while (iter.hasNext) {
+ val (k, c) = iter.next()
+ combiners.insert(k, c)
+ }
+ combiners.iterator
}
- while (iter.hasNext) {
- kc = iter.next()
- combiners.changeValue(kc._1, update)
- }
- combiners.iterator
}
}
-
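Note: the combine path above is now gated by the new `spark.shuffle.externalSorting` property (default true). A minimal sketch of toggling it from application code; the word-count job itself is made up and only serves to exercise Aggregator.combineValuesByKey:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    object ExternalSortingToggle {
      def main(args: Array[String]) {
        // Illustrative settings only: external sorting defaults to true, so this
        // explicitly falls back to the purely in-memory AppendOnlyMap path.
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("ExternalSortingToggle")
          .set("spark.shuffle.externalSorting", "false")
        val sc = new SparkContext(conf)
        // reduceByKey funnels through Aggregator.combineValuesByKey shown above.
        val counts = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3)).reduceByKey(_ + _)
        counts.collect().foreach(println)
        sc.stop()
      }
    }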
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 139048d5c7..9a3d36b51e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -345,29 +345,42 @@ class SparkContext(
}
/**
- * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
- * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
- * etc).
+ * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
+ * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
+ * using the older MapReduce API (`org.apache.hadoop.mapred`).
+ *
+ * @param conf JobConf for setting up the dataset
+ * @param inputFormatClass Class of the [[InputFormat]]
+ * @param keyClass Class of the keys
+ * @param valueClass Class of the values
+ * @param minSplits Minimum number of Hadoop Splits to generate.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, and can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
*/
- def hadoopRDD[K, V](
+ def hadoopRDD[K: ClassTag, V: ClassTag](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
+ minSplits: Int = defaultMinSplits,
+ cloneRecords: Boolean = true
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
- def hadoopFile[K, V](
+ def hadoopFile[K: ClassTag, V: ClassTag](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
+ minSplits: Int = defaultMinSplits,
+ cloneRecords: Boolean = true
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -379,7 +392,8 @@ class SparkContext(
inputFormatClass,
keyClass,
valueClass,
- minSplits)
+ minSplits,
+ cloneRecords)
}
/**
@@ -390,14 +404,15 @@ class SparkContext(
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
* }}}
*/
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
- (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
- : RDD[(K, V)] = {
+ def hadoopFile[K, V, F <: InputFormat[K, V]]
+ (path: String, minSplits: Int, cloneRecords: Boolean = true)
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
- fm.runtimeClass.asInstanceOf[Class[F]],
- km.runtimeClass.asInstanceOf[Class[K]],
- vm.runtimeClass.asInstanceOf[Class[V]],
- minSplits)
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
+ minSplits,
+ cloneRecords)
}
/**
@@ -408,61 +423,67 @@ class SparkContext(
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
* }}}
*/
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
+ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
- hadoopFile[K, V, F](path, defaultMinSplits)
+ hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
+ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
+ (path: String, cloneRecords: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
- path,
- fm.runtimeClass.asInstanceOf[Class[F]],
- km.runtimeClass.asInstanceOf[Class[K]],
- vm.runtimeClass.asInstanceOf[Class[V]])
+ path,
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
+ cloneRecords = cloneRecords)
}
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+ def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
- conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+ conf: Configuration = hadoopConfiguration,
+ cloneRecords: Boolean = true): RDD[(K, V)] = {
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
val updatedConf = job.getConfiguration
- new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+ new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
}
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
- def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+ def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
- vClass: Class[V]): RDD[(K, V)] = {
- new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+ vClass: Class[V],
+ cloneRecords: Boolean = true): RDD[(K, V)] = {
+ new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K, V](path: String,
+ def sequenceFile[K: ClassTag, V: ClassTag](path: String,
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minSplits: Int,
+ cloneRecords: Boolean = true
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
- hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+ hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
- sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+ def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
+ cloneRecords: Boolean = true): RDD[(K, V)] =
+ sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -480,17 +501,18 @@ class SparkContext(
* for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
- def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
- (implicit km: ClassTag[K], vm: ClassTag[V],
- kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
+ def sequenceFile[K, V]
+ (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+ (implicit km: ClassTag[K], vm: ClassTag[V],
+ kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
val kc = kcf()
val vc = vcf()
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
- vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
- writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
+ vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+ writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
/**
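Note: each of the Hadoop input methods above gains a `cloneRecords` flag (default true). A hedged sketch of opting out of cloning when every record is consumed immediately and never cached or collected; the path and SparkContext are placeholders:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.SparkContext

    // Records are counted right away, so reusing Hadoop's Writable wrappers is
    // safe here and the per-record clone can be skipped for speed.
    def fastLineCount(sc: SparkContext, path: String): Long =
      sc.hadoopFile[LongWritable, Text, TextInputFormat](path, cloneRecords = false).count()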
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index e093e2f162..08b592df71 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -54,7 +54,11 @@ class SparkEnv private[spark] (
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
- val conf: SparkConf) {
+ val conf: SparkConf) extends Logging {
+
+ // A mapping of thread ID to amount of memory used for shuffle in bytes
+ // All accesses should be manually synchronized
+ val shuffleMemoryMap = mutable.HashMap[Long, Long]()
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
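Note: a toy sketch (not Spark code) of the manual-synchronization contract stated in the comment above; every read and write of the per-thread map is wrapped in `synchronized` on the map itself:

    import scala.collection.mutable

    object ShuffleMemorySketch {
      val shuffleMemoryMap = mutable.HashMap[Long, Long]()

      // Record additional shuffle memory claimed by the current thread.
      def reserve(bytes: Long): Unit = {
        val threadId = Thread.currentThread().getId
        shuffleMemoryMap.synchronized {
          shuffleMemoryMap(threadId) = shuffleMemoryMap.getOrElse(threadId, 0L) + bytes
        }
      }

      // Drop the current thread's entry, as the Executor does when a task finishes.
      def release(): Unit = shuffleMemoryMap.synchronized {
        shuffleMemoryMap.remove(Thread.currentThread().getId)
      }
    }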
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e51d274d33..a7b2328a02 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -279,6 +279,11 @@ private[spark] class Executor(
//System.exit(1)
}
} finally {
+ // TODO: Unregister shuffle memory only for ShuffleMapTask
+ val shuffleMemoryMap = env.shuffleMemoryMap
+ shuffleMemoryMap.synchronized {
+ shuffleMemoryMap.remove(Thread.currentThread().getId)
+ }
runningTasks.remove(taskId)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 4ba4696fef..a73714abca 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -23,8 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
-import org.apache.spark.util.AppendOnlyMap
-
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
private[spark] sealed trait CoGroupSplitDep extends Serializable
@@ -44,14 +43,12 @@ private[spark] case class NarrowCoGroupSplitDep(
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-private[spark]
-class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
+private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
extends Partition with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
}
-
/**
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
* tuple with the list of values for that key.
@@ -62,6 +59,14 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+ // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
+ // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
+ // CoGroupValue is the intermediate state of each value before being merged in compute.
+ private type CoGroup = ArrayBuffer[Any]
+ private type CoGroupValue = (Any, Int) // Int is dependency number
+ private type CoGroupCombiner = Seq[CoGroup]
+
+ private val sparkConf = SparkEnv.get.conf
private var serializerClass: String = null
def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -100,37 +105,74 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)
- override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
+ override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
+ val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
- // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
- val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
- val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
- if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
- }
-
- val getSeq = (k: K) => {
- map.changeValue(k, update)
- }
-
- val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
+ // A list of (rdd iterator, dependency number) pairs
+ val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
- rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
- getSeq(kv._1)(depNum) += kv._2
- }
+ val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]]
+ rddIterators += ((it, depNum))
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
- fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
- kv => getSeq(kv._1)(depNum) += kv._2
+ val ser = SparkEnv.get.serializerManager.get(serializerClass, sparkConf)
+ val it = fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser)
+ rddIterators += ((it, depNum))
+ }
+ }
+
+ if (!externalSorting) {
+ val map = new AppendOnlyMap[K, CoGroupCombiner]
+ val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
+ if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
+ }
+ val getCombiner: K => CoGroupCombiner = key => {
+ map.changeValue(key, update)
+ }
+ rddIterators.foreach { case (it, depNum) =>
+ while (it.hasNext) {
+ val kv = it.next()
+ getCombiner(kv._1)(depNum) += kv._2
}
}
+ new InterruptibleIterator(context, map.iterator)
+ } else {
+ val map = createExternalMap(numRdds)
+ rddIterators.foreach { case (it, depNum) =>
+ while (it.hasNext) {
+ val kv = it.next()
+ map.insert(kv._1, new CoGroupValue(kv._2, depNum))
+ }
+ }
+ new InterruptibleIterator(context, map.iterator)
+ }
+ }
+
+ private def createExternalMap(numRdds: Int)
+ : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+
+ val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
+ val newCombiner = Array.fill(numRdds)(new CoGroup)
+ value match { case (v, depNum) => newCombiner(depNum) += v }
+ newCombiner
}
- new InterruptibleIterator(context, map.iterator)
+ val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
+ (combiner, value) => {
+ value match { case (v, depNum) => combiner(depNum) += v }
+ combiner
+ }
+ val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
+ (combiner1, combiner2) => {
+ combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 }
+ }
+ new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
+ createCombiner, mergeValue, mergeCombiners)
}
override def clearDependencies() {
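Note: a small usage sketch of the CoGroup/CoGroupCombiner shape described above, with made-up data and an assumed SparkContext `sc`; each key maps to one buffer of values per parent RDD:

    import org.apache.spark.SparkContext._

    val a = sc.parallelize(Seq(1 -> "x", 1 -> "y", 2 -> "z"))
    val b = sc.parallelize(Seq(1 -> 10, 3 -> 30))
    a.cogroup(b).collect().foreach(println)
    // roughly: (1,(Seq(x, y),Seq(10))), (2,(Seq(z),Seq())), (3,(Seq(),Seq(30)))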
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 53f77a38f5..902083c24f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,7 +19,10 @@ package org.apache.spark.rdd
import java.io.EOFException
-import org.apache.hadoop.mapred.FileInputFormat
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
@@ -31,7 +34,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.NextIterator
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.spark.util.Utils.cloneWritables
/**
@@ -42,14 +45,14 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
val inputSplit = new SerializableWritable[InputSplit](s)
- override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
+ override def hashCode(): Int = 41 * (41 + rddId) + idx
override val index: Int = idx
}
/**
* An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
- * sources in HBase, or S3).
+ * sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param sc The SparkContext to associate the RDD with.
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
@@ -61,15 +64,21 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, and can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
*/
-class HadoopRDD[K, V](
+class HadoopRDD[K: ClassTag, V: ClassTag](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int)
+ minSplits: Int,
+ cloneRecords: Boolean)
extends RDD[(K, V)](sc, Nil) with Logging {
def this(
@@ -78,7 +87,8 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int) = {
+ minSplits: Int,
+ cloneRecords: Boolean) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
@@ -87,7 +97,8 @@ class HadoopRDD[K, V](
inputFormatClass,
keyClass,
valueClass,
- minSplits)
+ minSplits,
+ cloneRecords)
}
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -158,10 +169,10 @@ class HadoopRDD[K, V](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
-
val key: K = reader.createKey()
+ val keyCloneFunc = cloneWritables[K](jobConf)
val value: V = reader.createValue()
-
+ val valueCloneFunc = cloneWritables[V](jobConf)
override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -169,7 +180,11 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}
- (key, value)
+ if (cloneRecords) {
+ (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
+ } else {
+ (key, value)
+ }
}
override def close() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 73d15b9082..992bd4aa0a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,11 +20,14 @@ package org.apache.spark.rdd
import java.text.SimpleDateFormat
import java.util.Date
+import scala.reflect.ClassTag
+
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.util.Utils.cloneWritables
private[spark]
@@ -33,15 +36,31 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
val serializableHadoopSplit = new SerializableWritable(rawSplit)
- override def hashCode(): Int = (41 * (41 + rddId) + index)
+ override def hashCode(): Int = 41 * (41 + rddId) + index
}
-class NewHadoopRDD[K, V](
+/**
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param inputFormatClass Storage format of the data to be read.
+ * @param keyClass Class of the key associated with the inputFormatClass.
+ * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param conf The Hadoop configuration.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, and can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
+ */
+class NewHadoopRDD[K: ClassTag, V: ClassTag](
sc : SparkContext,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- @transient conf: Configuration)
+ @transient conf: Configuration,
+ cloneRecords: Boolean)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
@@ -88,7 +107,8 @@ class NewHadoopRDD[K, V](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
-
+ val keyCloneFunc = cloneWritables[K](conf)
+ val valueCloneFunc = cloneWritables[V](conf)
var havePair = false
var finished = false
@@ -105,7 +125,13 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
- (reader.getCurrentKey, reader.getCurrentValue)
+ val key = reader.getCurrentKey
+ val value = reader.getCurrentValue
+ if (cloneRecords) {
+ (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
+ } else {
+ (key, value)
+ }
}
private def close() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index c118ddfc01..1248409e35 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -99,8 +99,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
- // A sanity check to make sure mergeCombiners is not defined.
- assert(mergeCombiners == null)
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
@@ -267,8 +265,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
// into a hash table, leading to more objects in the old gen.
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+ def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
+ createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
@@ -339,7 +338,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* existing partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
- : RDD[(K, C)] = {
+ : RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 7156d855d8..301d784b35 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -17,12 +17,14 @@
package org.apache.spark.storage
+import java.util.UUID
+
/**
* Identifies a particular Block of data, usually associated with a single file.
* A Block can be uniquely identified by its filename, but each type of Block has a different
* set of keys which produce its unique name.
*
- * If your BlockId should be serializable, be sure to add it to the BlockId.fromString() method.
+ * If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
*/
private[spark] sealed abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */
@@ -55,7 +57,8 @@ private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId {
def name = "broadcast_" + broadcastId
}
-private[spark] case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId {
+private[spark]
+case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId {
def name = broadcastId.name + "_" + hType
}
@@ -67,6 +70,11 @@ private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends B
def name = "input-" + streamId + "-" + uniqueId
}
+/** Id associated with temporary data managed as blocks. Not serializable. */
+private[spark] case class TempBlockId(id: UUID) extends BlockId {
+ def name = "temp_" + id
+}
+
// Intended only for testing purposes
private[spark] case class TestBlockId(id: String) extends BlockId {
def name = "test_" + id
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c56e2ca2df..ff9f241fc1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -159,7 +159,7 @@ private[spark] class BlockManager(
/**
* Reregister with the master and report all blocks to it. This will be called by the heart beat
- * thread if our heartbeat to the block amnager indicates that we were not registered.
+ * thread if our heartbeat to the block manager indicates that we were not registered.
*
* Note that this method must be called without any BlockInfo locks held.
*/
@@ -864,7 +864,7 @@ private[spark] object BlockManager extends Logging {
val ID_GENERATOR = new IdGenerator
def getMaxMemory(conf: SparkConf): Long = {
- val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66)
+ val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 61e63c60d5..369a277232 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -181,4 +181,8 @@ class DiskBlockObjectWriter(
// Only valid if called after close()
override def timeWriting() = _timeWriting
+
+ def bytesWritten: Long = {
+ lastValidPosition - initialPosition
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index edc1133172..a8ef7fa8b6 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
import java.io.File
import java.text.SimpleDateFormat
-import java.util.{Date, Random}
+import java.util.{Date, Random, UUID}
import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
@@ -90,6 +90,15 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
def getFile(blockId: BlockId): File = getFile(blockId.name)
+ /** Produces a unique block id and File suitable for intermediate results. */
+ def createTempBlock(): (TempBlockId, File) = {
+ var blockId = new TempBlockId(UUID.randomUUID())
+ while (getFile(blockId).exists()) {
+ blockId = new TempBlockId(UUID.randomUUID())
+ }
+ (blockId, getFile(blockId))
+ }
+
private def createLocalDirs(): Array[File] = {
logDebug("Creating local directories at root dirs '" + rootDirs + "'")
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8dcfeacb60..d1e58016be 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -171,7 +171,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
- <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
+ <h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq() ++
<h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 463d85dfd5..9ad6de3c6d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
- <th>Task Time</th>
+ <th>Duration</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5f1253100b..23b72701c2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,23 +26,47 @@ import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import org.apache.hadoop.io._
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
-import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkException, Logging}
/**
* Various utility methods used by Spark.
*/
private[spark] object Utils extends Logging {
+
+ /**
+ * We try to clone for most common types of writables and we call WritableUtils.clone otherwise
+ * intention is to optimize, for example for NullWritable there is no need and for Long, int and
+ * String creating a new object with value set would be faster.
+ */
+ def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
+ val cloneFunc = classTag[T] match {
+ case ClassTag(_: Text) =>
+ (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
+ case ClassTag(_: LongWritable) =>
+ (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
+ case ClassTag(_: IntWritable) =>
+ (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
+ case ClassTag(_: NullWritable) =>
+ (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
+ case _ =>
+ (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
+ }
+ cloneFunc
+ }
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index fe710c58ac..62fd6d8da5 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util
+import scala.util.Random
+
class Vector(val elements: Array[Double]) extends Serializable {
def length = elements.length
@@ -124,6 +126,12 @@ object Vector {
def ones(length: Int) = Vector(length, _ => 1)
+ /**
+ * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
+ * between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
+ */
+ def random(length: Int, random: Random = new XORShiftRandom()) = Vector(length, _ => random.nextDouble())
+
class Multiplier(num: Double) {
def * (vec: Vector) = vec * num
}
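Note: a minimal sketch of the new `Vector.random` factory described above; the generator argument is optional and a seeded one can be supplied for reproducibility:

    import scala.util.Random
    import org.apache.spark.util.Vector

    // A length-5 vector of uniform doubles in [0.0, 1.0).
    val v = Vector.random(5)
    val seeded = Vector.random(5, new Random(42))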
diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 8bb4ee3bfa..d98c7aa3d7 100644
--- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.util
+package org.apache.spark.util.collection
+
+import java.util.{Arrays, Comparator}
/**
* A simple open hash table optimized for the append-only use case, where keys
@@ -28,14 +30,15 @@ package org.apache.spark.util
* TODO: Cache the hash values of each key? java.util.HashMap does that.
*/
private[spark]
-class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
+class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K,
+ V)] with Serializable {
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
require(initialCapacity >= 1, "Invalid initial capacity")
private var capacity = nextPowerOf2(initialCapacity)
private var mask = capacity - 1
private var curSize = 0
- private var growThreshold = LOAD_FACTOR * capacity
+ private var growThreshold = (LOAD_FACTOR * capacity).toInt
// Holds keys and values in the same array for memory locality; specifically, the order of
// elements is key0, value0, key1, value1, key2, value2, etc.
@@ -45,10 +48,15 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
private var haveNullValue = false
private var nullValue: V = null.asInstanceOf[V]
+ // Triggered by destructiveSortedIterator; the underlying data array may no longer be used
+ private var destroyed = false
+ private val destructionMessage = "Map state is invalid from destructive sorting!"
+
private val LOAD_FACTOR = 0.7
/** Get the value for a given key */
def apply(key: K): V = {
+ assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
return nullValue
@@ -72,6 +80,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
/** Set the value for a key */
def update(key: K, value: V): Unit = {
+ assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
@@ -106,6 +115,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
* for key, if any, or null otherwise. Returns the newly updated value.
*/
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
+ assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
@@ -139,35 +149,38 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
}
/** Iterator method from Iterable */
- override def iterator: Iterator[(K, V)] = new Iterator[(K, V)] {
- var pos = -1
-
- /** Get the next value we should return from next(), or null if we're finished iterating */
- def nextValue(): (K, V) = {
- if (pos == -1) { // Treat position -1 as looking at the null value
- if (haveNullValue) {
- return (null.asInstanceOf[K], nullValue)
+ override def iterator: Iterator[(K, V)] = {
+ assert(!destroyed, destructionMessage)
+ new Iterator[(K, V)] {
+ var pos = -1
+
+ /** Get the next value we should return from next(), or null if we're finished iterating */
+ def nextValue(): (K, V) = {
+ if (pos == -1) { // Treat position -1 as looking at the null value
+ if (haveNullValue) {
+ return (null.asInstanceOf[K], nullValue)
+ }
+ pos += 1
}
- pos += 1
- }
- while (pos < capacity) {
- if (!data(2 * pos).eq(null)) {
- return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
+ while (pos < capacity) {
+ if (!data(2 * pos).eq(null)) {
+ return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
+ }
+ pos += 1
}
- pos += 1
+ null
}
- null
- }
- override def hasNext: Boolean = nextValue() != null
+ override def hasNext: Boolean = nextValue() != null
- override def next(): (K, V) = {
- val value = nextValue()
- if (value == null) {
- throw new NoSuchElementException("End of iterator")
+ override def next(): (K, V) = {
+ val value = nextValue()
+ if (value == null) {
+ throw new NoSuchElementException("End of iterator")
+ }
+ pos += 1
+ value
}
- pos += 1
- value
}
}
@@ -190,7 +203,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
}
/** Double the table's size and re-hash everything */
- private def growTable() {
+ protected def growTable() {
val newCapacity = capacity * 2
if (newCapacity >= (1 << 30)) {
// We can't make the table this big because we want an array of 2x
@@ -227,11 +240,58 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
data = newData
capacity = newCapacity
mask = newMask
- growThreshold = LOAD_FACTOR * newCapacity
+ growThreshold = (LOAD_FACTOR * newCapacity).toInt
}
private def nextPowerOf2(n: Int): Int = {
val highBit = Integer.highestOneBit(n)
if (highBit == n) n else highBit << 1
}
+
+ /**
+ * Return an iterator of the map in sorted order. This provides a way to sort the map without
+ * using additional memory, at the expense of destroying the validity of the map.
+ */
+ def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
+ destroyed = true
+ // Pack KV pairs into the front of the underlying array
+ var keyIndex, newIndex = 0
+ while (keyIndex < capacity) {
+ if (data(2 * keyIndex) != null) {
+ data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
+ newIndex += 1
+ }
+ keyIndex += 1
+ }
+ assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
+
+ // Sort by the given ordering
+ val rawOrdering = new Comparator[AnyRef] {
+ def compare(x: AnyRef, y: AnyRef): Int = {
+ cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
+ }
+ }
+ Arrays.sort(data, 0, newIndex, rawOrdering)
+
+ new Iterator[(K, V)] {
+ var i = 0
+ var nullValueReady = haveNullValue
+ def hasNext: Boolean = (i < newIndex || nullValueReady)
+ def next(): (K, V) = {
+ if (nullValueReady) {
+ nullValueReady = false
+ (null.asInstanceOf[K], nullValue)
+ } else {
+ val item = data(i).asInstanceOf[(K, V)]
+ i += 1
+ item
+ }
+ }
+ }
+ }
+
+ /**
+ * Return whether the next insert will cause the map to grow
+ */
+ def atGrowThreshold: Boolean = curSize == growThreshold
}
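Note: a toy sketch of `destructiveSortedIterator` (AppendOnlyMap is private[spark], so this only compiles inside Spark's own sources). After the call the map may no longer be read, but the returned iterator yields entries in comparator order without allocating a second array:

    import java.util.Comparator
    import org.apache.spark.util.collection.AppendOnlyMap

    val map = new AppendOnlyMap[String, Int]()
    map("b") = 2
    map("a") = 1
    val byKey = new Comparator[(String, Int)] {
      def compare(x: (String, Int), y: (String, Int)): Int = x._1.compareTo(y._1)
    }
    // Prints (a,1) then (b,2); the map itself is now invalid.
    map.destructiveSortedIterator(byKey).foreach(println)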
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
new file mode 100644
index 0000000000..e3bcd895aa
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
+
+/**
+ * An append-only map that spills sorted content to disk when there is insufficient space for it
+ * to grow.
+ *
+ * This map takes two passes over the data:
+ *
+ * (1) Values are merged into combiners, which are sorted and spilled to disk as necessary
+ * (2) Combiners are read from disk and merged together
+ *
+ * The setting of the spill threshold faces the following trade-off: If the spill threshold is
+ * too high, the in-memory map may occupy more memory than is available, resulting in OOM.
+ * However, if the spill threshold is too low, we spill frequently and incur unnecessary disk
+ * writes. This may lead to a performance regression compared to the normal case of using the
+ * non-spilling AppendOnlyMap.
+ *
+ * Two parameters control the memory threshold:
+ *
+ * `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing
+ * these maps as a fraction of the executor's total memory. Since each concurrently running
+ * task maintains one map, the actual threshold for each map is this quantity divided by the
+ * number of running tasks.
+ *
+ * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
+ * this threshold, in case map size estimation is not sufficiently accurate.
+ */
+
+private[spark] class ExternalAppendOnlyMap[K, V, C](
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C,
+ serializer: Serializer = SparkEnv.get.serializerManager.default,
+ diskBlockManager: DiskBlockManager = SparkEnv.get.blockManager.diskBlockManager)
+ extends Iterable[(K, C)] with Serializable with Logging {
+
+ import ExternalAppendOnlyMap._
+
+ private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
+ private val spilledMaps = new ArrayBuffer[DiskMapIterator]
+ private val sparkConf = SparkEnv.get.conf
+
+ // Collective memory threshold shared across all running tasks
+ private val maxMemoryThreshold = {
+ val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
+ val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
+ (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+ }
+
+ // Number of pairs in the in-memory map
+ private var numPairsInMemory = 0
+
+ // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
+ private val trackMemoryThreshold = 1000
+
+ // How many times we have spilled so far
+ private var spillCount = 0
+
+ private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+ private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
+ private val comparator = new KCComparator[K, C]
+ private val ser = serializer.newInstance()
+
+ /**
+ * Insert the given key and value into the map.
+ *
+ * If the underlying map is about to grow, check if the global pool of shuffle memory has
+ * enough room for this to happen. If so, allocate the memory required to grow the map;
+ * otherwise, spill the in-memory map to disk.
+ *
+ * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
+ */
+ def insert(key: K, value: V) {
+ val update: (Boolean, C) => C = (hadVal, oldVal) => {
+ if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
+ }
+ if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
+ val mapSize = currentMap.estimateSize()
+ var shouldSpill = false
+ val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+
+ // Atomically check whether there is sufficient memory in the global pool for
+ // this map to grow and, if possible, allocate the required amount
+ shuffleMemoryMap.synchronized {
+ val threadId = Thread.currentThread().getId
+ val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+ val availableMemory = maxMemoryThreshold -
+ (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
+
+ // Assume map growth factor is 2x
+ shouldSpill = availableMemory < mapSize * 2
+ if (!shouldSpill) {
+ shuffleMemoryMap(threadId) = mapSize * 2
+ }
+ }
+ // Do not synchronize spills
+ if (shouldSpill) {
+ spill(mapSize)
+ }
+ }
+ currentMap.changeValue(key, update)
+ numPairsInMemory += 1
+ }
+
+ /**
+ * Sort the existing contents of the in-memory map and spill them to a temporary file on disk
+ */
+ private def spill(mapSize: Long) {
+ spillCount += 1
+ logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
+ .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
+ val (blockId, file) = diskBlockManager.createTempBlock()
+ val writer =
+ new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)
+ try {
+ val it = currentMap.destructiveSortedIterator(comparator)
+ while (it.hasNext) {
+ val kv = it.next()
+ writer.write(kv)
+ }
+ writer.commit()
+ } finally {
+ // Partial failures cannot be tolerated; do not revert partial writes
+ writer.close()
+ }
+ currentMap = new SizeTrackingAppendOnlyMap[K, C]
+ spilledMaps.append(new DiskMapIterator(file))
+
+ // Reset the amount of shuffle memory used by this map in the global pool
+ val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+ shuffleMemoryMap.synchronized {
+ shuffleMemoryMap(Thread.currentThread().getId) = 0
+ }
+ numPairsInMemory = 0
+ }
+
+ /**
+ * Return an iterator that merges the in-memory map with the spilled maps.
+ * If no spill has occurred, simply return the in-memory map's iterator.
+ */
+ override def iterator: Iterator[(K, C)] = {
+ if (spilledMaps.isEmpty) {
+ currentMap.iterator
+ } else {
+ new ExternalIterator()
+ }
+ }
+
+ /**
+ * An iterator that sort-merges (K, C) pairs from the in-memory map and the spilled maps
+ */
+ private class ExternalIterator extends Iterator[(K, C)] {
+
+ // A fixed-size queue that maintains a buffer for each stream we are currently merging
+ val mergeHeap = new mutable.PriorityQueue[StreamBuffer]
+
+ // Input streams are derived both from the in-memory map and spilled maps on disk
+ // The in-memory map is sorted in place, while the spilled maps are already in sorted order
+ val sortedMap = currentMap.destructiveSortedIterator(comparator)
+ val inputStreams = Seq(sortedMap) ++ spilledMaps
+
+ inputStreams.foreach { it =>
+ val kcPairs = getMorePairs(it)
+ mergeHeap.enqueue(StreamBuffer(it, kcPairs))
+ }
+
+ /**
+ * Fetch from the given iterator until a key of different hash is retrieved. In the
+ * event of key hash collisions, this ensures no pairs are hidden from being merged.
+ * Assume the given iterator is in sorted order.
+ */
+ def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+ val kcPairs = new ArrayBuffer[(K, C)]
+ if (it.hasNext) {
+ var kc = it.next()
+ kcPairs += kc
+ val minHash = kc._1.hashCode()
+ while (it.hasNext && kc._1.hashCode() == minHash) {
+ kc = it.next()
+ kcPairs += kc
+ }
+ }
+ kcPairs
+ }
+
+ /**
+ * If the given buffer contains a value for the given key, merge that value into
+ * baseCombiner and remove the corresponding (K, C) pair from the buffer
+ */
+ def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
+ var i = 0
+ while (i < buffer.pairs.size) {
+ val (k, c) = buffer.pairs(i)
+ if (k == key) {
+ buffer.pairs.remove(i)
+ return mergeCombiners(baseCombiner, c)
+ }
+ i += 1
+ }
+ baseCombiner
+ }
+
+ /**
+ * Return true if there exists an input stream that still has unvisited pairs
+ */
+ override def hasNext: Boolean = mergeHeap.exists(!_.pairs.isEmpty)
+
+ /**
+ * Select a key with the minimum hash, then combine all values with the same key from all input streams.
+ */
+ override def next(): (K, C) = {
+ // Select a key from the StreamBuffer that holds the lowest key hash
+ val minBuffer = mergeHeap.dequeue()
+ val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
+ if (minPairs.length == 0) {
+ // Should only happen when no other stream buffers have any pairs left
+ throw new NoSuchElementException
+ }
+ var (minKey, minCombiner) = minPairs.remove(0)
+ assert(minKey.hashCode() == minHash)
+
+ // For all other streams that may have this key (i.e. have the same minimum key hash),
+ // merge in the corresponding value (if any) from that stream
+ val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
+ while (!mergeHeap.isEmpty && mergeHeap.head.minKeyHash == minHash) {
+ val newBuffer = mergeHeap.dequeue()
+ minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)
+ mergedBuffers += newBuffer
+ }
+
+ // Repopulate each visited stream buffer and add it back to the merge heap
+ mergedBuffers.foreach { buffer =>
+ if (buffer.pairs.length == 0) {
+ buffer.pairs ++= getMorePairs(buffer.iterator)
+ }
+ mergeHeap.enqueue(buffer)
+ }
+
+ (minKey, minCombiner)
+ }
+
+ /**
+ * A buffer for streaming from a map iterator (in-memory or on-disk) sorted by key hash.
+ * Each buffer maintains the lowest-ordered keys in the corresponding iterator. Due to
+ * hash collisions, it is possible for multiple keys to be "tied" for being the lowest.
+ *
+ * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
+ */
+ case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
+ extends Comparable[StreamBuffer] {
+
+ def minKeyHash: Int = {
+ if (pairs.length > 0){
+ // pairs are already sorted by key hash
+ pairs(0)._1.hashCode()
+ } else {
+ Int.MaxValue
+ }
+ }
+
+ override def compareTo(other: StreamBuffer): Int = {
+ // minus sign because mutable.PriorityQueue dequeues the max, not the min
+ -minKeyHash.compareTo(other.minKeyHash)
+ }
+ }
+ }
+
+ /**
+ * An iterator that returns (K, C) pairs in sorted order from an on-disk map
+ */
+ private class DiskMapIterator(file: File) extends Iterator[(K, C)] {
+ val fileStream = new FileInputStream(file)
+ val bufferedStream = new FastBufferedInputStream(fileStream)
+ val deserializeStream = ser.deserializeStream(bufferedStream)
+ var nextItem: (K, C) = null
+ var eof = false
+
+ def readNextItem(): (K, C) = {
+ if (!eof) {
+ try {
+ return deserializeStream.readObject().asInstanceOf[(K, C)]
+ } catch {
+ case e: EOFException =>
+ eof = true
+ cleanup()
+ }
+ }
+ null
+ }
+
+ override def hasNext: Boolean = {
+ if (nextItem == null) {
+ nextItem = readNextItem()
+ }
+ nextItem != null
+ }
+
+ override def next(): (K, C) = {
+ val item = if (nextItem == null) readNextItem() else nextItem
+ if (item == null) {
+ throw new NoSuchElementException
+ }
+ nextItem = null
+ item
+ }
+
+ // TODO: Ensure this gets called even if the iterator isn't drained.
+ def cleanup() {
+ deserializeStream.close()
+ file.delete()
+ }
+ }
+}
+
+private[spark] object ExternalAppendOnlyMap {
+ private class KCComparator[K, C] extends Comparator[(K, C)] {
+ def compare(kc1: (K, C), kc2: (K, C)): Int = {
+ kc1._1.hashCode().compareTo(kc2._1.hashCode())
+ }
+ }
+}
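
For reference, a minimal self-contained sketch of the hash-ordered merge that the iterator above (next(), StreamBuffer, and the merge heap) performs: every input stream is already sorted by key hash, all pairs tied for the minimum hash are drained in one batch, and combiners of equal keys are merged. The names HashMergeSketch and mergeSorted are hypothetical, and the linear scan over streams stands in for the real class's priority queue and disk-backed buffer refills.

    import scala.collection.mutable.ArrayBuffer

    object HashMergeSketch {
      /** Merge iterators that are each sorted by key hash, combining values of equal keys. */
      def mergeSorted[K, C](streams: Seq[Iterator[(K, C)]],
                            mergeCombiners: (C, C) => C): Iterator[(K, C)] = {
        val buffered = streams.map(_.buffered)
        val pending = new ArrayBuffer[(K, C)]  // combined pairs for the current minimum hash
        new Iterator[(K, C)] {
          def hasNext: Boolean = pending.nonEmpty || buffered.exists(_.hasNext)
          def next(): (K, C) = {
            if (pending.isEmpty) {
              // Pairs tied for the minimum hash may still hold distinct (colliding) keys
              val minHash = buffered.filter(_.hasNext).map(_.head._1.hashCode()).min
              val ties = new ArrayBuffer[(K, C)]
              buffered.foreach { s =>
                while (s.hasNext && s.head._1.hashCode() == minHash) { ties += s.next() }
              }
              // Merge combiners of equal keys; unequal colliding keys stay separate
              for ((k, c) <- ties) {
                val i = pending.indexWhere(_._1 == k)
                if (i >= 0) pending(i) = (k, mergeCombiners(pending(i)._2, c))
                else pending += ((k, c))
              }
            }
            pending.remove(0)
          }
        }
      }
    }

For example, merging two hash-sorted streams with string concatenation as the combiner:

    val a = Seq((1, "a"), (2, "b")).sortBy(_._1.hashCode()).iterator
    val b = Seq((2, "B"), (3, "c")).sortBy(_._1.hashCode()).iterator
    HashMergeSketch.mergeSorted(Seq(a, b), (x: String, y: String) => x + y).toList
    // List((1,a), (2,bB), (3,c))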
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
new file mode 100644
index 0000000000..204330dad4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.util.SizeEstimator
+import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
+
+/**
+ * Append-only map that keeps track of its estimated size in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
+ * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ */
+private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
+
+ /**
+ * Controls the base of the exponential which governs the rate of sampling.
+ * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
+ */
+ private val SAMPLE_GROWTH_RATE = 1.1
+
+ /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
+ private val samples = new ArrayBuffer[Sample]()
+
+ /** Total number of insertions and updates into the map since the last resetSamples(). */
+ private var numUpdates: Long = _
+
+ /** The value of 'numUpdates' at which we will take our next sample. */
+ private var nextSampleNum: Long = _
+
+ /** The average number of bytes per update between our last two samples. */
+ private var bytesPerUpdate: Double = _
+
+ resetSamples()
+
+ /** Called after the map grows in size, as this can be a dramatic change for small objects. */
+ def resetSamples() {
+ numUpdates = 1
+ nextSampleNum = 1
+ samples.clear()
+ takeSample()
+ }
+
+ override def update(key: K, value: V): Unit = {
+ super.update(key, value)
+ numUpdates += 1
+ if (nextSampleNum == numUpdates) { takeSample() }
+ }
+
+ override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
+ val newValue = super.changeValue(key, updateFunc)
+ numUpdates += 1
+ if (nextSampleNum == numUpdates) { takeSample() }
+ newValue
+ }
+
+ /** Takes a new sample of the current map's size. */
+ def takeSample() {
+ samples += Sample(SizeEstimator.estimate(this), numUpdates)
+ // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
+ // Note: convert to a List first, since the :: pattern below never matches an ArrayBuffer.
+ bytesPerUpdate = math.max(0, samples.toList.reverse match {
+ case latest :: previous :: tail =>
+ (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
+ case _ =>
+ 0
+ })
+ nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
+ }
+
+ override protected def growTable() {
+ super.growTable()
+ resetSamples()
+ }
+
+ /** Estimates the current size of the map in bytes. O(1) time. */
+ def estimateSize(): Long = {
+ assert(samples.nonEmpty)
+ val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
+ (samples.last.size + extrapolatedDelta).toLong
+ }
+}
+
+private object SizeTrackingAppendOnlyMap {
+ case class Sample(size: Long, numUpdates: Long)
+}
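
To make the sampling arithmetic above concrete, here is a worked example with invented numbers (none of these values come from the patch):

    // Suppose the last sample, taken at update 1000, estimated 64000 bytes, and the
    // previous sample, at update 800, estimated 48000 bytes:
    val bytesPerUpdate = (64000 - 48000).toDouble / (1000 - 800)    // 80.0 bytes per update
    // The next sample fires at ceil(1000 * 1.1) = 1100. Until then, estimateSize()
    // extrapolates from the last sample; at update 1050:
    val estimate = (64000 + bytesPerUpdate * (1050 - 1000)).toLong  // 68000 bytes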
diff --git a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
new file mode 100644
index 0000000000..93f0c6a8e6
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.util.Random
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
+import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
+
+class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
+ val NORMAL_ERROR = 0.20
+ val HIGH_ERROR = 0.30
+
+ test("fixed size insertions") {
+ testWith[Int, Long](10000, i => (i, i.toLong))
+ testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
+ testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass()))
+ }
+
+ test("variable size insertions") {
+ val rand = new Random(123456789)
+ def randString(minLen: Int, maxLen: Int): String = {
+ "a" * (rand.nextInt(maxLen - minLen) + minLen)
+ }
+ testWith[Int, String](10000, i => (i, randString(0, 10)))
+ testWith[Int, String](10000, i => (i, randString(0, 100)))
+ testWith[Int, String](10000, i => (i, randString(90, 100)))
+ }
+
+ test("updates") {
+ val rand = new Random(123456789)
+ def randString(minLen: Int, maxLen: Int): String = {
+ "a" * (rand.nextInt(maxLen - minLen) + minLen)
+ }
+ testWith[String, Int](10000, i => (randString(0, 10000), i))
+ }
+
+ def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
+ val map = new SizeTrackingAppendOnlyMap[K, V]()
+ for (i <- 0 until numElements) {
+ val (k, v) = makeElement(i)
+ map(k) = v
+ expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+ }
+ }
+
+ def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
+ val betterEstimatedSize = SizeEstimator.estimate(obj)
+ assert(betterEstimatedSize * (1 - error) < estimatedSize,
+ s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
+ assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
+ s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
+ }
+}
+
+object SizeTrackingAppendOnlyMapSuite {
+ // Speed test. The results below are recorded for reproducibility;
+ // timings like these can be highly non-deterministic in general.
+ // Results:
+ // AppendOnlyMap: 31 ms
+ // SizeTracker: 54 ms
+ // SizeEstimator: 1500 ms
+ def main(args: Array[String]) {
+ val numElements = 100000
+
+ val baseTimes = for (i <- 0 until 10) yield time {
+ val map = new AppendOnlyMap[Int, LargeDummyClass]()
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass()
+ }
+ }
+
+ val sampledTimes = for (i <- 0 until 10) yield time {
+ val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass()
+ map.estimateSize()
+ }
+ }
+
+ val unsampledTimes = for (i <- 0 until 3) yield time {
+ val map = new AppendOnlyMap[Int, LargeDummyClass]()
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass()
+ SizeEstimator.estimate(map)
+ }
+ }
+
+ println("Base: " + baseTimes)
+ println("SizeTracker (sampled): " + sampledTimes)
+ println("SizeEstimator (unsampled): " + unsampledTimes)
+ }
+
+ def time(f: => Unit): Long = {
+ val start = System.currentTimeMillis()
+ f
+ System.currentTimeMillis() - start
+ }
+
+ private class LargeDummyClass {
+ val arr = new Array[Int](100)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/VectorSuite.scala b/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
new file mode 100644
index 0000000000..7006571ef0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+/**
+ * Tests org.apache.spark.util.Vector functionality
+ */
+class VectorSuite extends FunSuite {
+
+ def verifyVector(vector: Vector, expectedLength: Int) = {
+ assert(vector.length == expectedLength)
+ assert(vector.elements.min > 0.0)
+ assert(vector.elements.max < 1.0)
+ }
+
+ test("random with default random number generator") {
+ val vector100 = Vector.random(100)
+ verifyVector(vector100, 100)
+ }
+
+ test("random with given random number generator") {
+ val vector100 = Vector.random(100, new Random(100))
+ verifyVector(vector100, 100)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
index 7177919a58..f44442f1a5 100644
--- a/core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.spark.util
+package org.apache.spark.util.collection
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
+import java.util.Comparator
class AppendOnlyMapSuite extends FunSuite {
test("initialization") {
@@ -151,4 +152,47 @@ class AppendOnlyMapSuite extends FunSuite {
assert(map("" + i) === "" + i)
}
}
+
+ test("destructive sort") {
+ val map = new AppendOnlyMap[String, String]()
+ for (i <- 1 to 100) {
+ map("" + i) = "" + i
+ }
+ map.update(null, "happy new year!")
+
+ try {
+ map.apply("1")
+ map.update("1", "2013")
+ map.changeValue("1", (hadValue, oldValue) => "2014")
+ map.iterator
+ } catch {
+ case e: IllegalStateException => fail()
+ }
+
+ val it = map.destructiveSortedIterator(new Comparator[(String, String)] {
+ def compare(kv1: (String, String), kv2: (String, String)): Int = {
+ val x = if (kv1 != null && kv1._1 != null) kv1._1.toInt else Int.MinValue
+ val y = if (kv2 != null && kv2._1 != null) kv2._1.toInt else Int.MinValue
+ x.compareTo(y)
+ }
+ })
+
+ // Should be sorted by key
+ assert(it.hasNext)
+ var previous = it.next()
+ assert(previous == (null, "happy new year!"))
+ previous = it.next()
+ assert(previous == ("1", "2014"))
+ while (it.hasNext) {
+ val kv = it.next()
+ assert(kv._1.toInt > previous._1.toInt)
+ previous = kv
+ }
+
+ // All subsequent calls to apply, update, changeValue and iterator should throw an exception
+ intercept[AssertionError] { map.apply("1") }
+ intercept[AssertionError] { map.update("1", "2013") }
+ intercept[AssertionError] { map.changeValue("1", (hadValue, oldValue) => "2014") }
+ intercept[AssertionError] { map.iterator }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
new file mode 100644
index 0000000000..ef957bb0e5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
+
+ override def beforeEach() {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.externalSorting", "true")
+ sc = new SparkContext("local", "test", conf)
+ }
+
+ val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
+ val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => {
+ buffer += i
+ }
+ val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] =
+ (buf1, buf2) => {
+ buf1 ++= buf2
+ }
+
+ test("simple insert") {
+ val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+
+ // Single insert
+ map.insert(1, 10)
+ var it = map.iterator
+ assert(it.hasNext)
+ val kv = it.next()
+ assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10))
+ assert(!it.hasNext)
+
+ // Multiple insert
+ map.insert(2, 20)
+ map.insert(3, 30)
+ it = map.iterator
+ assert(it.hasNext)
+ assert(it.toSet == Set[(Int, ArrayBuffer[Int])](
+ (1, ArrayBuffer[Int](10)),
+ (2, ArrayBuffer[Int](20)),
+ (3, ArrayBuffer[Int](30))))
+ }
+
+ test("insert with collision") {
+ val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+
+ map.insert(1, 10)
+ map.insert(2, 20)
+ map.insert(3, 30)
+ map.insert(1, 100)
+ map.insert(2, 200)
+ map.insert(1, 1000)
+ val it = map.iterator
+ assert(it.hasNext)
+ val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
+ assert(result == Set[(Int, Set[Int])](
+ (1, Set[Int](10, 100, 1000)),
+ (2, Set[Int](20, 200)),
+ (3, Set[Int](30))))
+ }
+
+ test("ordering") {
+ val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+ map1.insert(1, 10)
+ map1.insert(2, 20)
+ map1.insert(3, 30)
+
+ val map2 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+ map2.insert(2, 20)
+ map2.insert(3, 30)
+ map2.insert(1, 10)
+
+ val map3 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+ map3.insert(3, 30)
+ map3.insert(1, 10)
+ map3.insert(2, 20)
+
+ val it1 = map1.iterator
+ val it2 = map2.iterator
+ val it3 = map3.iterator
+
+ var kv1 = it1.next()
+ var kv2 = it2.next()
+ var kv3 = it3.next()
+ assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
+ assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+
+ kv1 = it1.next()
+ kv2 = it2.next()
+ kv3 = it3.next()
+ assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
+ assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+
+ kv1 = it1.next()
+ kv2 = it2.next()
+ kv3 = it3.next()
+ assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
+ assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+ }
+
+ test("null keys and values") {
+ val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+ map.insert(1, 5)
+ map.insert(2, 6)
+ map.insert(3, 7)
+ assert(map.size === 3)
+ assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+ (1, Seq[Int](5)),
+ (2, Seq[Int](6)),
+ (3, Seq[Int](7))
+ ))
+
+ // Null keys
+ val nullInt = null.asInstanceOf[Int]
+ map.insert(nullInt, 8)
+ assert(map.size === 4)
+ assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+ (1, Seq[Int](5)),
+ (2, Seq[Int](6)),
+ (3, Seq[Int](7)),
+ (nullInt, Seq[Int](8))
+ ))
+
+ // Null values
+ map.insert(4, nullInt)
+ map.insert(nullInt, nullInt)
+ assert(map.size === 5)
+ val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
+ assert(result == Set[(Int, Set[Int])](
+ (1, Set[Int](5)),
+ (2, Set[Int](6)),
+ (3, Set[Int](7)),
+ (4, Set[Int](nullInt)),
+ (nullInt, Set[Int](nullInt, 8))
+ ))
+ }
+
+ test("simple aggregator") {
+ // reduceByKey
+ val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))
+ val result1 = rdd.reduceByKey(_+_).collect()
+ assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5)))
+
+ // groupByKey
+ val result2 = rdd.groupByKey().collect()
+ assert(result2.toSet == Set[(Int, Seq[Int])]
+ ((0, ArrayBuffer[Int](1, 1, 1, 1, 1)), (1, ArrayBuffer[Int](1, 1, 1, 1, 1))))
+ }
+
+ test("simple cogroup") {
+ val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
+ val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
+ val result = rdd1.cogroup(rdd2).collect()
+
+ result.foreach { case (i, (seq1, seq2)) =>
+ i match {
+ case 0 => assert(seq1.toSet == Set[Int]() && seq2.toSet == Set[Int](2, 4))
+ case 1 => assert(seq1.toSet == Set[Int](1) && seq2.toSet == Set[Int](1, 3))
+ case 2 => assert(seq1.toSet == Set[Int](2) && seq2.toSet == Set[Int]())
+ case 3 => assert(seq1.toSet == Set[Int](3) && seq2.toSet == Set[Int]())
+ case 4 => assert(seq1.toSet == Set[Int](4) && seq2.toSet == Set[Int]())
+ }
+ }
+ }
+
+ test("spilling") {
+ // TODO: Figure out correct memory parameters to actually induce spilling
+ // System.setProperty("spark.shuffle.buffer.mb", "1")
+ // System.setProperty("spark.shuffle.buffer.fraction", "0.05")
+
+ // reduceByKey - should spill exactly 6 times
+ val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i))
+ val resultA = rddA.reduceByKey(math.max(_, _)).collect()
+ assert(resultA.length == 5000)
+ resultA.foreach { case(k, v) =>
+ k match {
+ case 0 => assert(v == 1)
+ case 2500 => assert(v == 5001)
+ case 4999 => assert(v == 9999)
+ case _ =>
+ }
+ }
+
+ // groupByKey - should spill exactly 11 times
+ val rddB = sc.parallelize(0 until 10000).map(i => (i/4, i))
+ val resultB = rddB.groupByKey().collect()
+ assert(resultB.length == 2500)
+ resultB.foreach { case(i, seq) =>
+ i match {
+ case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3))
+ case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003))
+ case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999))
+ case _ =>
+ }
+ }
+
+ // cogroup - should spill exactly 7 times
+ val rddC1 = sc.parallelize(0 until 1000).map(i => (i, i))
+ val rddC2 = sc.parallelize(0 until 1000).map(i => (i%100, i))
+ val resultC = rddC1.cogroup(rddC2).collect()
+ assert(resultC.length == 1000)
+ resultC.foreach { case(i, (seq1, seq2)) =>
+ i match {
+ case 0 =>
+ assert(seq1.toSet == Set[Int](0))
+ assert(seq2.toSet == Set[Int](0, 100, 200, 300, 400, 500, 600, 700, 800, 900))
+ case 500 =>
+ assert(seq1.toSet == Set[Int](500))
+ assert(seq2.toSet == Set[Int]())
+ case 999 =>
+ assert(seq1.toSet == Set[Int](999))
+ assert(seq2.toSet == Set[Int]())
+ case _ =>
+ }
+ }
+ }
+
+ // TODO: Test memory allocation for multiple concurrently running tasks
+}
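
One way the TODO above might be resolved is to shrink the shuffle memory budget so the in-memory maps overflow early; a sketch, with property values that are assumptions rather than settings verified to produce the spill counts mentioned in the comments:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical spill-inducing configuration (the values are assumptions)
    val conf = new SparkConf(false)
      .set("spark.shuffle.externalSorting", "true")   // enable spilling
      .set("spark.shuffle.memoryFraction", "0.001")   // tiny budget to force early spills
    val sc = new SparkContext("local", "spill-test", conf)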
diff --git a/docs/configuration.md b/docs/configuration.md
index b1a0e19167..ad75e06fc7 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -104,14 +104,25 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td>spark.storage.memoryFraction</td>
- <td>0.66</td>
+ <td>0.6</td>
<td>
Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
- generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase
+ generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase
it if you configure your own old generation size.
</td>
</tr>
<tr>
+ <td>spark.shuffle.memoryFraction</td>
+ <td>0.3</td>
+ <td>
+ Fraction of Java heap to use for aggregation and cogroups during shuffles, if
+ <code>spark.shuffle.externalSorting</code> is enabled. At any given time, the collective size of
+ all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
+ begin to spill to disk. If spills happen frequently, consider increasing this value at the expense of
+ <code>spark.storage.memoryFraction</code>.
+ </td>
+</tr>
+<tr>
<td>spark.mesos.coarse</td>
<td>false</td>
<td>
@@ -377,6 +388,14 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.shuffle.externalSorting</td>
+ <td>true</td>
+ <td>
+ If set to "true", limits the amount of memory used during reduces by spilling data out to disk. This spilling
+ threshold is specified by <code>spark.shuffle.memoryFraction</code>.
+ </td>
+</tr>
+<tr>
<td>spark.speculation</td>
<td>false</td>
<td>
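
Both fractions above carve memory out of the same heap, so increasing one is usually paid for by decreasing the other. A sketch of that trade, with illustrative values:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.storage.memoryFraction", "0.5")  // shrink the cache share...
      .set("spark.shuffle.memoryFraction", "0.4")  // ...to give shuffle aggregation more room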
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index d82a1e1490..e7cb5ab3ff 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -185,7 +185,11 @@ def get_spark_ami(opts):
"hi1.4xlarge": "hvm",
"m3.xlarge": "hvm",
"m3.2xlarge": "hvm",
- "cr1.8xlarge": "hvm"
+ "cr1.8xlarge": "hvm",
+ "i2.xlarge": "hvm",
+ "i2.2xlarge": "hvm",
+ "i2.4xlarge": "hvm",
+ "i2.8xlarge": "hvm"
}
if opts.instance_type in instance_types:
instance_type = instance_types[opts.instance_type]
@@ -478,7 +482,11 @@ def get_num_disks(instance_type):
"cr1.8xlarge": 2,
"hi1.4xlarge": 2,
"m3.xlarge": 0,
- "m3.2xlarge": 0
+ "m3.2xlarge": 0,
+ "i2.xlarge": 1,
+ "i2.2xlarge": 2,
+ "i2.4xlarge": 4,
+ "i2.8xlarge": 8
}
if instance_type in disks_by_instance:
return disks_by_instance[instance_type]
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index f782e0e126..23b2fead65 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -45,9 +45,9 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.sksamuel.kafka</groupId>
+ <groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
- <version>0.8.0-beta1</version>
+ <version>0.8.0</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index c8987a3ee0..41e813d48c 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -80,7 +80,7 @@ class MQTTReceiver(brokerUrl: String,
var peristance: MqttClientPersistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistence
- var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance)
+ var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
// Connect to MqttBroker
client.connect()
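
The one-line change above matters because an MQTT broker disconnects the older of two clients that share an ID, so the fixed "MQTTSub" fails as soon as a second receiver connects. A minimal sketch of the fixed pattern, assuming the Eclipse Paho client this module uses (the broker URL is a placeholder):

    import org.eclipse.paho.client.mqttv3.MqttClient
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

    val brokerUrl = "tcp://localhost:1883"  // placeholder broker address
    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), new MemoryPersistence())
    client.connect()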
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 8b27ecf82c..89ee07063d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -22,7 +22,7 @@ import scala.util.Random
import scala.util.Sorting
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext}
+import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoRegistrator
@@ -578,12 +578,13 @@ object ALS {
val implicitPrefs = if (args.length >= 7) args(6).toBoolean else false
val alpha = if (args.length >= 8) args(7).toDouble else 1
val blocks = if (args.length == 9) args(8).toInt else -1
- val sc = new SparkContext(master, "ALS")
- sc.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- sc.conf.set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
- sc.conf.set("spark.kryo.referenceTracking", "false")
- sc.conf.set("spark.kryoserializer.buffer.mb", "8")
- sc.conf.set("spark.locality.wait", "10000")
+ val conf = new SparkConf()
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
+ .set("spark.kryo.referenceTracking", "false")
+ .set("spark.kryoserializer.buffer.mb", "8")
+ .set("spark.locality.wait", "10000")
+ val sc = new SparkContext(master, "ALS", conf)
val ratings = sc.textFile(ratingsFile).map { line =>
val fields = line.split(',')
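
The ALS change above switches from mutating sc.conf after construction to building the SparkConf first. A minimal sketch of the pattern (the master and app name are placeholders; the properties mirror the diff):

    import org.apache.spark.{SparkConf, SparkContext}

    // Settings passed at construction are in effect when the context (and its
    // executors) start, unlike mutations of sc.conf made after the fact.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", "8")
    val sc = new SparkContext("local", "ALS-example", conf)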
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f794918d22..c8b5f09ab5 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -90,7 +90,7 @@ object SparkBuild extends Build {
lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](if (isNewHadoop) yarn else yarnAlpha) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](if (isNewHadoop) yarn else yarnAlpha) else Seq[ProjectReference]()
- lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings)
+ lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings)
@@ -98,23 +98,23 @@ object SparkBuild extends Build {
lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings)
.dependsOn(streaming % "compile->compile;test->test")
-
+
lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings)
.dependsOn(streaming % "compile->compile;test->test")
-
+
lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
-
+
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
.dependsOn(core, mllib, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
// Everything except assembly, tools and examples belong to packageProjects
lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
- lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
+ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.apache.spark",
@@ -321,7 +321,7 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
libraryDependencies ++= Seq(
- "commons-io" % "commons-io" % "2.4"
+ "commons-io" % "commons-io" % "2.4"
)
)
@@ -388,19 +388,19 @@ object SparkBuild extends Build {
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty)
)
)
-
+
def kafkaSettings() = sharedSettings ++ Seq(
name := "spark-streaming-kafka",
libraryDependencies ++= Seq(
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
- "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
+ "org.apache.kafka" %% "kafka" % "0.8.0"
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("net.sf.jopt-simple", "jopt-simple")
excludeAll(excludeNetty)
)
)
-
+
def flumeSettings() = sharedSettings ++ Seq(
name := "spark-streaming-flume",
libraryDependencies ++= Seq(
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 0d2145da9a..8b7d7709bf 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -28,6 +28,7 @@ import java.util.*;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
+import com.google.common.collect.Sets;
import org.apache.spark.SparkConf;
import org.apache.spark.HashPartitioner;
@@ -441,13 +442,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<String, String>("new york", "islanders")));
- List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
- Arrays.asList(
+ List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Sets.newHashSet(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("dodgers", "giants")),
new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("yankees", "mets"))),
- Arrays.asList(
+ new Tuple2<String, String>("yankees", "mets"))),
+ Sets.newHashSet(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("sharks", "ducks")),
new Tuple2<String, Tuple2<String, String>>("new york",
@@ -482,8 +483,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+ for (List<Tuple2<String, Tuple2<String, String>>> res: result) {
+ unorderedResult.add(Sets.newHashSet(res));
+ }
- Assert.assertEquals(expected, result);
+ Assert.assertEquals(expected, unorderedResult);
}
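
The Java changes above compare each output batch as a set because pair ordering within a batch is not guaranteed. The same idea in Scala, for reference (the sample data is invented):

    val result: Seq[Seq[(String, Long)]] = Seq(Seq(("hello", 1L), ("world", 1L)))
    val expected = Seq(Set(("world", 1L), ("hello", 1L)))
    assert(result.map(_.toSet) == expected)  // order within each batch is irrelevant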
@@ -1196,15 +1201,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("hello", "moon"),
Arrays.asList("hello"));
- List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
+ List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList(
+ Sets.newHashSet(
new Tuple2<String, Long>("hello", 1L),
new Tuple2<String, Long>("world", 1L)),
- Arrays.asList(
+ Sets.newHashSet(
new Tuple2<String, Long>("hello", 2L),
new Tuple2<String, Long>("world", 1L),
new Tuple2<String, Long>("moon", 1L)),
- Arrays.asList(
+ Sets.newHashSet(
new Tuple2<String, Long>("hello", 2L),
new Tuple2<String, Long>("moon", 1L)));
@@ -1214,8 +1219,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ List<HashSet<Tuple2<String, Long>>> unorderedResult = Lists.newArrayList();
+ for (List<Tuple2<String, Long>> res: result) {
+ unorderedResult.add(Sets.newHashSet(res));
+ }
- Assert.assertEquals(expected, result);
+ Assert.assertEquals(expected, unorderedResult);
}
@Test