aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-01-20 03:54:46 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-01-20 03:54:46 -0800
commit76ff962edcb7f41601c6c2d4fc6714bbc885faa7 (patch)
tree87695f0e35c893ab2c9f3cc2c1e369beadc9c623 /core
parent11bbe231408a3223d110c89519a70184d58408af (diff)
parent33bad85bb9143d41bc5de2068f7e8a8c39928225 (diff)
downloadspark-76ff962edcb7f41601c6c2d4fc6714bbc885faa7.tar.gz
spark-76ff962edcb7f41601c6c2d4fc6714bbc885faa7.tar.bz2
spark-76ff962edcb7f41601c6c2d4fc6714bbc885faa7.zip
Merge pull request #380 from tdas/streaming
Merging pySpark to streaming
Diffstat (limited to 'core')
-rw-r--r--core/pom.xml17
-rw-r--r--core/src/main/scala/spark/Accumulators.scala20
-rw-r--r--core/src/main/scala/spark/BoundedMemoryCache.scala118
-rw-r--r--core/src/main/scala/spark/MapOutputTracker.scala35
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala10
-rw-r--r--core/src/main/scala/spark/RDD.scala31
-rw-r--r--core/src/main/scala/spark/RDDCheckpointData.scala2
-rw-r--r--core/src/main/scala/spark/SequenceFileRDDFunctions.scala8
-rw-r--r--core/src/main/scala/spark/SizeEstimator.scala13
-rw-r--r--core/src/main/scala/spark/SparkContext.scala13
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala10
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala8
-rw-r--r--core/src/main/scala/spark/api/java/JavaSparkContext.scala13
-rw-r--r--core/src/main/scala/spark/api/python/PythonPartitioner.scala39
-rw-r--r--core/src/main/scala/spark/api/python/PythonRDD.scala248
-rw-r--r--core/src/main/scala/spark/broadcast/Broadcast.scala2
-rw-r--r--core/src/main/scala/spark/deploy/JsonProtocol.scala78
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterWebUI.scala58
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerArguments.scala22
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala19
-rw-r--r--core/src/main/scala/spark/network/Connection.scala7
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala12
-rw-r--r--core/src/main/scala/spark/network/ConnectionManagerTest.scala24
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala17
-rw-r--r--core/src/main/scala/spark/rdd/FilteredRDD.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/ResultTask.scala8
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala6
-rw-r--r--core/src/test/scala/spark/BoundedMemoryCacheSuite.scala58
-rw-r--r--core/src/test/scala/spark/JavaAPISuite.java16
-rw-r--r--core/src/test/scala/spark/MapOutputTrackerSuite.scala40
-rw-r--r--core/src/test/scala/spark/PartitioningSuite.scala5
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala2
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala7
-rw-r--r--core/src/test/scala/spark/SizeEstimatorSuite.scala48
-rw-r--r--core/src/test/scala/spark/scheduler/TaskContextSuite.scala42
35 files changed, 773 insertions, 285 deletions
diff --git a/core/pom.xml b/core/pom.xml
index ae52c20657..862d3ec37a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -72,6 +72,10 @@
<artifactId>spray-server</artifactId>
</dependency>
<dependency>
+ <groupId>cc.spray</groupId>
+ <artifactId>spray-json_${scala.version}</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.tomdz.twirl</groupId>
<artifactId>twirl-api</artifactId>
</dependency>
@@ -159,6 +163,11 @@
<profiles>
<profile>
<id>hadoop1</id>
+ <activation>
+ <property>
+ <name>!hadoopVersion</name>
+ </property>
+ </activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -211,6 +220,12 @@
</profile>
<profile>
<id>hadoop2</id>
+ <activation>
+ <property>
+ <name>hadoopVersion</name>
+ <value>2</value>
+ </property>
+ </activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -267,4 +282,4 @@
</build>
</profile>
</profiles>
-</project> \ No newline at end of file
+</project>
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index 6280f25391..b644aba5f8 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -63,9 +63,12 @@ class Accumulable[R, T] (
/**
* Access the accumulator's current value; only allowed on master.
*/
- def value = {
- if (!deserialized) value_
- else throw new UnsupportedOperationException("Can't read accumulator value in task")
+ def value: R = {
+ if (!deserialized) {
+ value_
+ } else {
+ throw new UnsupportedOperationException("Can't read accumulator value in task")
+ }
}
/**
@@ -82,10 +85,17 @@ class Accumulable[R, T] (
/**
* Set the accumulator's value; only allowed on master.
*/
- def value_= (r: R) {
- if (!deserialized) value_ = r
+ def value_= (newValue: R) {
+ if (!deserialized) value_ = newValue
else throw new UnsupportedOperationException("Can't assign accumulator value in task")
}
+
+ /**
+ * Set the accumulator's value; only allowed on master
+ */
+ def setValue(newValue: R) {
+ this.value = newValue
+ }
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream) {
diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala
deleted file mode 100644
index e8392a194f..0000000000
--- a/core/src/main/scala/spark/BoundedMemoryCache.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-package spark
-
-import java.util.LinkedHashMap
-
-/**
- * An implementation of Cache that estimates the sizes of its entries and attempts to limit its
- * total memory usage to a fraction of the JVM heap. Objects' sizes are estimated using
- * SizeEstimator, which has limitations; most notably, we will overestimate total memory used if
- * some cache entries have pointers to a shared object. Nonetheless, this Cache should work well
- * when most of the space is used by arrays of primitives or of simple classes.
- */
-private[spark] class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
- logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)
-
- def this() {
- this(BoundedMemoryCache.getMaxBytes)
- }
-
- private var currentBytes = 0L
- private val map = new LinkedHashMap[(Any, Int), Entry](32, 0.75f, true)
-
- override def get(datasetId: Any, partition: Int): Any = {
- synchronized {
- val entry = map.get((datasetId, partition))
- if (entry != null) {
- entry.value
- } else {
- null
- }
- }
- }
-
- override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
- val key = (datasetId, partition)
- logInfo("Asked to add key " + key)
- val size = estimateValueSize(key, value)
- synchronized {
- if (size > getCapacity) {
- return CachePutFailure()
- } else if (ensureFreeSpace(datasetId, size)) {
- logInfo("Adding key " + key)
- map.put(key, new Entry(value, size))
- currentBytes += size
- logInfo("Number of entries is now " + map.size)
- return CachePutSuccess(size)
- } else {
- logInfo("Didn't add key " + key + " because we would have evicted part of same dataset")
- return CachePutFailure()
- }
- }
- }
-
- override def getCapacity: Long = maxBytes
-
- /**
- * Estimate sizeOf 'value'
- */
- private def estimateValueSize(key: (Any, Int), value: Any) = {
- val startTime = System.currentTimeMillis
- val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
- val timeTaken = System.currentTimeMillis - startTime
- logInfo("Estimated size for key %s is %d".format(key, size))
- logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
- size
- }
-
- /**
- * Remove least recently used entries from the map until at least space bytes are free, in order
- * to make space for a partition from the given dataset ID. If this cannot be done without
- * evicting other data from the same dataset, returns false; otherwise, returns true. Assumes
- * that a lock is held on the BoundedMemoryCache.
- */
- private def ensureFreeSpace(datasetId: Any, space: Long): Boolean = {
- logInfo("ensureFreeSpace(%s, %d) called with curBytes=%d, maxBytes=%d".format(
- datasetId, space, currentBytes, maxBytes))
- val iter = map.entrySet.iterator // Will give entries in LRU order
- while (maxBytes - currentBytes < space && iter.hasNext) {
- val mapEntry = iter.next()
- val (entryDatasetId, entryPartition) = mapEntry.getKey
- if (entryDatasetId == datasetId) {
- // Cannot make space without removing part of the same dataset, or a more recently used one
- return false
- }
- reportEntryDropped(entryDatasetId, entryPartition, mapEntry.getValue)
- currentBytes -= mapEntry.getValue.size
- iter.remove()
- }
- return true
- }
-
- protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
- logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
- // TODO: remove BoundedMemoryCache
-
- val (keySpaceId, innerDatasetId) = datasetId.asInstanceOf[(Any, Any)]
- innerDatasetId match {
- case rddId: Int =>
- SparkEnv.get.cacheTracker.dropEntry(rddId, partition)
- case broadcastUUID: java.util.UUID =>
- // TODO: Maybe something should be done if the broadcasted variable falls out of cache
- case _ =>
- }
- }
-}
-
-// An entry in our map; stores a cached object and its size in bytes
-private[spark] case class Entry(value: Any, size: Long)
-
-private[spark] object BoundedMemoryCache {
- /**
- * Get maximum cache capacity from system configuration
- */
- def getMaxBytes: Long = {
- val memoryFractionToUse = System.getProperty("spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
- (Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong
- }
-}
-
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index a2fa2d1ea7..ac02f3363a 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -142,8 +142,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
case e: InterruptedException =>
}
}
- return mapStatuses(shuffleId).map(status =>
- (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId))))
+ return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, mapStatuses(shuffleId))
} else {
fetching += shuffleId
}
@@ -159,21 +158,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
fetchedStatuses = deserializeStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
- if (fetchedStatuses.contains(null)) {
- throw new FetchFailedException(null, shuffleId, -1, reduceId,
- new Exception("Missing an output location for shuffle " + shuffleId))
- }
} finally {
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
}
- return fetchedStatuses.map(s =>
- (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+ return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
} else {
- return statuses.map(s =>
- (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+ return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
}
}
@@ -267,6 +260,28 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
private[spark] object MapOutputTracker {
private val LOG_BASE = 1.1
+ // Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
+ // any of the statuses is null (indicating a missing location due to a failed mapper),
+ // throw a FetchFailedException.
+ def convertMapStatuses(
+ shuffleId: Int,
+ reduceId: Int,
+ statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
+ if (statuses == null) {
+ throw new FetchFailedException(null, shuffleId, -1, reduceId,
+ new Exception("Missing all output locations for shuffle " + shuffleId))
+ }
+ statuses.map {
+ status =>
+ if (status == null) {
+ throw new FetchFailedException(null, shuffleId, -1, reduceId,
+ new Exception("Missing an output location for shuffle " + shuffleId))
+ } else {
+ (status.address, decompressSize(status.compressedSizes(reduceId)))
+ }
+ }
+ }
+
/**
* Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
* We do this by encoding the log base 1.1 of the size as an integer, which can support
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index d95b66ad78..abb01c387c 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -615,6 +615,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
writer.cleanup()
}
+ /**
+ * Return an RDD with the keys of each tuple.
+ */
+ def keys: RDD[K] = self.map(_._1)
+
+ /**
+ * Return an RDD with the values of each tuple.
+ */
+ def values: RDD[V] = self.map(_._2)
+
private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 0de6f04d50..e0d2eabb1d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -349,6 +349,13 @@ abstract class RDD[T: ClassManifest](
def toArray(): Array[T] = collect()
/**
+ * Return an RDD that contains all matching values by applying `f`.
+ */
+ def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = {
+ filter(f.isDefinedAt).map(f)
+ }
+
+ /**
* Reduces the elements of this RDD using the specified associative binary operator.
*/
def reduce(f: (T, T) => T): T = {
@@ -529,23 +536,29 @@ abstract class RDD[T: ClassManifest](
.saveAsSequenceFile(path)
}
+ /**
+ * Creates tuples of the elements in this RDD by applying `f`.
+ */
+ def keyBy[K](f: T => K): RDD[(K, T)] = {
+ map(x => (f(x), x))
+ }
+
/** A private method for tests, to look at the contents of each partition */
private[spark] def collectPartitions(): Array[Array[T]] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
}
/**
- * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir`
- * (set using setCheckpointDir()) and all references to its parent RDDs will be removed.
- * This is used to truncate very long lineages. In the current implementation, Spark will save
- * this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done.
- * Hence, it is strongly recommended to use checkpoint() on RDDs when
- * (i) checkpoint() is called before the any job has been executed on this RDD.
- * (ii) This RDD has been made to persist in memory. Otherwise saving it on a file will
- * require recomputation.
+ * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
+ * directory set with SparkContext.setCheckpointDir() and all references to its parent
+ * RDDs will be removed. This function must be called before any job has been
+ * executed on this RDD. It is strongly recommended that this RDD is persisted in
+ * memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint() {
- if (checkpointData.isEmpty) {
+ if (context.checkpointDir.isEmpty) {
+ throw new Exception("Checkpoint directory has not been set in the SparkContext")
+ } else if (checkpointData.isEmpty) {
checkpointData = Some(new RDDCheckpointData(this))
checkpointData.get.markForCheckpoint()
}
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
index d845a522e4..18df530b7d 100644
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -63,7 +63,7 @@ extends Logging with Serializable {
}
// Save to file, and reload it as an RDD
- val path = new Path(rdd.context.checkpointDir, "rdd-" + rdd.id).toString
+ val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id).toString
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _)
val newRDD = new CheckpointRDD[T](rdd.context, path)
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index a34aee69c1..6b4a11d6d3 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -42,7 +42,13 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
classManifest[T].erasure
} else {
- implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+ // We get the type of the Writable class by looking at the apply method which converts
+ // from T to Writable. Since we have two apply methods we filter out the one which
+ // is of the form "java.lang.Object apply(java.lang.Object)"
+ implicitly[T => Writable].getClass.getDeclaredMethods().filter(
+ m => m.getReturnType().toString != "java.lang.Object" &&
+ m.getName() == "apply")(0).getReturnType
+
}
// TODO: use something like WritableConverter to avoid reflection
}
diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala
index 7c3e8640e9..d4e1157250 100644
--- a/core/src/main/scala/spark/SizeEstimator.scala
+++ b/core/src/main/scala/spark/SizeEstimator.scala
@@ -9,7 +9,6 @@ import java.util.Random
import javax.management.MBeanServer
import java.lang.management.ManagementFactory
-import com.sun.management.HotSpotDiagnosticMXBean
import scala.collection.mutable.ArrayBuffer
@@ -76,12 +75,20 @@ private[spark] object SizeEstimator extends Logging {
if (System.getProperty("spark.test.useCompressedOops") != null) {
return System.getProperty("spark.test.useCompressedOops").toBoolean
}
+
try {
val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
val server = ManagementFactory.getPlatformMBeanServer()
+
+ // NOTE: This should throw an exception in non-Sun JVMs
+ val hotSpotMBeanClass = Class.forName("com.sun.management.HotSpotDiagnosticMXBean")
+ val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
+ Class.forName("java.lang.String"))
+
val bean = ManagementFactory.newPlatformMXBeanProxy(server,
- hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean])
- return bean.getVMOption("UseCompressedOops").getValue.toBoolean
+ hotSpotMBeanName, hotSpotMBeanClass)
+ // TODO: We could use reflection on the VMOption returned ?
+ return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
} catch {
case e: Exception => {
// Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 88cf357ebf..7f3259d982 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -184,7 +184,7 @@ class SparkContext(
private var dagScheduler = new DAGScheduler(taskScheduler)
- private[spark] var checkpointDir: String = null
+ private[spark] var checkpointDir: Option[String] = None
// Methods for creating RDDs
@@ -595,10 +595,11 @@ class SparkContext(
}
/**
- * Set the directory under which RDDs are going to be checkpointed. This method will
- * create this directory and will throw an exception of the path already exists (to avoid
- * overwriting existing files may be overwritten). The directory will be deleted on exit
- * if indicated.
+ * Set the directory under which RDDs are going to be checkpointed. The directory must
+ * be a HDFS path if running on a cluster. If the directory does not exist, it will
+ * be created. If the directory exists and useExisting is set to true, then the
+ * exisiting directory will be used. Otherwise an exception will be thrown to
+ * prevent accidental overriding of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
val path = new Path(dir)
@@ -610,7 +611,7 @@ class SparkContext(
fs.mkdirs(path)
}
}
- checkpointDir = dir
+ checkpointDir = Some(dir)
}
/** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 5c2be534ff..8ce32e0e2f 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -471,6 +471,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
fromRDD(new OrderedRDDFunctions(rdd).sortByKey(ascending))
}
+
+ /**
+ * Return an RDD with the keys of each tuple.
+ */
+ def keys(): JavaRDD[K] = JavaRDD.fromRDD[K](rdd.map(_._1))
+
+ /**
+ * Return an RDD with the values of each tuple.
+ */
+ def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
}
object JavaPairRDD {
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 958f5c26a1..087270e46d 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -301,6 +301,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
/**
+ * Creates tuples of the elements in this RDD by applying `f`.
+ */
+ def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
+ implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ JavaPairRDD.fromRDD(rdd.keyBy(f))
+ }
+
+ /**
* Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir`
* (set using setCheckpointDir()) and all references to its parent RDDs will be removed.
* This is used to truncate very long lineages. In the current implementation, Spark will save
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index 22bfa2280d..fa2f14113d 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -278,6 +278,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
/**
+ * Create an [[spark.Accumulator]] integer variable, which tasks can "add" values
+ * to using the `add` method. Only the master can access the accumulator's `value`.
+ */
+ def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
+
+ /**
+ * Create an [[spark.Accumulator]] double variable, which tasks can "add" values
+ * to using the `add` method. Only the master can access the accumulator's `value`.
+ */
+ def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
+ doubleAccumulator(initialValue)
+
+ /**
* Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
new file mode 100644
index 0000000000..648d9402b0
--- /dev/null
+++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -0,0 +1,39 @@
+package spark.api.python
+
+import spark.Partitioner
+
+import java.util.Arrays
+
+/**
+ * A [[spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.
+ */
+private[spark] class PythonPartitioner(override val numPartitions: Int) extends Partitioner {
+
+ override def getPartition(key: Any): Int = {
+ if (key == null) {
+ return 0
+ }
+ else {
+ val hashCode = {
+ if (key.isInstanceOf[Array[Byte]]) {
+ Arrays.hashCode(key.asInstanceOf[Array[Byte]])
+ } else {
+ key.hashCode()
+ }
+ }
+ val mod = hashCode % numPartitions
+ if (mod < 0) {
+ mod + numPartitions
+ } else {
+ mod // Guard against negative hash codes
+ }
+ }
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case h: PythonPartitioner =>
+ h.numPartitions == numPartitions
+ case _ =>
+ false
+ }
+}
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
new file mode 100644
index 0000000000..0138b22d38
--- /dev/null
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -0,0 +1,248 @@
+package spark.api.python
+
+import java.io._
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.io.Source
+
+import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
+import spark.broadcast.Broadcast
+import spark._
+import spark.rdd.PipedRDD
+import java.util
+
+
+private[spark] class PythonRDD[T: ClassManifest](
+ parent: RDD[T],
+ command: Seq[String],
+ envVars: java.util.Map[String, String],
+ preservePartitoning: Boolean,
+ pythonExec: String,
+ broadcastVars: java.util.List[Broadcast[Array[Byte]]])
+ extends RDD[Array[Byte]](parent) {
+
+ // Similar to Runtime.exec(), if we are given a single string, split it into words
+ // using a standard StringTokenizer (i.e. by spaces)
+ def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
+ preservePartitoning: Boolean, pythonExec: String,
+ broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
+ this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
+ broadcastVars)
+
+ override def getSplits = parent.splits
+
+ override val partitioner = if (preservePartitoning) parent.partitioner else None
+
+ override def compute(split: Split, context: TaskContext): Iterator[Array[Byte]] = {
+ val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
+
+ val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/python/pyspark/worker.py"))
+ // Add the environmental variables to the process.
+ val currentEnvVars = pb.environment()
+
+ for ((variable, value) <- envVars) {
+ currentEnvVars.put(variable, value)
+ }
+
+ val proc = pb.start()
+ val env = SparkEnv.get
+
+ // Start a thread to print the process's stderr to ours
+ new Thread("stderr reader for " + command) {
+ override def run() {
+ for (line <- Source.fromInputStream(proc.getErrorStream).getLines) {
+ System.err.println(line)
+ }
+ }
+ }.start()
+
+ // Start a thread to feed the process input from our parent's iterator
+ new Thread("stdin writer for " + command) {
+ override def run() {
+ SparkEnv.set(env)
+ val out = new PrintWriter(proc.getOutputStream)
+ val dOut = new DataOutputStream(proc.getOutputStream)
+ // Split index
+ dOut.writeInt(split.index)
+ // Broadcast variables
+ dOut.writeInt(broadcastVars.length)
+ for (broadcast <- broadcastVars) {
+ dOut.writeLong(broadcast.id)
+ dOut.writeInt(broadcast.value.length)
+ dOut.write(broadcast.value)
+ dOut.flush()
+ }
+ // Serialized user code
+ for (elem <- command) {
+ out.println(elem)
+ }
+ out.flush()
+ // Data values
+ for (elem <- parent.iterator(split, context)) {
+ PythonRDD.writeAsPickle(elem, dOut)
+ }
+ dOut.flush()
+ out.flush()
+ proc.getOutputStream.close()
+ }
+ }.start()
+
+ // Return an iterator that read lines from the process's stdout
+ val stream = new DataInputStream(proc.getInputStream)
+ return new Iterator[Array[Byte]] {
+ def next() = {
+ val obj = _nextObj
+ _nextObj = read()
+ obj
+ }
+
+ private def read() = {
+ try {
+ val length = stream.readInt()
+ val obj = new Array[Byte](length)
+ stream.readFully(obj)
+ obj
+ } catch {
+ case eof: EOFException => {
+ val exitStatus = proc.waitFor()
+ if (exitStatus != 0) {
+ throw new Exception("Subprocess exited with status " + exitStatus)
+ }
+ new Array[Byte](0)
+ }
+ case e => throw e
+ }
+ }
+
+ var _nextObj = read()
+
+ def hasNext = _nextObj.length != 0
+ }
+ }
+
+ override def checkpoint() { }
+
+ val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
+}
+
+/**
+ * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
+ * This is used by PySpark's shuffle operations.
+ */
+private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
+ RDD[(Array[Byte], Array[Byte])](prev) {
+ override def getSplits = prev.splits
+ override def compute(split: Split, context: TaskContext) =
+ prev.iterator(split, context).grouped(2).map {
+ case Seq(a, b) => (a, b)
+ case x => throw new Exception("PairwiseRDD: unexpected value: " + x)
+ }
+ override def checkpoint() { }
+ val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
+}
+
+private[spark] object PythonRDD {
+
+ /** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */
+ def stripPickle(arr: Array[Byte]) : Array[Byte] = {
+ arr.slice(2, arr.length - 1)
+ }
+
+ /**
+ * Write strings, pickled Python objects, or pairs of pickled objects to a data output stream.
+ * The data format is a 32-bit integer representing the pickled object's length (in bytes),
+ * followed by the pickled data.
+ *
+ * Pickle module:
+ *
+ * http://docs.python.org/2/library/pickle.html
+ *
+ * The pickle protocol is documented in the source of the `pickle` and `pickletools` modules:
+ *
+ * http://hg.python.org/cpython/file/2.6/Lib/pickle.py
+ * http://hg.python.org/cpython/file/2.6/Lib/pickletools.py
+ *
+ * @param elem the object to write
+ * @param dOut a data output stream
+ */
+ def writeAsPickle(elem: Any, dOut: DataOutputStream) {
+ if (elem.isInstanceOf[Array[Byte]]) {
+ val arr = elem.asInstanceOf[Array[Byte]]
+ dOut.writeInt(arr.length)
+ dOut.write(arr)
+ } else if (elem.isInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]) {
+ val t = elem.asInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]
+ val length = t._1.length + t._2.length - 3 - 3 + 4 // stripPickle() removes 3 bytes
+ dOut.writeInt(length)
+ dOut.writeByte(Pickle.PROTO)
+ dOut.writeByte(Pickle.TWO)
+ dOut.write(PythonRDD.stripPickle(t._1))
+ dOut.write(PythonRDD.stripPickle(t._2))
+ dOut.writeByte(Pickle.TUPLE2)
+ dOut.writeByte(Pickle.STOP)
+ } else if (elem.isInstanceOf[String]) {
+ // For uniformity, strings are wrapped into Pickles.
+ val s = elem.asInstanceOf[String].getBytes("UTF-8")
+ val length = 2 + 1 + 4 + s.length + 1
+ dOut.writeInt(length)
+ dOut.writeByte(Pickle.PROTO)
+ dOut.writeByte(Pickle.TWO)
+ dOut.write(Pickle.BINUNICODE)
+ dOut.writeInt(Integer.reverseBytes(s.length))
+ dOut.write(s)
+ dOut.writeByte(Pickle.STOP)
+ } else {
+ throw new Exception("Unexpected RDD type")
+ }
+ }
+
+ def readRDDFromPickleFile(sc: JavaSparkContext, filename: String, parallelism: Int) :
+ JavaRDD[Array[Byte]] = {
+ val file = new DataInputStream(new FileInputStream(filename))
+ val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
+ try {
+ while (true) {
+ val length = file.readInt()
+ val obj = new Array[Byte](length)
+ file.readFully(obj)
+ objs.append(obj)
+ }
+ } catch {
+ case eof: EOFException => {}
+ case e => throw e
+ }
+ JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
+ }
+
+ def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
+ val file = new DataOutputStream(new FileOutputStream(filename))
+ for (item <- items) {
+ writeAsPickle(item, file)
+ }
+ file.close()
+ }
+
+ def takePartition[T](rdd: RDD[T], partition: Int): java.util.Iterator[T] =
+ rdd.context.runJob(rdd, ((x: Iterator[T]) => x), Seq(partition), true).head
+}
+
+private object Pickle {
+ val PROTO: Byte = 0x80.toByte
+ val TWO: Byte = 0x02.toByte
+ val BINUNICODE: Byte = 'X'
+ val STOP: Byte = '.'
+ val TUPLE2: Byte = 0x86.toByte
+ val EMPTY_LIST: Byte = ']'
+ val MARK: Byte = '('
+ val APPENDS: Byte = 'e'
+}
+
+private class ExtractValue extends spark.api.java.function.Function[(Array[Byte],
+ Array[Byte]), Array[Byte]] {
+ override def call(pair: (Array[Byte], Array[Byte])) : Array[Byte] = pair._2
+}
+
+private class BytesToString extends spark.api.java.function.Function[Array[Byte], String] {
+ override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
+}
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 6055bfd045..2ffe7f741d 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicLong
import spark._
-abstract class Broadcast[T](id: Long) extends Serializable {
+abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
def value: T
// We cannot have an abstract readObject here due to some weird issues with
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
new file mode 100644
index 0000000000..732fa08064
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -0,0 +1,78 @@
+package spark.deploy
+
+import master.{JobInfo, WorkerInfo}
+import worker.ExecutorRunner
+import cc.spray.json._
+
+/**
+ * spray-json helper class containing implicit conversion to json for marshalling responses
+ */
+private[spark] object JsonProtocol extends DefaultJsonProtocol {
+ implicit object WorkerInfoJsonFormat extends RootJsonWriter[WorkerInfo] {
+ def write(obj: WorkerInfo) = JsObject(
+ "id" -> JsString(obj.id),
+ "host" -> JsString(obj.host),
+ "webuiaddress" -> JsString(obj.webUiAddress),
+ "cores" -> JsNumber(obj.cores),
+ "coresused" -> JsNumber(obj.coresUsed),
+ "memory" -> JsNumber(obj.memory),
+ "memoryused" -> JsNumber(obj.memoryUsed)
+ )
+ }
+
+ implicit object JobInfoJsonFormat extends RootJsonWriter[JobInfo] {
+ def write(obj: JobInfo) = JsObject(
+ "starttime" -> JsNumber(obj.startTime),
+ "id" -> JsString(obj.id),
+ "name" -> JsString(obj.desc.name),
+ "cores" -> JsNumber(obj.desc.cores),
+ "user" -> JsString(obj.desc.user),
+ "memoryperslave" -> JsNumber(obj.desc.memoryPerSlave),
+ "submitdate" -> JsString(obj.submitDate.toString))
+ }
+
+ implicit object JobDescriptionJsonFormat extends RootJsonWriter[JobDescription] {
+ def write(obj: JobDescription) = JsObject(
+ "name" -> JsString(obj.name),
+ "cores" -> JsNumber(obj.cores),
+ "memoryperslave" -> JsNumber(obj.memoryPerSlave),
+ "user" -> JsString(obj.user)
+ )
+ }
+
+ implicit object ExecutorRunnerJsonFormat extends RootJsonWriter[ExecutorRunner] {
+ def write(obj: ExecutorRunner) = JsObject(
+ "id" -> JsNumber(obj.execId),
+ "memory" -> JsNumber(obj.memory),
+ "jobid" -> JsString(obj.jobId),
+ "jobdesc" -> obj.jobDesc.toJson.asJsObject
+ )
+ }
+
+ implicit object MasterStateJsonFormat extends RootJsonWriter[MasterState] {
+ def write(obj: MasterState) = JsObject(
+ "url" -> JsString("spark://" + obj.uri),
+ "workers" -> JsArray(obj.workers.toList.map(_.toJson)),
+ "cores" -> JsNumber(obj.workers.map(_.cores).sum),
+ "coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum),
+ "memory" -> JsNumber(obj.workers.map(_.memory).sum),
+ "memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum),
+ "activejobs" -> JsArray(obj.activeJobs.toList.map(_.toJson)),
+ "completedjobs" -> JsArray(obj.completedJobs.toList.map(_.toJson))
+ )
+ }
+
+ implicit object WorkerStateJsonFormat extends RootJsonWriter[WorkerState] {
+ def write(obj: WorkerState) = JsObject(
+ "id" -> JsString(obj.workerId),
+ "masterurl" -> JsString(obj.masterUrl),
+ "masterwebuiurl" -> JsString(obj.masterWebUiUrl),
+ "cores" -> JsNumber(obj.cores),
+ "coresused" -> JsNumber(obj.coresUsed),
+ "memory" -> JsNumber(obj.memory),
+ "memoryused" -> JsNumber(obj.memoryUsed),
+ "executors" -> JsArray(obj.executors.toList.map(_.toJson)),
+ "finishedexecutors" -> JsArray(obj.finishedExecutors.toList.map(_.toJson))
+ )
+ }
+}
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 3cdd3721f5..458ee2d665 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -8,7 +8,11 @@ import akka.util.duration._
import cc.spray.Directives
import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
+import cc.spray.http.MediaTypes
+import cc.spray.typeconversion.SprayJsonSupport._
+
import spark.deploy._
+import spark.deploy.JsonProtocol._
private[spark]
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
@@ -19,29 +23,51 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
val handler = {
get {
- path("") {
- completeWith {
+ (path("") & parameters('format ?)) {
+ case Some(js) if js.equalsIgnoreCase("json") =>
val future = master ? RequestMasterState
- future.map {
- masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState])
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ ctx.complete(future.mapTo[MasterState])
+ }
+ case _ =>
+ completeWith {
+ val future = master ? RequestMasterState
+ future.map {
+ masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState])
+ }
}
- }
} ~
path("job") {
- parameter("jobId") { jobId =>
- completeWith {
+ parameters("jobId", 'format ?) {
+ case (jobId, Some(js)) if (js.equalsIgnoreCase("json")) =>
val future = master ? RequestMasterState
- future.map { state =>
- val masterState = state.asInstanceOf[MasterState]
-
- // A bit ugly an inefficient, but we won't have a number of jobs
- // so large that it will make a significant difference.
- (masterState.activeJobs ++ masterState.completedJobs).find(_.id == jobId) match {
- case Some(job) => spark.deploy.master.html.job_details.render(job)
- case _ => null
+ val jobInfo = for (masterState <- future.mapTo[MasterState]) yield {
+ masterState.activeJobs.find(_.id == jobId) match {
+ case Some(job) => job
+ case _ => masterState.completedJobs.find(_.id == jobId) match {
+ case Some(job) => job
+ case _ => null
+ }
+ }
+ }
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ ctx.complete(jobInfo.mapTo[JobInfo])
+ }
+ case (jobId, _) =>
+ completeWith {
+ val future = master ? RequestMasterState
+ future.map { state =>
+ val masterState = state.asInstanceOf[MasterState]
+
+ masterState.activeJobs.find(_.id == jobId) match {
+ case Some(job) => spark.deploy.master.html.job_details.render(job)
+ case _ => masterState.completedJobs.find(_.id == jobId) match {
+ case Some(job) => spark.deploy.master.html.job_details.render(job)
+ case _ => null
+ }
+ }
}
}
- }
}
} ~
pathPrefix("static") {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
index 340920025b..37524a7c82 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
@@ -104,9 +104,25 @@ private[spark] class WorkerArguments(args: Array[String]) {
}
def inferDefaultMemory(): Int = {
- val bean = ManagementFactory.getOperatingSystemMXBean
- .asInstanceOf[com.sun.management.OperatingSystemMXBean]
- val totalMb = (bean.getTotalPhysicalMemorySize / 1024 / 1024).toInt
+ val ibmVendor = System.getProperty("java.vendor").contains("IBM")
+ var totalMb = 0
+ try {
+ val bean = ManagementFactory.getOperatingSystemMXBean()
+ if (ibmVendor) {
+ val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
+ val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
+ totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
+ } else {
+ val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
+ val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
+ totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
+ }
+ } catch {
+ case e: Exception => {
+ totalMb = 2*1024
+ System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
+ }
+ }
// Leave out 1 GB for the operating system, but don't return a negative memory size
math.max(totalMb - 1024, 512)
}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index d06f4884ee..f9489d99fc 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -7,7 +7,11 @@ import akka.util.Timeout
import akka.util.duration._
import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._
+import cc.spray.http.MediaTypes
+import cc.spray.typeconversion.SprayJsonSupport._
+
import spark.deploy.{WorkerState, RequestWorkerState}
+import spark.deploy.JsonProtocol._
private[spark]
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
@@ -18,13 +22,20 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
val handler = {
get {
- path("") {
- completeWith{
+ (path("") & parameters('format ?)) {
+ case Some(js) if js.equalsIgnoreCase("json") => {
val future = worker ? RequestWorkerState
- future.map { workerState =>
- spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ ctx.complete(future.mapTo[WorkerState])
}
}
+ case _ =>
+ completeWith{
+ val future = worker ? RequestWorkerState
+ future.map { workerState =>
+ spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
+ }
+ }
} ~
path("log") {
parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index 80262ab7b4..c193bf7c8d 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -135,8 +135,11 @@ extends Connection(SocketChannel.open, selector_) {
val chunk = message.getChunkForSending(defaultChunkSize)
if (chunk.isDefined) {
messages += message // this is probably incorrect, it wont work as fifo
- if (!message.started) logDebug("Starting to send [" + message + "]")
- message.started = true
+ if (!message.started) {
+ logDebug("Starting to send [" + message + "]")
+ message.started = true
+ message.startTime = System.currentTimeMillis
+ }
return chunk
} else {
/*logInfo("Finished sending [" + message + "] to [" + remoteConnectionManagerId + "]")*/
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 642fa4b525..36c01ad629 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -43,12 +43,12 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
}
val selector = SelectorProvider.provider.openSelector()
- val handleMessageExecutor = Executors.newFixedThreadPool(4)
+ val handleMessageExecutor = Executors.newFixedThreadPool(System.getProperty("spark.core.connection.handler.threads","20").toInt)
val serverChannel = ServerSocketChannel.open()
val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
val messageStatuses = new HashMap[Int, MessageStatus]
- val connectionRequests = new SynchronizedQueue[SendingConnection]
+ val connectionRequests = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
val sendMessageRequests = new Queue[(Message, SendingConnection)]
@@ -79,10 +79,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
def run() {
try {
while(!selectorThread.isInterrupted) {
- while(!connectionRequests.isEmpty) {
- val sendingConnection = connectionRequests.dequeue
+ for( (connectionManagerId, sendingConnection) <- connectionRequests) {
sendingConnection.connect()
addConnection(sendingConnection)
+ connectionRequests -= connectionManagerId
}
sendMessageRequests.synchronized {
while(!sendMessageRequests.isEmpty) {
@@ -300,8 +300,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
def startNewConnection(): SendingConnection = {
val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
- val newConnection = new SendingConnection(inetSocketAddress, selector)
- connectionRequests += newConnection
+ val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId, new SendingConnection(inetSocketAddress, selector))
newConnection
}
val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)
@@ -473,6 +472,7 @@ private[spark] object ConnectionManager {
val mb = size * count / 1024.0 / 1024.0
val ms = finishTime - startTime
val tput = mb * 1000.0 / ms
+ println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
println("--------------------------")
println()
}
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
index 47ceaf3c07..533e4610f3 100644
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
@@ -13,8 +13,14 @@ import akka.util.duration._
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
+ //<mesos cluster> - the master URL
+ //<slaves file> - a list slaves to run connectionTest on
+ //[num of tasks] - the number of parallel tasks to be initiated default is number of slave hosts
+ //[size of msg in MB (integer)] - the size of messages to be sent in each task, default is 10
+ //[count] - how many times to run, default is 3
+ //[await time in seconds] : await time (in seconds), default is 600
if (args.length < 2) {
- println("Usage: ConnectionManagerTest <mesos cluster> <slaves file>")
+ println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] [size of msg in MB (integer)] [count] [await time in seconds)] ")
System.exit(1)
}
@@ -29,16 +35,19 @@ private[spark] object ConnectionManagerTest extends Logging{
/*println("Slaves")*/
/*slaves.foreach(println)*/
-
- val slaveConnManagerIds = sc.parallelize(0 until slaves.length, slaves.length).map(
+ val tasknum = if (args.length > 2) args(2).toInt else slaves.length
+ val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
+ val count = if (args.length > 4) args(4).toInt else 3
+ val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
+ println("Running "+count+" rounds of test: " + "parallel tasks = " + tasknum + ", msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
+ val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map(
i => SparkEnv.get.connectionManager.id).collect()
println("\nSlave ConnectionManagerIds")
slaveConnManagerIds.foreach(println)
println
- val count = 10
(0 until count).foreach(i => {
- val resultStrs = sc.parallelize(0 until slaves.length, slaves.length).map(i => {
+ val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => {
val connManager = SparkEnv.get.connectionManager
val thisConnManagerId = connManager.id
connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
@@ -46,7 +55,6 @@ private[spark] object ConnectionManagerTest extends Logging{
None
})
- val size = 100 * 1024 * 1024
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
@@ -56,13 +64,13 @@ private[spark] object ConnectionManagerTest extends Logging{
logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
})
- val results = futures.map(f => Await.result(f, 1.second))
+ val results = futures.map(f => Await.result(f, awaitTime))
val finishTime = System.currentTimeMillis
Thread.sleep(5000)
val mb = size * results.size / 1024.0 / 1024.0
val ms = finishTime - startTime
- val resultStr = "Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
+ val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
logInfo(resultStr)
resultStr
}).collect()
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 759bea5e9d..1d528be2aa 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -1,9 +1,9 @@
package spark.rdd
import java.io.{ObjectOutputStream, IOException}
-
+import java.util.{HashMap => JHashMap}
+import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Split, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -86,9 +86,16 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
val split = s.asInstanceOf[CoGroupSplit]
val numRdds = split.deps.size
- val map = new HashMap[K, Seq[ArrayBuffer[Any]]]
+ val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
- map.getOrElseUpdate(k, Array.fill(numRdds)(new ArrayBuffer[Any]))
+ val seq = map.get(k)
+ if (seq != null) {
+ seq
+ } else {
+ val seq = Array.fill(numRdds)(new ArrayBuffer[Any])
+ map.put(k, seq)
+ seq
+ }
}
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => {
@@ -108,7 +115,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair)
}
}
- map.iterator
+ JavaConversions.mapAsScalaMap(map).iterator
}
override def clearDependencies() {
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
index b80e9bc07b..6dbe235bd9 100644
--- a/core/src/main/scala/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -9,6 +9,8 @@ private[spark] class FilteredRDD[T: ClassManifest](
override def getSplits = firstParent[T].splits
+ override val partitioner = prev.partitioner // Since filter cannot change a partition's keys
+
override def compute(split: Split, context: TaskContext) =
firstParent[T].iterator(split, context).filter(f)
}
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 74a63c1af1..8cd4c661eb 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -72,9 +72,11 @@ private[spark] class ResultTask[T, U](
override def run(attemptId: Long): U = {
val context = new TaskContext(stageId, partition, attemptId)
- val result = func(context, rdd.iterator(split, context))
- context.executeOnCompleteCallbacks()
- result
+ try {
+ func(context, rdd.iterator(split, context))
+ } finally {
+ context.executeOnCompleteCallbacks()
+ }
}
override def preferredLocations: Seq[String] = locs
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index cf4aae03a7..a089b71644 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -201,7 +201,11 @@ private[spark] class TaskSetManager(
val taskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
val preferred = isPreferredLocation(task, host)
- val prefStr = if (preferred) "preferred" else "non-preferred"
+ val prefStr = if (preferred) {
+ "preferred"
+ } else {
+ "non-preferred, not one of " + task.preferredLocations.mkString(", ")
+ }
logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
taskSet.id, index, taskId, slaveId, host, prefStr))
// Do various bookkeeping
diff --git a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala
deleted file mode 100644
index 37cafd1e8e..0000000000
--- a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.PrivateMethodTester
-import org.scalatest.matchers.ShouldMatchers
-
-// TODO: Replace this with a test of MemoryStore
-class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester with ShouldMatchers {
- test("constructor test") {
- val cache = new BoundedMemoryCache(60)
- expect(60)(cache.getCapacity)
- }
-
- test("caching") {
- // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
- val oldArch = System.setProperty("os.arch", "amd64")
- val oldOops = System.setProperty("spark.test.useCompressedOops", "true")
- val initialize = PrivateMethod[Unit]('initialize)
- SizeEstimator invokePrivate initialize()
-
- val cache = new BoundedMemoryCache(60) {
- //TODO sorry about this, but there is not better way how to skip 'cacheTracker.dropEntry'
- override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
- logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
- }
- }
-
- // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length
- // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6.
- // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html
- // Work around to check for either.
-
- //should be OK
- cache.put("1", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48)))
-
- //we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from
- //cache because it's from the same dataset
- expect(CachePutFailure())(cache.put("1", 1, "Meh"))
-
- //should be OK, dataset '1' can be evicted from cache
- cache.put("2", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48)))
-
- //should fail, cache should obey it's capacity
- expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
-
- if (oldArch != null) {
- System.setProperty("os.arch", oldArch)
- } else {
- System.clearProperty("os.arch")
- }
-
- if (oldOops != null) {
- System.setProperty("spark.test.useCompressedOops", oldOops)
- } else {
- System.clearProperty("spark.test.useCompressedOops")
- }
- }
-}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 0b5354774b..01351de4ae 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -624,6 +624,22 @@ public class JavaAPISuite implements Serializable {
}
});
Assert.assertEquals((Float) 25.0f, floatAccum.value());
+
+ // Test the setValue method
+ floatAccum.setValue(5.0f);
+ Assert.assertEquals((Float) 5.0f, floatAccum.value());
+ }
+
+ @Test
+ public void keyBy() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+ List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
+ public String call(Integer t) throws Exception {
+ return t.toString();
+ }
+ }).collect();
+ Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
+ Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
}
@Test
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 5b4b198960..d3dd3a8fa4 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -1,12 +1,18 @@
package spark
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
import akka.actor._
import spark.scheduler.MapStatus
import spark.storage.BlockManagerId
+import spark.util.AkkaUtils
-class MapOutputTrackerSuite extends FunSuite {
+class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
+ after {
+ System.clearProperty("spark.master.port")
+ }
+
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 1)
@@ -71,6 +77,36 @@ class MapOutputTrackerSuite extends FunSuite {
// The remaining reduce task might try to grab the output dispite the shuffle failure;
// this should cause it to fail, and the scheduler will ignore the failure due to the
// stage already being aborted.
- intercept[Exception] { tracker.getServerStatuses(10, 1) }
+ intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+ }
+
+ test("remote fetch") {
+ System.clearProperty("spark.master.host")
+ val (actorSystem, boundPort) =
+ AkkaUtils.createActorSystem("test", "localhost", 0)
+ System.setProperty("spark.master.port", boundPort.toString)
+ val masterTracker = new MapOutputTracker(actorSystem, true)
+ val slaveTracker = new MapOutputTracker(actorSystem, false)
+ masterTracker.registerShuffle(10, 1)
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ masterTracker.registerMapOutput(10, 0, new MapStatus(
+ new BlockManagerId("hostA", 1000), Array(compressedSize1000)))
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+ Seq((new BlockManagerId("hostA", 1000), size1000)))
+
+ masterTracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+ // failure should be cached
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
}
}
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index f09b602a7b..eb3c8f238f 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -106,6 +106,11 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter {
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
+
+ assert(grouped2.map(_ => 1).partitioner === None)
+ assert(grouped2.mapValues(_ => 1).partitioner === grouped2.partitioner)
+ assert(grouped2.flatMapValues(_ => Seq(1)).partitioner === grouped2.partitioner)
+ assert(grouped2.filter(_._1 > 4).partitioner === grouped2.partitioner)
}
test("partitioning Java arrays should fail") {
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index e5a59dc7d6..db217f8482 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -35,6 +35,8 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(nums.flatMap(x => 1 to x).collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
assert(nums.union(nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
+ assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
+ assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
assert(partitionSums.collect().toList === List(3, 7))
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 8170100f1d..bebb8ebe86 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -216,6 +216,13 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// Test that a shuffle on the file works, because this used to be a bug
assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
}
+
+ test("keys and values") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
+ assert(rdd.keys.collect().toList === List(1, 2))
+ assert(rdd.values.collect().toList === List("a", "b"))
+ }
}
object ShuffleSuite {
diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala
index 17f366212b..e235ef2f67 100644
--- a/core/src/test/scala/spark/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala
@@ -3,7 +3,6 @@ package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll
import org.scalatest.PrivateMethodTester
-import org.scalatest.matchers.ShouldMatchers
class DummyClass1 {}
@@ -20,8 +19,17 @@ class DummyClass4(val d: DummyClass3) {
val x: Int = 0
}
+object DummyString {
+ def apply(str: String) : DummyString = new DummyString(str.toArray)
+}
+class DummyString(val arr: Array[Char]) {
+ override val hashCode: Int = 0
+ // JDK-7 has an extra hash32 field http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/rev/11987e85555f
+ @transient val hash32: Int = 0
+}
+
class SizeEstimatorSuite
- extends FunSuite with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers {
+ extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
var oldArch: String = _
var oldOops: String = _
@@ -45,15 +53,13 @@ class SizeEstimatorSuite
expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
}
- // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length.
- // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6.
- // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html
- // Work around to check for either.
+ // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
+ // (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("strings") {
- SizeEstimator.estimate("") should (equal (48) or equal (40))
- SizeEstimator.estimate("a") should (equal (56) or equal (48))
- SizeEstimator.estimate("ab") should (equal (56) or equal (48))
- SizeEstimator.estimate("abcdefgh") should (equal(64) or equal(56))
+ expect(40)(SizeEstimator.estimate(DummyString("")))
+ expect(48)(SizeEstimator.estimate(DummyString("a")))
+ expect(48)(SizeEstimator.estimate(DummyString("ab")))
+ expect(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
}
test("primitive arrays") {
@@ -105,18 +111,16 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- expect(40)(SizeEstimator.estimate(""))
- expect(48)(SizeEstimator.estimate("a"))
- expect(48)(SizeEstimator.estimate("ab"))
- expect(56)(SizeEstimator.estimate("abcdefgh"))
+ expect(40)(SizeEstimator.estimate(DummyString("")))
+ expect(48)(SizeEstimator.estimate(DummyString("a")))
+ expect(48)(SizeEstimator.estimate(DummyString("ab")))
+ expect(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
}
- // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length.
- // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6.
- // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html
- // Work around to check for either.
+ // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
+ // (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("64-bit arch with no compressed oops") {
val arch = System.setProperty("os.arch", "amd64")
val oops = System.setProperty("spark.test.useCompressedOops", "false")
@@ -124,10 +128,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- SizeEstimator.estimate("") should (equal (64) or equal (56))
- SizeEstimator.estimate("a") should (equal (72) or equal (64))
- SizeEstimator.estimate("ab") should (equal (72) or equal (64))
- SizeEstimator.estimate("abcdefgh") should (equal (80) or equal (72))
+ expect(56)(SizeEstimator.estimate(DummyString("")))
+ expect(64)(SizeEstimator.estimate(DummyString("a")))
+ expect(64)(SizeEstimator.estimate(DummyString("ab")))
+ expect(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
resetOrClear("spark.test.useCompressedOops", oops)
diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
new file mode 100644
index 0000000000..ba6f8b588f
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
@@ -0,0 +1,42 @@
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import spark.TaskContext
+import spark.RDD
+import spark.SparkContext
+import spark.Split
+
+class TaskContextSuite extends FunSuite with BeforeAndAfter {
+
+ var sc: SparkContext = _
+
+ after {
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+
+ test("Calls executeOnCompleteCallbacks after failure") {
+ var completed = false
+ sc = new SparkContext("local", "test")
+ val rdd = new RDD[String](sc, List()) {
+ override def getSplits = Array[Split](StubSplit(0))
+ override def compute(split: Split, context: TaskContext) = {
+ context.addOnCompleteCallback(() => completed = true)
+ sys.error("failed")
+ }
+ }
+ val func = (c: TaskContext, i: Iterator[String]) => i.next
+ val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
+ intercept[RuntimeException] {
+ task.run(0)
+ }
+ assert(completed === true)
+ }
+
+ case class StubSplit(val index: Int) extends Split
+} \ No newline at end of file