aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bagel/pom.xml11
-rw-r--r--core/pom.xml13
-rw-r--r--core/src/main/scala/spark/Accumulators.scala38
-rw-r--r--core/src/main/scala/spark/BoundedMemoryCache.scala118
-rw-r--r--core/src/main/scala/spark/CacheTracker.scala16
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala37
-rw-r--r--core/src/main/scala/spark/Partitioner.scala4
-rw-r--r--core/src/main/scala/spark/RDD.scala20
-rw-r--r--core/src/main/scala/spark/SequenceFileRDDFunctions.scala8
-rw-r--r--core/src/main/scala/spark/SizeEstimator.scala13
-rw-r--r--core/src/main/scala/spark/SparkContext.scala7
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala10
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala8
-rw-r--r--core/src/main/scala/spark/api/java/JavaSparkContext.scala36
-rw-r--r--core/src/main/scala/spark/deploy/JsonProtocol.scala78
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterWebUI.scala58
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerArguments.scala22
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala19
-rw-r--r--core/src/main/scala/spark/network/Connection.scala7
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala12
-rw-r--r--core/src/main/scala/spark/network/ConnectionManagerTest.scala24
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala6
-rw-r--r--core/src/test/scala/spark/BoundedMemoryCacheSuite.scala58
-rw-r--r--core/src/test/scala/spark/JavaAPISuite.java60
-rw-r--r--core/src/test/scala/spark/PartitioningSuite.scala21
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala25
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala7
-rw-r--r--core/src/test/scala/spark/SizeEstimatorSuite.scala48
-rw-r--r--docs/quick-start.md10
-rw-r--r--examples/pom.xml11
-rw-r--r--examples/src/main/scala/spark/examples/LocalLR.scala2
-rw-r--r--pom.xml14
-rw-r--r--project/SparkBuild.scala4
-rw-r--r--repl-bin/pom.xml11
-rw-r--r--repl/pom.xml11
-rwxr-xr-xrun12
36 files changed, 575 insertions, 284 deletions
diff --git a/bagel/pom.xml b/bagel/pom.xml
index a8256a6e8b..5f58347204 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -45,6 +45,11 @@
<profiles>
<profile>
<id>hadoop1</id>
+ <activation>
+ <property>
+ <name>!hadoopVersion</name>
+ </property>
+ </activation>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
@@ -72,6 +77,12 @@
</profile>
<profile>
<id>hadoop2</id>
+ <activation>
+ <property>
+ <name>hadoopVersion</name>
+ <value>2</value>
+ </property>
+ </activation>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
diff --git a/core/pom.xml b/core/pom.xml
index ae52c20657..ad9fdcde2c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -159,6 +159,11 @@
<profiles>
<profile>
<id>hadoop1</id>
+ <activation>
+ <property>
+ <name>!hadoopVersion</name>
+ </property>
+ </activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -211,6 +216,12 @@
</profile>
<profile>
<id>hadoop2</id>
+ <activation>
+ <property>
+ <name>hadoopVersion</name>
+ <value>2</value>
+ </property>
+ </activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -267,4 +278,4 @@
</build>
</profile>
</profiles>
-</project> \ No newline at end of file
+</project>
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index bacd0ace37..b644aba5f8 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -39,19 +39,36 @@ class Accumulable[R, T] (
def += (term: T) { value_ = param.addAccumulator(value_, term) }
/**
+ * Add more data to this accumulator / accumulable
+ * @param term the data to add
+ */
+ def add(term: T) { value_ = param.addAccumulator(value_, term) }
+
+ /**
* Merge two accumulable objects together
- *
+ *
* Normally, a user will not want to use this version, but will instead call `+=`.
- * @param term the other Accumulable that will get merged with this
+ * @param term the other `R` that will get merged with this
*/
def ++= (term: R) { value_ = param.addInPlace(value_, term)}
/**
+ * Merge two accumulable objects together
+ *
+ * Normally, a user will not want to use this version, but will instead call `add`.
+ * @param term the other `R` that will get merged with this
+ */
+ def merge(term: R) { value_ = param.addInPlace(value_, term)}
+
+ /**
* Access the accumulator's current value; only allowed on master.
*/
- def value = {
- if (!deserialized) value_
- else throw new UnsupportedOperationException("Can't read accumulator value in task")
+ def value: R = {
+ if (!deserialized) {
+ value_
+ } else {
+ throw new UnsupportedOperationException("Can't read accumulator value in task")
+ }
}
/**
@@ -68,10 +85,17 @@ class Accumulable[R, T] (
/**
* Set the accumulator's value; only allowed on master.
*/
- def value_= (r: R) {
- if (!deserialized) value_ = r
+ def value_= (newValue: R) {
+ if (!deserialized) value_ = newValue
else throw new UnsupportedOperationException("Can't assign accumulator value in task")
}
+
+ /**
+ * Set the accumulator's value; only allowed on master
+ */
+ def setValue(newValue: R) {
+ this.value = newValue
+ }
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream) {
diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala
deleted file mode 100644
index e8392a194f..0000000000
--- a/core/src/main/scala/spark/BoundedMemoryCache.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-package spark
-
-import java.util.LinkedHashMap
-
-/**
- * An implementation of Cache that estimates the sizes of its entries and attempts to limit its
- * total memory usage to a fraction of the JVM heap. Objects' sizes are estimated using
- * SizeEstimator, which has limitations; most notably, we will overestimate total memory used if
- * some cache entries have pointers to a shared object. Nonetheless, this Cache should work well
- * when most of the space is used by arrays of primitives or of simple classes.
- */
-private[spark] class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
- logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)
-
- def this() {
- this(BoundedMemoryCache.getMaxBytes)
- }
-
- private var currentBytes = 0L
- private val map = new LinkedHashMap[(Any, Int), Entry](32, 0.75f, true)
-
- override def get(datasetId: Any, partition: Int): Any = {
- synchronized {
- val entry = map.get((datasetId, partition))
- if (entry != null) {
- entry.value
- } else {
- null
- }
- }
- }
-
- override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
- val key = (datasetId, partition)
- logInfo("Asked to add key " + key)
- val size = estimateValueSize(key, value)
- synchronized {
- if (size > getCapacity) {
- return CachePutFailure()
- } else if (ensureFreeSpace(datasetId, size)) {
- logInfo("Adding key " + key)
- map.put(key, new Entry(value, size))
- currentBytes += size
- logInfo("Number of entries is now " + map.size)
- return CachePutSuccess(size)
- } else {
- logInfo("Didn't add key " + key + " because we would have evicted part of same dataset")
- return CachePutFailure()
- }
- }
- }
-
- override def getCapacity: Long = maxBytes
-
- /**
- * Estimate sizeOf 'value'
- */
- private def estimateValueSize(key: (Any, Int), value: Any) = {
- val startTime = System.currentTimeMillis
- val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
- val timeTaken = System.currentTimeMillis - startTime
- logInfo("Estimated size for key %s is %d".format(key, size))
- logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
- size
- }
-
- /**
- * Remove least recently used entries from the map until at least space bytes are free, in order
- * to make space for a partition from the given dataset ID. If this cannot be done without
- * evicting other data from the same dataset, returns false; otherwise, returns true. Assumes
- * that a lock is held on the BoundedMemoryCache.
- */
- private def ensureFreeSpace(datasetId: Any, space: Long): Boolean = {
- logInfo("ensureFreeSpace(%s, %d) called with curBytes=%d, maxBytes=%d".format(
- datasetId, space, currentBytes, maxBytes))
- val iter = map.entrySet.iterator // Will give entries in LRU order
- while (maxBytes - currentBytes < space && iter.hasNext) {
- val mapEntry = iter.next()
- val (entryDatasetId, entryPartition) = mapEntry.getKey
- if (entryDatasetId == datasetId) {
- // Cannot make space without removing part of the same dataset, or a more recently used one
- return false
- }
- reportEntryDropped(entryDatasetId, entryPartition, mapEntry.getValue)
- currentBytes -= mapEntry.getValue.size
- iter.remove()
- }
- return true
- }
-
- protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
- logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
- // TODO: remove BoundedMemoryCache
-
- val (keySpaceId, innerDatasetId) = datasetId.asInstanceOf[(Any, Any)]
- innerDatasetId match {
- case rddId: Int =>
- SparkEnv.get.cacheTracker.dropEntry(rddId, partition)
- case broadcastUUID: java.util.UUID =>
- // TODO: Maybe something should be done if the broadcasted variable falls out of cache
- case _ =>
- }
- }
-}
-
-// An entry in our map; stores a cached object and its size in bytes
-private[spark] case class Entry(value: Any, size: Long)
-
-private[spark] object BoundedMemoryCache {
- /**
- * Get maximum cache capacity from system configuration
- */
- def getMaxBytes: Long = {
- val memoryFractionToUse = System.getProperty("spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
- (Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong
- }
-}
-
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 3d79078733..04c26b2e40 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -202,26 +202,20 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
loading.add(key)
}
}
- // If we got here, we have to load the split
- // Tell the master that we're doing so
- //val host = System.getProperty("spark.hostname", Utils.localHostName)
- //val future = trackerActor !! AddedToCache(rdd.id, split.index, host)
- // TODO: fetch any remote copy of the split that may be available
- // TODO: also register a listener for when it unloads
- logInfo("Computing partition " + split)
- val elements = new ArrayBuffer[Any]
- elements ++= rdd.compute(split, context)
try {
+ // If we got here, we have to load the split
+ val elements = new ArrayBuffer[Any]
+ logInfo("Computing partition " + split)
+ elements ++= rdd.compute(split, context)
// Try to put this block in the blockManager
blockManager.put(key, elements, storageLevel, true)
- //future.apply() // Wait for the reply from the cache tracker
+ return elements.iterator.asInstanceOf[Iterator[T]]
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
- return elements.iterator.asInstanceOf[Iterator[T]]
}
}
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index d3e206b353..ce48cea903 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -52,6 +52,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true): RDD[(K, C)] = {
+ if (getKeyClass().isArray) {
+ if (mapSideCombine) {
+ throw new SparkException("Cannot use map-side combining with array keys.")
+ }
+ if (partitioner.isInstanceOf[HashPartitioner]) {
+ throw new SparkException("Default partitioner cannot partition array keys.")
+ }
+ }
val aggregator =
new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (mapSideCombine) {
@@ -92,6 +100,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* before sending results to a reducer, similarly to a "combiner" in MapReduce.
*/
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
+
+ if (getKeyClass().isArray) {
+ throw new SparkException("reduceByKeyLocally() does not support array keys")
+ }
+
def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
val map = new JHashMap[K, V]
for ((k, v) <- iter) {
@@ -165,6 +178,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* be set to true.
*/
def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
+ if (getKeyClass().isArray) {
+ if (mapSideCombine) {
+ throw new SparkException("Cannot use map-side combining with array keys.")
+ }
+ if (partitioner.isInstanceOf[HashPartitioner]) {
+ throw new SparkException("Default partitioner cannot partition array keys.")
+ }
+ }
if (mapSideCombine) {
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
@@ -336,6 +357,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+ if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+ throw new SparkException("Default partitioner cannot partition array keys.")
+ }
val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
partitioner)
@@ -352,6 +376,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+ throw new SparkException("Default partitioner cannot partition array keys.")
+ }
val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]],
other1.asInstanceOf[RDD[(_, _)]],
@@ -588,6 +615,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
writer.cleanup()
}
+ /**
+ * Return an RDD with the keys of each tuple.
+ */
+ def keys: RDD[K] = self.map(_._1)
+
+ /**
+ * Return an RDD with the values of each tuple.
+ */
+ def values: RDD[V] = self.map(_._2)
+
private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index b71021a082..9d5b966e1e 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -11,6 +11,10 @@ abstract class Partitioner extends Serializable {
/**
* A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
+ *
+ * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
+ * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
+ * produce an unexpected or incorrect result.
*/
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions = partitions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index d15c6f7396..3b9ced1946 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -330,6 +330,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def toArray(): Array[T] = collect()
/**
+ * Return an RDD that contains all matching values by applying `f`.
+ */
+ def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = {
+ filter(f.isDefinedAt).map(f)
+ }
+
+ /**
* Reduces the elements of this RDD using the specified associative binary operator.
*/
def reduce(f: (T, T) => T): T = {
@@ -417,6 +424,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): Map[T, Long] = {
+ if (elementClassManifest.erasure.isArray) {
+ throw new SparkException("countByValue() does not support arrays")
+ }
// TODO: This should perhaps be distributed by default.
def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
val map = new OLMap[T]
@@ -445,6 +455,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
timeout: Long,
confidence: Double = 0.95
): PartialResult[Map[T, BoundedDouble]] = {
+ if (elementClassManifest.erasure.isArray) {
+ throw new SparkException("countByValueApprox() does not support arrays")
+ }
val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
val map = new OLMap[T]
while (iter.hasNext) {
@@ -504,6 +517,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
.saveAsSequenceFile(path)
}
+ /**
+ * Creates tuples of the elements in this RDD by applying `f`.
+ */
+ def keyBy[K](f: T => K): RDD[(K, T)] = {
+ map(x => (f(x), x))
+ }
+
/** A private method for tests, to look at the contents of each partition */
private[spark] def collectPartitions(): Array[Array[T]] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index a34aee69c1..6b4a11d6d3 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -42,7 +42,13 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
classManifest[T].erasure
} else {
- implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+ // We get the type of the Writable class by looking at the apply method which converts
+ // from T to Writable. Since we have two apply methods we filter out the one which
+ // is of the form "java.lang.Object apply(java.lang.Object)"
+ implicitly[T => Writable].getClass.getDeclaredMethods().filter(
+ m => m.getReturnType().toString != "java.lang.Object" &&
+ m.getName() == "apply")(0).getReturnType
+
}
// TODO: use something like WritableConverter to avoid reflection
}
diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala
index 7c3e8640e9..d4e1157250 100644
--- a/core/src/main/scala/spark/SizeEstimator.scala
+++ b/core/src/main/scala/spark/SizeEstimator.scala
@@ -9,7 +9,6 @@ import java.util.Random
import javax.management.MBeanServer
import java.lang.management.ManagementFactory
-import com.sun.management.HotSpotDiagnosticMXBean
import scala.collection.mutable.ArrayBuffer
@@ -76,12 +75,20 @@ private[spark] object SizeEstimator extends Logging {
if (System.getProperty("spark.test.useCompressedOops") != null) {
return System.getProperty("spark.test.useCompressedOops").toBoolean
}
+
try {
val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
val server = ManagementFactory.getPlatformMBeanServer()
+
+ // NOTE: This should throw an exception in non-Sun JVMs
+ val hotSpotMBeanClass = Class.forName("com.sun.management.HotSpotDiagnosticMXBean")
+ val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
+ Class.forName("java.lang.String"))
+
val bean = ManagementFactory.newPlatformMXBeanProxy(server,
- hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean])
- return bean.getVMOption("UseCompressedOops").getValue.toBoolean
+ hotSpotMBeanName, hotSpotMBeanClass)
+ // TODO: We could use reflection on the VMOption returned ?
+ return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
} catch {
case e: Exception => {
// Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 4fd81bc63b..bbf8272eb3 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -382,11 +382,12 @@ class SparkContext(
new Accumulator(initialValue, param)
/**
- * Create an [[spark.Accumulable]] shared variable, with a `+=` method
+ * Create an [[spark.Accumulable]] shared variable, to which tasks can add values with `+=`.
+ * Only the master can access the accumuable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
- def accumulable[T,R](initialValue: T)(implicit param: AccumulableParam[T,R]) =
+ def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param)
/**
@@ -404,7 +405,7 @@ class SparkContext(
* Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
* reading it in distributed functions. The variable will be sent to each cluster only once.
*/
- def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T] (value, isLocal)
+ def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
/**
* Add a file to be downloaded into the working directory of this Spark job on every node.
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 5c2be534ff..8ce32e0e2f 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -471,6 +471,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
fromRDD(new OrderedRDDFunctions(rdd).sortByKey(ascending))
}
+
+ /**
+ * Return an RDD with the keys of each tuple.
+ */
+ def keys(): JavaRDD[K] = JavaRDD.fromRDD[K](rdd.map(_._1))
+
+ /**
+ * Return an RDD with the values of each tuple.
+ */
+ def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
}
object JavaPairRDD {
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 81d3a94466..d15f6dd02f 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -298,4 +298,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
+
+ /**
+ * Creates tuples of the elements in this RDD by applying `f`.
+ */
+ def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
+ implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ JavaPairRDD.fromRDD(rdd.keyBy(f))
+ }
}
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index b7725313c4..88ab2846be 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -10,7 +10,7 @@ import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import spark.{Accumulator, AccumulatorParam, RDD, SparkContext}
+import spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, RDD, SparkContext}
import spark.SparkContext.IntAccumulatorParam
import spark.SparkContext.DoubleAccumulatorParam
import spark.broadcast.Broadcast
@@ -265,26 +265,46 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* Create an [[spark.Accumulator]] integer variable, which tasks can "add" values
- * to using the `+=` method. Only the master can access the accumulator's `value`.
+ * to using the `add` method. Only the master can access the accumulator's `value`.
*/
- def intAccumulator(initialValue: Int): Accumulator[Int] =
- sc.accumulator(initialValue)(IntAccumulatorParam)
+ def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
+ sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
/**
* Create an [[spark.Accumulator]] double variable, which tasks can "add" values
- * to using the `+=` method. Only the master can access the accumulator's `value`.
+ * to using the `add` method. Only the master can access the accumulator's `value`.
*/
- def doubleAccumulator(initialValue: Double): Accumulator[Double] =
- sc.accumulator(initialValue)(DoubleAccumulatorParam)
+ def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
+ sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
+
+ /**
+ * Create an [[spark.Accumulator]] integer variable, which tasks can "add" values
+ * to using the `add` method. Only the master can access the accumulator's `value`.
+ */
+ def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
+
+ /**
+ * Create an [[spark.Accumulator]] double variable, which tasks can "add" values
+ * to using the `add` method. Only the master can access the accumulator's `value`.
+ */
+ def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
+ doubleAccumulator(initialValue)
/**
* Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
- * to using the `+=` method. Only the master can access the accumulator's `value`.
+ * to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
sc.accumulator(initialValue)(accumulatorParam)
/**
+ * Create an [[spark.Accumulable]] shared variable of the given type, to which tasks can
+ * "add" values with `add`. Only the master can access the accumuable's `value`.
+ */
+ def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
+ sc.accumulable(initialValue)(param)
+
+ /**
* Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
* reading it in distributed functions. The variable will be sent to each cluster only once.
*/
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
new file mode 100644
index 0000000000..732fa08064
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -0,0 +1,78 @@
+package spark.deploy
+
+import master.{JobInfo, WorkerInfo}
+import worker.ExecutorRunner
+import cc.spray.json._
+
+/**
+ * spray-json helper class containing implicit conversion to json for marshalling responses
+ */
+private[spark] object JsonProtocol extends DefaultJsonProtocol {
+ implicit object WorkerInfoJsonFormat extends RootJsonWriter[WorkerInfo] {
+ def write(obj: WorkerInfo) = JsObject(
+ "id" -> JsString(obj.id),
+ "host" -> JsString(obj.host),
+ "webuiaddress" -> JsString(obj.webUiAddress),
+ "cores" -> JsNumber(obj.cores),
+ "coresused" -> JsNumber(obj.coresUsed),
+ "memory" -> JsNumber(obj.memory),
+ "memoryused" -> JsNumber(obj.memoryUsed)
+ )
+ }
+
+ implicit object JobInfoJsonFormat extends RootJsonWriter[JobInfo] {
+ def write(obj: JobInfo) = JsObject(
+ "starttime" -> JsNumber(obj.startTime),
+ "id" -> JsString(obj.id),
+ "name" -> JsString(obj.desc.name),
+ "cores" -> JsNumber(obj.desc.cores),
+ "user" -> JsString(obj.desc.user),
+ "memoryperslave" -> JsNumber(obj.desc.memoryPerSlave),
+ "submitdate" -> JsString(obj.submitDate.toString))
+ }
+
+ implicit object JobDescriptionJsonFormat extends RootJsonWriter[JobDescription] {
+ def write(obj: JobDescription) = JsObject(
+ "name" -> JsString(obj.name),
+ "cores" -> JsNumber(obj.cores),
+ "memoryperslave" -> JsNumber(obj.memoryPerSlave),
+ "user" -> JsString(obj.user)
+ )
+ }
+
+ implicit object ExecutorRunnerJsonFormat extends RootJsonWriter[ExecutorRunner] {
+ def write(obj: ExecutorRunner) = JsObject(
+ "id" -> JsNumber(obj.execId),
+ "memory" -> JsNumber(obj.memory),
+ "jobid" -> JsString(obj.jobId),
+ "jobdesc" -> obj.jobDesc.toJson.asJsObject
+ )
+ }
+
+ implicit object MasterStateJsonFormat extends RootJsonWriter[MasterState] {
+ def write(obj: MasterState) = JsObject(
+ "url" -> JsString("spark://" + obj.uri),
+ "workers" -> JsArray(obj.workers.toList.map(_.toJson)),
+ "cores" -> JsNumber(obj.workers.map(_.cores).sum),
+ "coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum),
+ "memory" -> JsNumber(obj.workers.map(_.memory).sum),
+ "memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum),
+ "activejobs" -> JsArray(obj.activeJobs.toList.map(_.toJson)),
+ "completedjobs" -> JsArray(obj.completedJobs.toList.map(_.toJson))
+ )
+ }
+
+ implicit object WorkerStateJsonFormat extends RootJsonWriter[WorkerState] {
+ def write(obj: WorkerState) = JsObject(
+ "id" -> JsString(obj.workerId),
+ "masterurl" -> JsString(obj.masterUrl),
+ "masterwebuiurl" -> JsString(obj.masterWebUiUrl),
+ "cores" -> JsNumber(obj.cores),
+ "coresused" -> JsNumber(obj.coresUsed),
+ "memory" -> JsNumber(obj.memory),
+ "memoryused" -> JsNumber(obj.memoryUsed),
+ "executors" -> JsArray(obj.executors.toList.map(_.toJson)),
+ "finishedexecutors" -> JsArray(obj.finishedExecutors.toList.map(_.toJson))
+ )
+ }
+}
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 3cdd3721f5..458ee2d665 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -8,7 +8,11 @@ import akka.util.duration._
import cc.spray.Directives
import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
+import cc.spray.http.MediaTypes
+import cc.spray.typeconversion.SprayJsonSupport._
+
import spark.deploy._
+import spark.deploy.JsonProtocol._
private[spark]
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
@@ -19,29 +23,51 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
val handler = {
get {
- path("") {
- completeWith {
+ (path("") & parameters('format ?)) {
+ case Some(js) if js.equalsIgnoreCase("json") =>
val future = master ? RequestMasterState
- future.map {
- masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState])
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ ctx.complete(future.mapTo[MasterState])
+ }
+ case _ =>
+ completeWith {
+ val future = master ? RequestMasterState
+ future.map {
+ masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState])
+ }
}
- }
} ~
path("job") {
- parameter("jobId") { jobId =>
- completeWith {
+ parameters("jobId", 'format ?) {
+ case (jobId, Some(js)) if (js.equalsIgnoreCase("json")) =>
val future = master ? RequestMasterState
- future.map { state =>
- val masterState = state.asInstanceOf[MasterState]
-
- // A bit ugly an inefficient, but we won't have a number of jobs
- // so large that it will make a significant difference.
- (masterState.activeJobs ++ masterState.completedJobs).find(_.id == jobId) match {
- case Some(job) => spark.deploy.master.html.job_details.render(job)
- case _ => null
+ val jobInfo = for (masterState <- future.mapTo[MasterState]) yield {
+ masterState.activeJobs.find(_.id == jobId) match {
+ case Some(job) => job
+ case _ => masterState.completedJobs.find(_.id == jobId) match {
+ case Some(job) => job
+ case _ => null
+ }
+ }
+ }
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ ctx.complete(jobInfo.mapTo[JobInfo])
+ }
+ case (jobId, _) =>
+ completeWith {
+ val future = master ? RequestMasterState
+ future.map { state =>
+ val masterState = state.asInstanceOf[MasterState]
+
+ masterState.activeJobs.find(_.id == jobId) match {
+ case Some(job) => spark.deploy.master.html.job_details.render(job)
+ case _ => masterState.completedJobs.find(_.id == jobId) match {
+ case Some(job) => spark.deploy.master.html.job_details.render(job)
+ case _ => null
+ }
+ }
}
}
- }
}
} ~
pathPrefix("static") {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
index 340920025b..37524a7c82 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
@@ -104,9 +104,25 @@ private[spark] class WorkerArguments(args: Array[String]) {
}
def inferDefaultMemory(): Int = {
- val bean = ManagementFactory.getOperatingSystemMXBean
- .asInstanceOf[com.sun.management.OperatingSystemMXBean]
- val totalMb = (bean.getTotalPhysicalMemorySize / 1024 / 1024).toInt
+ val ibmVendor = System.getProperty("java.vendor").contains("IBM")
+ var totalMb = 0
+ try {
+ val bean = ManagementFactory.getOperatingSystemMXBean()
+ if (ibmVendor) {
+ val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
+ val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
+ totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
+ } else {
+ val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
+ val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
+ totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
+ }
+ } catch {
+ case e: Exception => {
+ totalMb = 2*1024
+ System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
+ }
+ }
// Leave out 1 GB for the operating system, but don't return a negative memory size
math.max(totalMb - 1024, 512)
}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index d06f4884ee..f9489d99fc 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -7,7 +7,11 @@ import akka.util.Timeout
import akka.util.duration._
import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._
+import cc.spray.http.MediaTypes
+import cc.spray.typeconversion.SprayJsonSupport._
+
import spark.deploy.{WorkerState, RequestWorkerState}
+import spark.deploy.JsonProtocol._
private[spark]
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
@@ -18,13 +22,20 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
val handler = {
get {
- path("") {
- completeWith{
+ (path("") & parameters('format ?)) {
+ case Some(js) if js.equalsIgnoreCase("json") => {
val future = worker ? RequestWorkerState
- future.map { workerState =>
- spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ ctx.complete(future.mapTo[WorkerState])
}
}
+ case _ =>
+ completeWith{
+ val future = worker ? RequestWorkerState
+ future.map { workerState =>
+ spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
+ }
+ }
} ~
path("log") {
parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index 80262ab7b4..c193bf7c8d 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -135,8 +135,11 @@ extends Connection(SocketChannel.open, selector_) {
val chunk = message.getChunkForSending(defaultChunkSize)
if (chunk.isDefined) {
messages += message // this is probably incorrect, it wont work as fifo
- if (!message.started) logDebug("Starting to send [" + message + "]")
- message.started = true
+ if (!message.started) {
+ logDebug("Starting to send [" + message + "]")
+ message.started = true
+ message.startTime = System.currentTimeMillis
+ }
return chunk
} else {
/*logInfo("Finished sending [" + message + "] to [" + remoteConnectionManagerId + "]")*/
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 642fa4b525..36c01ad629 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -43,12 +43,12 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
}
val selector = SelectorProvider.provider.openSelector()
- val handleMessageExecutor = Executors.newFixedThreadPool(4)
+ val handleMessageExecutor = Executors.newFixedThreadPool(System.getProperty("spark.core.connection.handler.threads","20").toInt)
val serverChannel = ServerSocketChannel.open()
val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
val messageStatuses = new HashMap[Int, MessageStatus]
- val connectionRequests = new SynchronizedQueue[SendingConnection]
+ val connectionRequests = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
val sendMessageRequests = new Queue[(Message, SendingConnection)]
@@ -79,10 +79,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
def run() {
try {
while(!selectorThread.isInterrupted) {
- while(!connectionRequests.isEmpty) {
- val sendingConnection = connectionRequests.dequeue
+ for( (connectionManagerId, sendingConnection) <- connectionRequests) {
sendingConnection.connect()
addConnection(sendingConnection)
+ connectionRequests -= connectionManagerId
}
sendMessageRequests.synchronized {
while(!sendMessageRequests.isEmpty) {
@@ -300,8 +300,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
def startNewConnection(): SendingConnection = {
val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
- val newConnection = new SendingConnection(inetSocketAddress, selector)
- connectionRequests += newConnection
+ val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId, new SendingConnection(inetSocketAddress, selector))
newConnection
}
val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)
@@ -473,6 +472,7 @@ private[spark] object ConnectionManager {
val mb = size * count / 1024.0 / 1024.0
val ms = finishTime - startTime
val tput = mb * 1000.0 / ms
+ println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
println("--------------------------")
println()
}
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
index 47ceaf3c07..533e4610f3 100644
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
@@ -13,8 +13,14 @@ import akka.util.duration._
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
+ //<mesos cluster> - the master URL
+ //<slaves file> - a list slaves to run connectionTest on
+ //[num of tasks] - the number of parallel tasks to be initiated default is number of slave hosts
+ //[size of msg in MB (integer)] - the size of messages to be sent in each task, default is 10
+ //[count] - how many times to run, default is 3
+ //[await time in seconds] : await time (in seconds), default is 600
if (args.length < 2) {
- println("Usage: ConnectionManagerTest <mesos cluster> <slaves file>")
+ println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] [size of msg in MB (integer)] [count] [await time in seconds)] ")
System.exit(1)
}
@@ -29,16 +35,19 @@ private[spark] object ConnectionManagerTest extends Logging{
/*println("Slaves")*/
/*slaves.foreach(println)*/
-
- val slaveConnManagerIds = sc.parallelize(0 until slaves.length, slaves.length).map(
+ val tasknum = if (args.length > 2) args(2).toInt else slaves.length
+ val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
+ val count = if (args.length > 4) args(4).toInt else 3
+ val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
+ println("Running "+count+" rounds of test: " + "parallel tasks = " + tasknum + ", msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
+ val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map(
i => SparkEnv.get.connectionManager.id).collect()
println("\nSlave ConnectionManagerIds")
slaveConnManagerIds.foreach(println)
println
- val count = 10
(0 until count).foreach(i => {
- val resultStrs = sc.parallelize(0 until slaves.length, slaves.length).map(i => {
+ val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => {
val connManager = SparkEnv.get.connectionManager
val thisConnManagerId = connManager.id
connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
@@ -46,7 +55,6 @@ private[spark] object ConnectionManagerTest extends Logging{
None
})
- val size = 100 * 1024 * 1024
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
@@ -56,13 +64,13 @@ private[spark] object ConnectionManagerTest extends Logging{
logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
})
- val results = futures.map(f => Await.result(f, 1.second))
+ val results = futures.map(f => Await.result(f, awaitTime))
val finishTime = System.currentTimeMillis
Thread.sleep(5000)
val mb = size * results.size / 1024.0 / 1024.0
val ms = finishTime - startTime
- val resultStr = "Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
+ val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
logInfo(resultStr)
resultStr
}).collect()
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index cf4aae03a7..a089b71644 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -201,7 +201,11 @@ private[spark] class TaskSetManager(
val taskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
val preferred = isPreferredLocation(task, host)
- val prefStr = if (preferred) "preferred" else "non-preferred"
+ val prefStr = if (preferred) {
+ "preferred"
+ } else {
+ "non-preferred, not one of " + task.preferredLocations.mkString(", ")
+ }
logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
taskSet.id, index, taskId, slaveId, host, prefStr))
// Do various bookkeeping
diff --git a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala
deleted file mode 100644
index 37cafd1e8e..0000000000
--- a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.PrivateMethodTester
-import org.scalatest.matchers.ShouldMatchers
-
-// TODO: Replace this with a test of MemoryStore
-class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester with ShouldMatchers {
- test("constructor test") {
- val cache = new BoundedMemoryCache(60)
- expect(60)(cache.getCapacity)
- }
-
- test("caching") {
- // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
- val oldArch = System.setProperty("os.arch", "amd64")
- val oldOops = System.setProperty("spark.test.useCompressedOops", "true")
- val initialize = PrivateMethod[Unit]('initialize)
- SizeEstimator invokePrivate initialize()
-
- val cache = new BoundedMemoryCache(60) {
- //TODO sorry about this, but there is not better way how to skip 'cacheTracker.dropEntry'
- override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
- logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
- }
- }
-
- // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length
- // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6.
- // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html
- // Work around to check for either.
-
- //should be OK
- cache.put("1", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48)))
-
- //we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from
- //cache because it's from the same dataset
- expect(CachePutFailure())(cache.put("1", 1, "Meh"))
-
- //should be OK, dataset '1' can be evicted from cache
- cache.put("2", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48)))
-
- //should fail, cache should obey it's capacity
- expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
-
- if (oldArch != null) {
- System.setProperty("os.arch", oldArch)
- } else {
- System.clearProperty("os.arch")
- }
-
- if (oldOops != null) {
- System.setProperty("spark.test.useCompressedOops", oldOops)
- } else {
- System.clearProperty("spark.test.useCompressedOops")
- }
- }
-}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 33d5fc2d89..c61913fc82 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -581,4 +581,64 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
zipped.count();
}
+
+ @Test
+ public void accumulators() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+ final Accumulator<Integer> intAccum = sc.accumulator(10);
+ rdd.foreach(new VoidFunction<Integer>() {
+ public void call(Integer x) {
+ intAccum.add(x);
+ }
+ });
+ Assert.assertEquals((Integer) 25, intAccum.value());
+
+ final Accumulator<Double> doubleAccum = sc.accumulator(10.0);
+ rdd.foreach(new VoidFunction<Integer>() {
+ public void call(Integer x) {
+ doubleAccum.add((double) x);
+ }
+ });
+ Assert.assertEquals((Double) 25.0, doubleAccum.value());
+
+ // Try a custom accumulator type
+ AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+ public Float addInPlace(Float r, Float t) {
+ return r + t;
+ }
+
+ public Float addAccumulator(Float r, Float t) {
+ return r + t;
+ }
+
+ public Float zero(Float initialValue) {
+ return 0.0f;
+ }
+ };
+
+ final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+ rdd.foreach(new VoidFunction<Integer>() {
+ public void call(Integer x) {
+ floatAccum.add((float) x);
+ }
+ });
+ Assert.assertEquals((Float) 25.0f, floatAccum.value());
+
+ // Test the setValue method
+ floatAccum.setValue(5.0f);
+ Assert.assertEquals((Float) 5.0f, floatAccum.value());
+ }
+
+ @Test
+ public void keyBy() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+ List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
+ public String call(Integer t) throws Exception {
+ return t.toString();
+ }
+ }).collect();
+ Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
+ Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+ }
}
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index 3dadc7acec..f09b602a7b 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -107,4 +107,25 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter {
assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
}
+
+ test("partitioning Java arrays should fail") {
+ sc = new SparkContext("local", "test")
+ val arrs: RDD[Array[Int]] = sc.parallelize(Array(1, 2, 3, 4), 2).map(x => Array(x))
+ val arrPairs: RDD[(Array[Int], Int)] =
+ sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x))
+
+ assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array"))
+ // We can't catch all usages of arrays, since they might occur inside other collections:
+ //assert(fails { arrPairs.distinct() })
+ assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.cogroup(arrPairs) }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
+ }
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 08da9a1c4d..d74e9786c3 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -35,6 +35,8 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(nums.flatMap(x => 1 to x).collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
assert(nums.union(nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
+ assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
+ assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
assert(partitionSums.collect().toList === List(3, 7))
@@ -88,6 +90,29 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
+ test("caching with failures") {
+ sc = new SparkContext("local", "test")
+ val onlySplit = new Split { override def index: Int = 0 }
+ var shouldFail = true
+ val rdd = new RDD[Int](sc) {
+ override def splits: Array[Split] = Array(onlySplit)
+ override val dependencies = List[Dependency[_]]()
+ override def compute(split: Split, context: TaskContext): Iterator[Int] = {
+ if (shouldFail) {
+ throw new Exception("injected failure")
+ } else {
+ return Array(1, 2, 3, 4).iterator
+ }
+ }
+ }.cache()
+ val thrown = intercept[Exception]{
+ rdd.collect()
+ }
+ assert(thrown.getMessage.contains("injected failure"))
+ shouldFail = false
+ assert(rdd.collect().toList === List(1, 2, 3, 4))
+ }
+
test("coalesced RDDs") {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 8170100f1d..bebb8ebe86 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -216,6 +216,13 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// Test that a shuffle on the file works, because this used to be a bug
assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
}
+
+ test("keys and values") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
+ assert(rdd.keys.collect().toList === List(1, 2))
+ assert(rdd.values.collect().toList === List("a", "b"))
+ }
}
object ShuffleSuite {
diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala
index 17f366212b..e235ef2f67 100644
--- a/core/src/test/scala/spark/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala
@@ -3,7 +3,6 @@ package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll
import org.scalatest.PrivateMethodTester
-import org.scalatest.matchers.ShouldMatchers
class DummyClass1 {}
@@ -20,8 +19,17 @@ class DummyClass4(val d: DummyClass3) {
val x: Int = 0
}
+object DummyString {
+ def apply(str: String) : DummyString = new DummyString(str.toArray)
+}
+class DummyString(val arr: Array[Char]) {
+ override val hashCode: Int = 0
+ // JDK-7 has an extra hash32 field http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/rev/11987e85555f
+ @transient val hash32: Int = 0
+}
+
class SizeEstimatorSuite
- extends FunSuite with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers {
+ extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
var oldArch: String = _
var oldOops: String = _
@@ -45,15 +53,13 @@ class SizeEstimatorSuite
expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
}
- // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length.
- // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6.
- // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html
- // Work around to check for either.
+ // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
+ // (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("strings") {
- SizeEstimator.estimate("") should (equal (48) or equal (40))
- SizeEstimator.estimate("a") should (equal (56) or equal (48))
- SizeEstimator.estimate("ab") should (equal (56) or equal (48))
- SizeEstimator.estimate("abcdefgh") should (equal(64) or equal(56))
+ expect(40)(SizeEstimator.estimate(DummyString("")))
+ expect(48)(SizeEstimator.estimate(DummyString("a")))
+ expect(48)(SizeEstimator.estimate(DummyString("ab")))
+ expect(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
}
test("primitive arrays") {
@@ -105,18 +111,16 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- expect(40)(SizeEstimator.estimate(""))
- expect(48)(SizeEstimator.estimate("a"))
- expect(48)(SizeEstimator.estimate("ab"))
- expect(56)(SizeEstimator.estimate("abcdefgh"))
+ expect(40)(SizeEstimator.estimate(DummyString("")))
+ expect(48)(SizeEstimator.estimate(DummyString("a")))
+ expect(48)(SizeEstimator.estimate(DummyString("ab")))
+ expect(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
}
- // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length.
- // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6.
- // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html
- // Work around to check for either.
+ // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
+ // (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("64-bit arch with no compressed oops") {
val arch = System.setProperty("os.arch", "amd64")
val oops = System.setProperty("spark.test.useCompressedOops", "false")
@@ -124,10 +128,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- SizeEstimator.estimate("") should (equal (64) or equal (56))
- SizeEstimator.estimate("a") should (equal (72) or equal (64))
- SizeEstimator.estimate("ab") should (equal (72) or equal (64))
- SizeEstimator.estimate("abcdefgh") should (equal (80) or equal (72))
+ expect(56)(SizeEstimator.estimate(DummyString("")))
+ expect(64)(SizeEstimator.estimate(DummyString("a")))
+ expect(64)(SizeEstimator.estimate(DummyString("ab")))
+ expect(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
resetOrClear("spark.test.useCompressedOops", oops)
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 2c7cfbed25..a4c4c9a8fb 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -201,6 +201,16 @@ To build the job, we also write a Maven `pom.xml` file that lists Spark as a dep
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
+ <repositories>
+ <repository>
+ <id>Spray.cc repository</id>
+ <url>http://repo.spray.cc</url>
+ </repository>
+ <repository>
+ <id>Typesafe repository</id>
+ <url>http://repo.typesafe.com/typesafe/releases</url>
+ </repository>
+ </repositories>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.spark-project</groupId>
diff --git a/examples/pom.xml b/examples/pom.xml
index 782c026d73..3355deb6b7 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -45,6 +45,11 @@
<profiles>
<profile>
<id>hadoop1</id>
+ <activation>
+ <property>
+ <name>!hadoopVersion</name>
+ </property>
+ </activation>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
@@ -72,6 +77,12 @@
</profile>
<profile>
<id>hadoop2</id>
+ <activation>
+ <property>
+ <name>hadoopVersion</name>
+ <value>2</value>
+ </property>
+ </activation>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
index f2ac2b3e06..9553162004 100644
--- a/examples/src/main/scala/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -5,7 +5,7 @@ import spark.util.Vector
object LocalLR {
val N = 10000 // Number of data points
- val D = 10 // Numer of dimensions
+ val D = 10 // Number of dimensions
val R = 0.7 // Scaling factor
val ITERATIONS = 5
val rand = new Random(42)
diff --git a/pom.xml b/pom.xml
index b33cee26b8..8f1af673a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -481,6 +481,12 @@
<profiles>
<profile>
<id>hadoop1</id>
+ <activation>
+ <property>
+ <name>!hadoopVersion</name>
+ </property>
+ </activation>
+
<properties>
<hadoop.major.version>1</hadoop.major.version>
</properties>
@@ -489,7 +495,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>0.20.205.0</version>
+ <version>1.0.3</version>
</dependency>
</dependencies>
</dependencyManagement>
@@ -497,6 +503,12 @@
<profile>
<id>hadoop2</id>
+ <activation>
+ <property>
+ <name>hadoopVersion</name>
+ <value>2</value>
+ </property>
+ </activation>
<properties>
<hadoop.major.version>2</hadoop.major.version>
</properties>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 842d0fa96b..219674028e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -10,7 +10,7 @@ import twirl.sbt.TwirlPlugin._
object SparkBuild extends Build {
// Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
// "1.0.3" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop.
- val HADOOP_VERSION = "0.20.205.0"
+ val HADOOP_VERSION = "1.0.3"
val HADOOP_MAJOR_VERSION = "1"
// For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2"
@@ -38,6 +38,7 @@ object SparkBuild extends Build {
scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
+ retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
@@ -133,6 +134,7 @@ object SparkBuild extends Build {
"colt" % "colt" % "1.2.0",
"cc.spray" % "spray-can" % "1.0-M2.1",
"cc.spray" % "spray-server" % "1.0-M2.1",
+ "cc.spray" %% "spray-json" % "1.1.1",
"org.apache.mesos" % "mesos" % "0.9.0-incubating"
) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index 0667b71cc7..da91c0f3ab 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -70,6 +70,11 @@
<profiles>
<profile>
<id>hadoop1</id>
+ <activation>
+ <property>
+ <name>!hadoopVersion</name>
+ </property>
+ </activation>
<properties>
<classifier>hadoop1</classifier>
</properties>
@@ -110,6 +115,12 @@
</profile>
<profile>
<id>hadoop2</id>
+ <activation>
+ <property>
+ <name>hadoopVersion</name>
+ <value>2</value>
+ </property>
+ </activation>
<properties>
<classifier>hadoop2</classifier>
</properties>
diff --git a/repl/pom.xml b/repl/pom.xml
index 114e3e9932..38e883c7f8 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -72,6 +72,11 @@
<profiles>
<profile>
<id>hadoop1</id>
+ <activation>
+ <property>
+ <name>!hadoopVersion</name>
+ </property>
+ </activation>
<properties>
<classifier>hadoop1</classifier>
</properties>
@@ -116,6 +121,12 @@
</profile>
<profile>
<id>hadoop2</id>
+ <activation>
+ <property>
+ <name>hadoopVersion</name>
+ <value>2</value>
+ </property>
+ </activation>
<properties>
<classifier>hadoop2</classifier>
</properties>
diff --git a/run b/run
index 08e2b2434b..ca23455386 100755
--- a/run
+++ b/run
@@ -76,16 +76,10 @@ CLASSPATH+=":$CORE_DIR/src/main/resources"
CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
if [ -e "$FWDIR/lib_managed" ]; then
- for jar in `find "$FWDIR/lib_managed/jars" -name '*jar'`; do
- CLASSPATH+=":$jar"
- done
- for jar in `find "$FWDIR/lib_managed/bundles" -name '*jar'`; do
- CLASSPATH+=":$jar"
- done
+ CLASSPATH+=":$FWDIR/lib_managed/jars/*"
+ CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
fi
-for jar in `find "$REPL_DIR/lib" -name '*jar'`; do
- CLASSPATH+=":$jar"
-done
+CLASSPATH+=":$REPL_DIR/lib/*"
for jar in `find "$REPL_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
CLASSPATH+=":$jar"
done