diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-20 03:54:46 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-20 03:54:46 -0800 |
commit | 76ff962edcb7f41601c6c2d4fc6714bbc885faa7 (patch) | |
tree | 87695f0e35c893ab2c9f3cc2c1e369beadc9c623 /core | |
parent | 11bbe231408a3223d110c89519a70184d58408af (diff) | |
parent | 33bad85bb9143d41bc5de2068f7e8a8c39928225 (diff) | |
download | spark-76ff962edcb7f41601c6c2d4fc6714bbc885faa7.tar.gz spark-76ff962edcb7f41601c6c2d4fc6714bbc885faa7.tar.bz2 spark-76ff962edcb7f41601c6c2d4fc6714bbc885faa7.zip |
Merge pull request #380 from tdas/streaming
Merging pySpark to streaming
Diffstat (limited to 'core')
35 files changed, 773 insertions, 285 deletions
diff --git a/core/pom.xml b/core/pom.xml index ae52c20657..862d3ec37a 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -72,6 +72,10 @@ <artifactId>spray-server</artifactId> </dependency> <dependency> + <groupId>cc.spray</groupId> + <artifactId>spray-json_${scala.version}</artifactId> + </dependency> + <dependency> <groupId>org.tomdz.twirl</groupId> <artifactId>twirl-api</artifactId> </dependency> @@ -159,6 +163,11 @@ <profiles> <profile> <id>hadoop1</id> + <activation> + <property> + <name>!hadoopVersion</name> + </property> + </activation> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> @@ -211,6 +220,12 @@ </profile> <profile> <id>hadoop2</id> + <activation> + <property> + <name>hadoopVersion</name> + <value>2</value> + </property> + </activation> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> @@ -267,4 +282,4 @@ </build> </profile> </profiles> -</project>
\ No newline at end of file +</project> diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala index 6280f25391..b644aba5f8 100644 --- a/core/src/main/scala/spark/Accumulators.scala +++ b/core/src/main/scala/spark/Accumulators.scala @@ -63,9 +63,12 @@ class Accumulable[R, T] ( /** * Access the accumulator's current value; only allowed on master. */ - def value = { - if (!deserialized) value_ - else throw new UnsupportedOperationException("Can't read accumulator value in task") + def value: R = { + if (!deserialized) { + value_ + } else { + throw new UnsupportedOperationException("Can't read accumulator value in task") + } } /** @@ -82,10 +85,17 @@ class Accumulable[R, T] ( /** * Set the accumulator's value; only allowed on master. */ - def value_= (r: R) { - if (!deserialized) value_ = r + def value_= (newValue: R) { + if (!deserialized) value_ = newValue else throw new UnsupportedOperationException("Can't assign accumulator value in task") } + + /** + * Set the accumulator's value; only allowed on master + */ + def setValue(newValue: R) { + this.value = newValue + } // Called by Java when deserializing an object private def readObject(in: ObjectInputStream) { diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala deleted file mode 100644 index e8392a194f..0000000000 --- a/core/src/main/scala/spark/BoundedMemoryCache.scala +++ /dev/null @@ -1,118 +0,0 @@ -package spark - -import java.util.LinkedHashMap - -/** - * An implementation of Cache that estimates the sizes of its entries and attempts to limit its - * total memory usage to a fraction of the JVM heap. Objects' sizes are estimated using - * SizeEstimator, which has limitations; most notably, we will overestimate total memory used if - * some cache entries have pointers to a shared object. Nonetheless, this Cache should work well - * when most of the space is used by arrays of primitives or of simple classes. - */ -private[spark] class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging { - logInfo("BoundedMemoryCache.maxBytes = " + maxBytes) - - def this() { - this(BoundedMemoryCache.getMaxBytes) - } - - private var currentBytes = 0L - private val map = new LinkedHashMap[(Any, Int), Entry](32, 0.75f, true) - - override def get(datasetId: Any, partition: Int): Any = { - synchronized { - val entry = map.get((datasetId, partition)) - if (entry != null) { - entry.value - } else { - null - } - } - } - - override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = { - val key = (datasetId, partition) - logInfo("Asked to add key " + key) - val size = estimateValueSize(key, value) - synchronized { - if (size > getCapacity) { - return CachePutFailure() - } else if (ensureFreeSpace(datasetId, size)) { - logInfo("Adding key " + key) - map.put(key, new Entry(value, size)) - currentBytes += size - logInfo("Number of entries is now " + map.size) - return CachePutSuccess(size) - } else { - logInfo("Didn't add key " + key + " because we would have evicted part of same dataset") - return CachePutFailure() - } - } - } - - override def getCapacity: Long = maxBytes - - /** - * Estimate sizeOf 'value' - */ - private def estimateValueSize(key: (Any, Int), value: Any) = { - val startTime = System.currentTimeMillis - val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef]) - val timeTaken = System.currentTimeMillis - startTime - logInfo("Estimated size for key %s is %d".format(key, size)) - logInfo("Size estimation for key %s took %d ms".format(key, timeTaken)) - size - } - - /** - * Remove least recently used entries from the map until at least space bytes are free, in order - * to make space for a partition from the given dataset ID. If this cannot be done without - * evicting other data from the same dataset, returns false; otherwise, returns true. Assumes - * that a lock is held on the BoundedMemoryCache. - */ - private def ensureFreeSpace(datasetId: Any, space: Long): Boolean = { - logInfo("ensureFreeSpace(%s, %d) called with curBytes=%d, maxBytes=%d".format( - datasetId, space, currentBytes, maxBytes)) - val iter = map.entrySet.iterator // Will give entries in LRU order - while (maxBytes - currentBytes < space && iter.hasNext) { - val mapEntry = iter.next() - val (entryDatasetId, entryPartition) = mapEntry.getKey - if (entryDatasetId == datasetId) { - // Cannot make space without removing part of the same dataset, or a more recently used one - return false - } - reportEntryDropped(entryDatasetId, entryPartition, mapEntry.getValue) - currentBytes -= mapEntry.getValue.size - iter.remove() - } - return true - } - - protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) { - logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size)) - // TODO: remove BoundedMemoryCache - - val (keySpaceId, innerDatasetId) = datasetId.asInstanceOf[(Any, Any)] - innerDatasetId match { - case rddId: Int => - SparkEnv.get.cacheTracker.dropEntry(rddId, partition) - case broadcastUUID: java.util.UUID => - // TODO: Maybe something should be done if the broadcasted variable falls out of cache - case _ => - } - } -} - -// An entry in our map; stores a cached object and its size in bytes -private[spark] case class Entry(value: Any, size: Long) - -private[spark] object BoundedMemoryCache { - /** - * Get maximum cache capacity from system configuration - */ - def getMaxBytes: Long = { - val memoryFractionToUse = System.getProperty("spark.boundedMemoryCache.memoryFraction", "0.66").toDouble - (Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong - } -} - diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index a2fa2d1ea7..ac02f3363a 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -142,8 +142,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea case e: InterruptedException => } } - return mapStatuses(shuffleId).map(status => - (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId)))) + return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, mapStatuses(shuffleId)) } else { fetching += shuffleId } @@ -159,21 +158,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea fetchedStatuses = deserializeStatuses(fetchedBytes) logInfo("Got the output locations") mapStatuses.put(shuffleId, fetchedStatuses) - if (fetchedStatuses.contains(null)) { - throw new FetchFailedException(null, shuffleId, -1, reduceId, - new Exception("Missing an output location for shuffle " + shuffleId)) - } } finally { fetching.synchronized { fetching -= shuffleId fetching.notifyAll() } } - return fetchedStatuses.map(s => - (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId)))) + return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses) } else { - return statuses.map(s => - (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId)))) + return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses) } } @@ -267,6 +260,28 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea private[spark] object MapOutputTracker { private val LOG_BASE = 1.1 + // Convert an array of MapStatuses to locations and sizes for a given reduce ID. If + // any of the statuses is null (indicating a missing location due to a failed mapper), + // throw a FetchFailedException. + def convertMapStatuses( + shuffleId: Int, + reduceId: Int, + statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { + if (statuses == null) { + throw new FetchFailedException(null, shuffleId, -1, reduceId, + new Exception("Missing all output locations for shuffle " + shuffleId)) + } + statuses.map { + status => + if (status == null) { + throw new FetchFailedException(null, shuffleId, -1, reduceId, + new Exception("Missing an output location for shuffle " + shuffleId)) + } else { + (status.address, decompressSize(status.compressedSizes(reduceId))) + } + } + } + /** * Compress a size in bytes to 8 bits for efficient reporting of map output sizes. * We do this by encoding the log base 1.1 of the size as an integer, which can support diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index d95b66ad78..abb01c387c 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -615,6 +615,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( writer.cleanup() } + /** + * Return an RDD with the keys of each tuple. + */ + def keys: RDD[K] = self.map(_._1) + + /** + * Return an RDD with the values of each tuple. + */ + def values: RDD[V] = self.map(_._2) + private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 0de6f04d50..e0d2eabb1d 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -349,6 +349,13 @@ abstract class RDD[T: ClassManifest]( def toArray(): Array[T] = collect() /** + * Return an RDD that contains all matching values by applying `f`. + */ + def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = { + filter(f.isDefinedAt).map(f) + } + + /** * Reduces the elements of this RDD using the specified associative binary operator. */ def reduce(f: (T, T) => T): T = { @@ -529,23 +536,29 @@ abstract class RDD[T: ClassManifest]( .saveAsSequenceFile(path) } + /** + * Creates tuples of the elements in this RDD by applying `f`. + */ + def keyBy[K](f: T => K): RDD[(K, T)] = { + map(x => (f(x), x)) + } + /** A private method for tests, to look at the contents of each partition */ private[spark] def collectPartitions(): Array[Array[T]] = { sc.runJob(this, (iter: Iterator[T]) => iter.toArray) } /** - * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir` - * (set using setCheckpointDir()) and all references to its parent RDDs will be removed. - * This is used to truncate very long lineages. In the current implementation, Spark will save - * this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done. - * Hence, it is strongly recommended to use checkpoint() on RDDs when - * (i) checkpoint() is called before the any job has been executed on this RDD. - * (ii) This RDD has been made to persist in memory. Otherwise saving it on a file will - * require recomputation. + * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint + * directory set with SparkContext.setCheckpointDir() and all references to its parent + * RDDs will be removed. This function must be called before any job has been + * executed on this RDD. It is strongly recommended that this RDD is persisted in + * memory, otherwise saving it on a file will require recomputation. */ def checkpoint() { - if (checkpointData.isEmpty) { + if (context.checkpointDir.isEmpty) { + throw new Exception("Checkpoint directory has not been set in the SparkContext") + } else if (checkpointData.isEmpty) { checkpointData = Some(new RDDCheckpointData(this)) checkpointData.get.markForCheckpoint() } diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala index d845a522e4..18df530b7d 100644 --- a/core/src/main/scala/spark/RDDCheckpointData.scala +++ b/core/src/main/scala/spark/RDDCheckpointData.scala @@ -63,7 +63,7 @@ extends Logging with Serializable { } // Save to file, and reload it as an RDD - val path = new Path(rdd.context.checkpointDir, "rdd-" + rdd.id).toString + val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id).toString rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _) val newRDD = new CheckpointRDD[T](rdd.context, path) diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala index a34aee69c1..6b4a11d6d3 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -42,7 +42,13 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { classManifest[T].erasure } else { - implicitly[T => Writable].getClass.getMethods()(0).getReturnType + // We get the type of the Writable class by looking at the apply method which converts + // from T to Writable. Since we have two apply methods we filter out the one which + // is of the form "java.lang.Object apply(java.lang.Object)" + implicitly[T => Writable].getClass.getDeclaredMethods().filter( + m => m.getReturnType().toString != "java.lang.Object" && + m.getName() == "apply")(0).getReturnType + } // TODO: use something like WritableConverter to avoid reflection } diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala index 7c3e8640e9..d4e1157250 100644 --- a/core/src/main/scala/spark/SizeEstimator.scala +++ b/core/src/main/scala/spark/SizeEstimator.scala @@ -9,7 +9,6 @@ import java.util.Random import javax.management.MBeanServer import java.lang.management.ManagementFactory -import com.sun.management.HotSpotDiagnosticMXBean import scala.collection.mutable.ArrayBuffer @@ -76,12 +75,20 @@ private[spark] object SizeEstimator extends Logging { if (System.getProperty("spark.test.useCompressedOops") != null) { return System.getProperty("spark.test.useCompressedOops").toBoolean } + try { val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic" val server = ManagementFactory.getPlatformMBeanServer() + + // NOTE: This should throw an exception in non-Sun JVMs + val hotSpotMBeanClass = Class.forName("com.sun.management.HotSpotDiagnosticMXBean") + val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption", + Class.forName("java.lang.String")) + val bean = ManagementFactory.newPlatformMXBeanProxy(server, - hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean]) - return bean.getVMOption("UseCompressedOops").getValue.toBoolean + hotSpotMBeanName, hotSpotMBeanClass) + // TODO: We could use reflection on the VMOption returned ? + return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true") } catch { case e: Exception => { // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 88cf357ebf..7f3259d982 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -184,7 +184,7 @@ class SparkContext( private var dagScheduler = new DAGScheduler(taskScheduler) - private[spark] var checkpointDir: String = null + private[spark] var checkpointDir: Option[String] = None // Methods for creating RDDs @@ -595,10 +595,11 @@ class SparkContext( } /** - * Set the directory under which RDDs are going to be checkpointed. This method will - * create this directory and will throw an exception of the path already exists (to avoid - * overwriting existing files may be overwritten). The directory will be deleted on exit - * if indicated. + * Set the directory under which RDDs are going to be checkpointed. The directory must + * be a HDFS path if running on a cluster. If the directory does not exist, it will + * be created. If the directory exists and useExisting is set to true, then the + * exisiting directory will be used. Otherwise an exception will be thrown to + * prevent accidental overriding of checkpoint files in the existing directory. */ def setCheckpointDir(dir: String, useExisting: Boolean = false) { val path = new Path(dir) @@ -610,7 +611,7 @@ class SparkContext( fs.mkdirs(path) } } - checkpointDir = dir + checkpointDir = Some(dir) } /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */ diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 5c2be534ff..8ce32e0e2f 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -471,6 +471,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x) fromRDD(new OrderedRDDFunctions(rdd).sortByKey(ascending)) } + + /** + * Return an RDD with the keys of each tuple. + */ + def keys(): JavaRDD[K] = JavaRDD.fromRDD[K](rdd.map(_._1)) + + /** + * Return an RDD with the values of each tuple. + */ + def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2)) } object JavaPairRDD { diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 958f5c26a1..087270e46d 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -301,6 +301,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path) /** + * Creates tuples of the elements in this RDD by applying `f`. + */ + def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = { + implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + JavaPairRDD.fromRDD(rdd.keyBy(f)) + } + + /** * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir` * (set using setCheckpointDir()) and all references to its parent RDDs will be removed. * This is used to truncate very long lineages. In the current implementation, Spark will save diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala index 22bfa2280d..fa2f14113d 100644 --- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala @@ -278,6 +278,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]] /** + * Create an [[spark.Accumulator]] integer variable, which tasks can "add" values + * to using the `add` method. Only the master can access the accumulator's `value`. + */ + def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue) + + /** + * Create an [[spark.Accumulator]] double variable, which tasks can "add" values + * to using the `add` method. Only the master can access the accumulator's `value`. + */ + def accumulator(initialValue: Double): Accumulator[java.lang.Double] = + doubleAccumulator(initialValue) + + /** * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values * to using the `add` method. Only the master can access the accumulator's `value`. */ diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala new file mode 100644 index 0000000000..648d9402b0 --- /dev/null +++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala @@ -0,0 +1,39 @@ +package spark.api.python + +import spark.Partitioner + +import java.util.Arrays + +/** + * A [[spark.Partitioner]] that performs handling of byte arrays, for use by the Python API. + */ +private[spark] class PythonPartitioner(override val numPartitions: Int) extends Partitioner { + + override def getPartition(key: Any): Int = { + if (key == null) { + return 0 + } + else { + val hashCode = { + if (key.isInstanceOf[Array[Byte]]) { + Arrays.hashCode(key.asInstanceOf[Array[Byte]]) + } else { + key.hashCode() + } + } + val mod = hashCode % numPartitions + if (mod < 0) { + mod + numPartitions + } else { + mod // Guard against negative hash codes + } + } + } + + override def equals(other: Any): Boolean = other match { + case h: PythonPartitioner => + h.numPartitions == numPartitions + case _ => + false + } +} diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala new file mode 100644 index 0000000000..0138b22d38 --- /dev/null +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -0,0 +1,248 @@ +package spark.api.python + +import java.io._ +import java.util.{List => JList} + +import scala.collection.JavaConversions._ +import scala.io.Source + +import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} +import spark.broadcast.Broadcast +import spark._ +import spark.rdd.PipedRDD +import java.util + + +private[spark] class PythonRDD[T: ClassManifest]( + parent: RDD[T], + command: Seq[String], + envVars: java.util.Map[String, String], + preservePartitoning: Boolean, + pythonExec: String, + broadcastVars: java.util.List[Broadcast[Array[Byte]]]) + extends RDD[Array[Byte]](parent) { + + // Similar to Runtime.exec(), if we are given a single string, split it into words + // using a standard StringTokenizer (i.e. by spaces) + def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String], + preservePartitoning: Boolean, pythonExec: String, + broadcastVars: java.util.List[Broadcast[Array[Byte]]]) = + this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec, + broadcastVars) + + override def getSplits = parent.splits + + override val partitioner = if (preservePartitoning) parent.partitioner else None + + override def compute(split: Split, context: TaskContext): Iterator[Array[Byte]] = { + val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME") + + val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/python/pyspark/worker.py")) + // Add the environmental variables to the process. + val currentEnvVars = pb.environment() + + for ((variable, value) <- envVars) { + currentEnvVars.put(variable, value) + } + + val proc = pb.start() + val env = SparkEnv.get + + // Start a thread to print the process's stderr to ours + new Thread("stderr reader for " + command) { + override def run() { + for (line <- Source.fromInputStream(proc.getErrorStream).getLines) { + System.err.println(line) + } + } + }.start() + + // Start a thread to feed the process input from our parent's iterator + new Thread("stdin writer for " + command) { + override def run() { + SparkEnv.set(env) + val out = new PrintWriter(proc.getOutputStream) + val dOut = new DataOutputStream(proc.getOutputStream) + // Split index + dOut.writeInt(split.index) + // Broadcast variables + dOut.writeInt(broadcastVars.length) + for (broadcast <- broadcastVars) { + dOut.writeLong(broadcast.id) + dOut.writeInt(broadcast.value.length) + dOut.write(broadcast.value) + dOut.flush() + } + // Serialized user code + for (elem <- command) { + out.println(elem) + } + out.flush() + // Data values + for (elem <- parent.iterator(split, context)) { + PythonRDD.writeAsPickle(elem, dOut) + } + dOut.flush() + out.flush() + proc.getOutputStream.close() + } + }.start() + + // Return an iterator that read lines from the process's stdout + val stream = new DataInputStream(proc.getInputStream) + return new Iterator[Array[Byte]] { + def next() = { + val obj = _nextObj + _nextObj = read() + obj + } + + private def read() = { + try { + val length = stream.readInt() + val obj = new Array[Byte](length) + stream.readFully(obj) + obj + } catch { + case eof: EOFException => { + val exitStatus = proc.waitFor() + if (exitStatus != 0) { + throw new Exception("Subprocess exited with status " + exitStatus) + } + new Array[Byte](0) + } + case e => throw e + } + } + + var _nextObj = read() + + def hasNext = _nextObj.length != 0 + } + } + + override def checkpoint() { } + + val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) +} + +/** + * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python. + * This is used by PySpark's shuffle operations. + */ +private class PairwiseRDD(prev: RDD[Array[Byte]]) extends + RDD[(Array[Byte], Array[Byte])](prev) { + override def getSplits = prev.splits + override def compute(split: Split, context: TaskContext) = + prev.iterator(split, context).grouped(2).map { + case Seq(a, b) => (a, b) + case x => throw new Exception("PairwiseRDD: unexpected value: " + x) + } + override def checkpoint() { } + val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this) +} + +private[spark] object PythonRDD { + + /** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */ + def stripPickle(arr: Array[Byte]) : Array[Byte] = { + arr.slice(2, arr.length - 1) + } + + /** + * Write strings, pickled Python objects, or pairs of pickled objects to a data output stream. + * The data format is a 32-bit integer representing the pickled object's length (in bytes), + * followed by the pickled data. + * + * Pickle module: + * + * http://docs.python.org/2/library/pickle.html + * + * The pickle protocol is documented in the source of the `pickle` and `pickletools` modules: + * + * http://hg.python.org/cpython/file/2.6/Lib/pickle.py + * http://hg.python.org/cpython/file/2.6/Lib/pickletools.py + * + * @param elem the object to write + * @param dOut a data output stream + */ + def writeAsPickle(elem: Any, dOut: DataOutputStream) { + if (elem.isInstanceOf[Array[Byte]]) { + val arr = elem.asInstanceOf[Array[Byte]] + dOut.writeInt(arr.length) + dOut.write(arr) + } else if (elem.isInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]) { + val t = elem.asInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]] + val length = t._1.length + t._2.length - 3 - 3 + 4 // stripPickle() removes 3 bytes + dOut.writeInt(length) + dOut.writeByte(Pickle.PROTO) + dOut.writeByte(Pickle.TWO) + dOut.write(PythonRDD.stripPickle(t._1)) + dOut.write(PythonRDD.stripPickle(t._2)) + dOut.writeByte(Pickle.TUPLE2) + dOut.writeByte(Pickle.STOP) + } else if (elem.isInstanceOf[String]) { + // For uniformity, strings are wrapped into Pickles. + val s = elem.asInstanceOf[String].getBytes("UTF-8") + val length = 2 + 1 + 4 + s.length + 1 + dOut.writeInt(length) + dOut.writeByte(Pickle.PROTO) + dOut.writeByte(Pickle.TWO) + dOut.write(Pickle.BINUNICODE) + dOut.writeInt(Integer.reverseBytes(s.length)) + dOut.write(s) + dOut.writeByte(Pickle.STOP) + } else { + throw new Exception("Unexpected RDD type") + } + } + + def readRDDFromPickleFile(sc: JavaSparkContext, filename: String, parallelism: Int) : + JavaRDD[Array[Byte]] = { + val file = new DataInputStream(new FileInputStream(filename)) + val objs = new collection.mutable.ArrayBuffer[Array[Byte]] + try { + while (true) { + val length = file.readInt() + val obj = new Array[Byte](length) + file.readFully(obj) + objs.append(obj) + } + } catch { + case eof: EOFException => {} + case e => throw e + } + JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism)) + } + + def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) { + val file = new DataOutputStream(new FileOutputStream(filename)) + for (item <- items) { + writeAsPickle(item, file) + } + file.close() + } + + def takePartition[T](rdd: RDD[T], partition: Int): java.util.Iterator[T] = + rdd.context.runJob(rdd, ((x: Iterator[T]) => x), Seq(partition), true).head +} + +private object Pickle { + val PROTO: Byte = 0x80.toByte + val TWO: Byte = 0x02.toByte + val BINUNICODE: Byte = 'X' + val STOP: Byte = '.' + val TUPLE2: Byte = 0x86.toByte + val EMPTY_LIST: Byte = ']' + val MARK: Byte = '(' + val APPENDS: Byte = 'e' +} + +private class ExtractValue extends spark.api.java.function.Function[(Array[Byte], + Array[Byte]), Array[Byte]] { + override def call(pair: (Array[Byte], Array[Byte])) : Array[Byte] = pair._2 +} + +private class BytesToString extends spark.api.java.function.Function[Array[Byte], String] { + override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8") +} diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala index 6055bfd045..2ffe7f741d 100644 --- a/core/src/main/scala/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/spark/broadcast/Broadcast.scala @@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicLong import spark._ -abstract class Broadcast[T](id: Long) extends Serializable { +abstract class Broadcast[T](private[spark] val id: Long) extends Serializable { def value: T // We cannot have an abstract readObject here due to some weird issues with diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala new file mode 100644 index 0000000000..732fa08064 --- /dev/null +++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala @@ -0,0 +1,78 @@ +package spark.deploy + +import master.{JobInfo, WorkerInfo} +import worker.ExecutorRunner +import cc.spray.json._ + +/** + * spray-json helper class containing implicit conversion to json for marshalling responses + */ +private[spark] object JsonProtocol extends DefaultJsonProtocol { + implicit object WorkerInfoJsonFormat extends RootJsonWriter[WorkerInfo] { + def write(obj: WorkerInfo) = JsObject( + "id" -> JsString(obj.id), + "host" -> JsString(obj.host), + "webuiaddress" -> JsString(obj.webUiAddress), + "cores" -> JsNumber(obj.cores), + "coresused" -> JsNumber(obj.coresUsed), + "memory" -> JsNumber(obj.memory), + "memoryused" -> JsNumber(obj.memoryUsed) + ) + } + + implicit object JobInfoJsonFormat extends RootJsonWriter[JobInfo] { + def write(obj: JobInfo) = JsObject( + "starttime" -> JsNumber(obj.startTime), + "id" -> JsString(obj.id), + "name" -> JsString(obj.desc.name), + "cores" -> JsNumber(obj.desc.cores), + "user" -> JsString(obj.desc.user), + "memoryperslave" -> JsNumber(obj.desc.memoryPerSlave), + "submitdate" -> JsString(obj.submitDate.toString)) + } + + implicit object JobDescriptionJsonFormat extends RootJsonWriter[JobDescription] { + def write(obj: JobDescription) = JsObject( + "name" -> JsString(obj.name), + "cores" -> JsNumber(obj.cores), + "memoryperslave" -> JsNumber(obj.memoryPerSlave), + "user" -> JsString(obj.user) + ) + } + + implicit object ExecutorRunnerJsonFormat extends RootJsonWriter[ExecutorRunner] { + def write(obj: ExecutorRunner) = JsObject( + "id" -> JsNumber(obj.execId), + "memory" -> JsNumber(obj.memory), + "jobid" -> JsString(obj.jobId), + "jobdesc" -> obj.jobDesc.toJson.asJsObject + ) + } + + implicit object MasterStateJsonFormat extends RootJsonWriter[MasterState] { + def write(obj: MasterState) = JsObject( + "url" -> JsString("spark://" + obj.uri), + "workers" -> JsArray(obj.workers.toList.map(_.toJson)), + "cores" -> JsNumber(obj.workers.map(_.cores).sum), + "coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum), + "memory" -> JsNumber(obj.workers.map(_.memory).sum), + "memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum), + "activejobs" -> JsArray(obj.activeJobs.toList.map(_.toJson)), + "completedjobs" -> JsArray(obj.completedJobs.toList.map(_.toJson)) + ) + } + + implicit object WorkerStateJsonFormat extends RootJsonWriter[WorkerState] { + def write(obj: WorkerState) = JsObject( + "id" -> JsString(obj.workerId), + "masterurl" -> JsString(obj.masterUrl), + "masterwebuiurl" -> JsString(obj.masterWebUiUrl), + "cores" -> JsNumber(obj.cores), + "coresused" -> JsNumber(obj.coresUsed), + "memory" -> JsNumber(obj.memory), + "memoryused" -> JsNumber(obj.memoryUsed), + "executors" -> JsArray(obj.executors.toList.map(_.toJson)), + "finishedexecutors" -> JsArray(obj.finishedExecutors.toList.map(_.toJson)) + ) + } +} diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala index 3cdd3721f5..458ee2d665 100644 --- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala @@ -8,7 +8,11 @@ import akka.util.duration._ import cc.spray.Directives import cc.spray.directives._ import cc.spray.typeconversion.TwirlSupport._ +import cc.spray.http.MediaTypes +import cc.spray.typeconversion.SprayJsonSupport._ + import spark.deploy._ +import spark.deploy.JsonProtocol._ private[spark] class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives { @@ -19,29 +23,51 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct val handler = { get { - path("") { - completeWith { + (path("") & parameters('format ?)) { + case Some(js) if js.equalsIgnoreCase("json") => val future = master ? RequestMasterState - future.map { - masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState]) + respondWithMediaType(MediaTypes.`application/json`) { ctx => + ctx.complete(future.mapTo[MasterState]) + } + case _ => + completeWith { + val future = master ? RequestMasterState + future.map { + masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState]) + } } - } } ~ path("job") { - parameter("jobId") { jobId => - completeWith { + parameters("jobId", 'format ?) { + case (jobId, Some(js)) if (js.equalsIgnoreCase("json")) => val future = master ? RequestMasterState - future.map { state => - val masterState = state.asInstanceOf[MasterState] - - // A bit ugly an inefficient, but we won't have a number of jobs - // so large that it will make a significant difference. - (masterState.activeJobs ++ masterState.completedJobs).find(_.id == jobId) match { - case Some(job) => spark.deploy.master.html.job_details.render(job) - case _ => null + val jobInfo = for (masterState <- future.mapTo[MasterState]) yield { + masterState.activeJobs.find(_.id == jobId) match { + case Some(job) => job + case _ => masterState.completedJobs.find(_.id == jobId) match { + case Some(job) => job + case _ => null + } + } + } + respondWithMediaType(MediaTypes.`application/json`) { ctx => + ctx.complete(jobInfo.mapTo[JobInfo]) + } + case (jobId, _) => + completeWith { + val future = master ? RequestMasterState + future.map { state => + val masterState = state.asInstanceOf[MasterState] + + masterState.activeJobs.find(_.id == jobId) match { + case Some(job) => spark.deploy.master.html.job_details.render(job) + case _ => masterState.completedJobs.find(_.id == jobId) match { + case Some(job) => spark.deploy.master.html.job_details.render(job) + case _ => null + } + } } } - } } } ~ pathPrefix("static") { diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala index 340920025b..37524a7c82 100644 --- a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala @@ -104,9 +104,25 @@ private[spark] class WorkerArguments(args: Array[String]) { } def inferDefaultMemory(): Int = { - val bean = ManagementFactory.getOperatingSystemMXBean - .asInstanceOf[com.sun.management.OperatingSystemMXBean] - val totalMb = (bean.getTotalPhysicalMemorySize / 1024 / 1024).toInt + val ibmVendor = System.getProperty("java.vendor").contains("IBM") + var totalMb = 0 + try { + val bean = ManagementFactory.getOperatingSystemMXBean() + if (ibmVendor) { + val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean") + val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory") + totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt + } else { + val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean") + val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize") + totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt + } + } catch { + case e: Exception => { + totalMb = 2*1024 + System.out.println("Failed to get total physical memory. Using " + totalMb + " MB") + } + } // Leave out 1 GB for the operating system, but don't return a negative memory size math.max(totalMb - 1024, 512) } diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala index d06f4884ee..f9489d99fc 100644 --- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala @@ -7,7 +7,11 @@ import akka.util.Timeout import akka.util.duration._ import cc.spray.Directives import cc.spray.typeconversion.TwirlSupport._ +import cc.spray.http.MediaTypes +import cc.spray.typeconversion.SprayJsonSupport._ + import spark.deploy.{WorkerState, RequestWorkerState} +import spark.deploy.JsonProtocol._ private[spark] class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives { @@ -18,13 +22,20 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct val handler = { get { - path("") { - completeWith{ + (path("") & parameters('format ?)) { + case Some(js) if js.equalsIgnoreCase("json") => { val future = worker ? RequestWorkerState - future.map { workerState => - spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState]) + respondWithMediaType(MediaTypes.`application/json`) { ctx => + ctx.complete(future.mapTo[WorkerState]) } } + case _ => + completeWith{ + val future = worker ? RequestWorkerState + future.map { workerState => + spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState]) + } + } } ~ path("log") { parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) => diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala index 80262ab7b4..c193bf7c8d 100644 --- a/core/src/main/scala/spark/network/Connection.scala +++ b/core/src/main/scala/spark/network/Connection.scala @@ -135,8 +135,11 @@ extends Connection(SocketChannel.open, selector_) { val chunk = message.getChunkForSending(defaultChunkSize) if (chunk.isDefined) { messages += message // this is probably incorrect, it wont work as fifo - if (!message.started) logDebug("Starting to send [" + message + "]") - message.started = true + if (!message.started) { + logDebug("Starting to send [" + message + "]") + message.started = true + message.startTime = System.currentTimeMillis + } return chunk } else { /*logInfo("Finished sending [" + message + "] to [" + remoteConnectionManagerId + "]")*/ diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 642fa4b525..36c01ad629 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -43,12 +43,12 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } val selector = SelectorProvider.provider.openSelector() - val handleMessageExecutor = Executors.newFixedThreadPool(4) + val handleMessageExecutor = Executors.newFixedThreadPool(System.getProperty("spark.core.connection.handler.threads","20").toInt) val serverChannel = ServerSocketChannel.open() val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection] val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection] val messageStatuses = new HashMap[Int, MessageStatus] - val connectionRequests = new SynchronizedQueue[SendingConnection] + val connectionRequests = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection] val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)] val sendMessageRequests = new Queue[(Message, SendingConnection)] @@ -79,10 +79,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging { def run() { try { while(!selectorThread.isInterrupted) { - while(!connectionRequests.isEmpty) { - val sendingConnection = connectionRequests.dequeue + for( (connectionManagerId, sendingConnection) <- connectionRequests) { sendingConnection.connect() addConnection(sendingConnection) + connectionRequests -= connectionManagerId } sendMessageRequests.synchronized { while(!sendMessageRequests.isEmpty) { @@ -300,8 +300,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) { def startNewConnection(): SendingConnection = { val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port) - val newConnection = new SendingConnection(inetSocketAddress, selector) - connectionRequests += newConnection + val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId, new SendingConnection(inetSocketAddress, selector)) newConnection } val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress) @@ -473,6 +472,7 @@ private[spark] object ConnectionManager { val mb = size * count / 1024.0 / 1024.0 val ms = finishTime - startTime val tput = mb * 1000.0 / ms + println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)") println("--------------------------") println() } diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala index 47ceaf3c07..533e4610f3 100644 --- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala +++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala @@ -13,8 +13,14 @@ import akka.util.duration._ private[spark] object ConnectionManagerTest extends Logging{ def main(args: Array[String]) { + //<mesos cluster> - the master URL + //<slaves file> - a list slaves to run connectionTest on + //[num of tasks] - the number of parallel tasks to be initiated default is number of slave hosts + //[size of msg in MB (integer)] - the size of messages to be sent in each task, default is 10 + //[count] - how many times to run, default is 3 + //[await time in seconds] : await time (in seconds), default is 600 if (args.length < 2) { - println("Usage: ConnectionManagerTest <mesos cluster> <slaves file>") + println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] [size of msg in MB (integer)] [count] [await time in seconds)] ") System.exit(1) } @@ -29,16 +35,19 @@ private[spark] object ConnectionManagerTest extends Logging{ /*println("Slaves")*/ /*slaves.foreach(println)*/ - - val slaveConnManagerIds = sc.parallelize(0 until slaves.length, slaves.length).map( + val tasknum = if (args.length > 2) args(2).toInt else slaves.length + val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024 + val count = if (args.length > 4) args(4).toInt else 3 + val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second + println("Running "+count+" rounds of test: " + "parallel tasks = " + tasknum + ", msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime) + val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map( i => SparkEnv.get.connectionManager.id).collect() println("\nSlave ConnectionManagerIds") slaveConnManagerIds.foreach(println) println - val count = 10 (0 until count).foreach(i => { - val resultStrs = sc.parallelize(0 until slaves.length, slaves.length).map(i => { + val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => { val connManager = SparkEnv.get.connectionManager val thisConnManagerId = connManager.id connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { @@ -46,7 +55,6 @@ private[spark] object ConnectionManagerTest extends Logging{ None }) - val size = 100 * 1024 * 1024 val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte)) buffer.flip @@ -56,13 +64,13 @@ private[spark] object ConnectionManagerTest extends Logging{ logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]") connManager.sendMessageReliably(slaveConnManagerId, bufferMessage) }) - val results = futures.map(f => Await.result(f, 1.second)) + val results = futures.map(f => Await.result(f, awaitTime)) val finishTime = System.currentTimeMillis Thread.sleep(5000) val mb = size * results.size / 1024.0 / 1024.0 val ms = finishTime - startTime - val resultStr = "Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s" + val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s" logInfo(resultStr) resultStr }).collect() diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 759bea5e9d..1d528be2aa 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -1,9 +1,9 @@ package spark.rdd import java.io.{ObjectOutputStream, IOException} - +import java.util.{HashMap => JHashMap} +import scala.collection.JavaConversions import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Split, TaskContext} import spark.{Dependency, OneToOneDependency, ShuffleDependency} @@ -86,9 +86,16 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = { val split = s.asInstanceOf[CoGroupSplit] val numRdds = split.deps.size - val map = new HashMap[K, Seq[ArrayBuffer[Any]]] + val map = new JHashMap[K, Seq[ArrayBuffer[Any]]] def getSeq(k: K): Seq[ArrayBuffer[Any]] = { - map.getOrElseUpdate(k, Array.fill(numRdds)(new ArrayBuffer[Any])) + val seq = map.get(k) + if (seq != null) { + seq + } else { + val seq = Array.fill(numRdds)(new ArrayBuffer[Any]) + map.put(k, seq) + seq + } } for ((dep, depNum) <- split.deps.zipWithIndex) dep match { case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => { @@ -108,7 +115,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair) } } - map.iterator + JavaConversions.mapAsScalaMap(map).iterator } override def clearDependencies() { diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala index b80e9bc07b..6dbe235bd9 100644 --- a/core/src/main/scala/spark/rdd/FilteredRDD.scala +++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala @@ -9,6 +9,8 @@ private[spark] class FilteredRDD[T: ClassManifest]( override def getSplits = firstParent[T].splits + override val partitioner = prev.partitioner // Since filter cannot change a partition's keys + override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(split, context).filter(f) } diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala index 74a63c1af1..8cd4c661eb 100644 --- a/core/src/main/scala/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/spark/scheduler/ResultTask.scala @@ -72,9 +72,11 @@ private[spark] class ResultTask[T, U]( override def run(attemptId: Long): U = { val context = new TaskContext(stageId, partition, attemptId) - val result = func(context, rdd.iterator(split, context)) - context.executeOnCompleteCallbacks() - result + try { + func(context, rdd.iterator(split, context)) + } finally { + context.executeOnCompleteCallbacks() + } } override def preferredLocations: Seq[String] = locs diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index cf4aae03a7..a089b71644 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -201,7 +201,11 @@ private[spark] class TaskSetManager( val taskId = sched.newTaskId() // Figure out whether this should count as a preferred launch val preferred = isPreferredLocation(task, host) - val prefStr = if (preferred) "preferred" else "non-preferred" + val prefStr = if (preferred) { + "preferred" + } else { + "non-preferred, not one of " + task.preferredLocations.mkString(", ") + } logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format( taskSet.id, index, taskId, slaveId, host, prefStr)) // Do various bookkeeping diff --git a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala deleted file mode 100644 index 37cafd1e8e..0000000000 --- a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala +++ /dev/null @@ -1,58 +0,0 @@ -package spark - -import org.scalatest.FunSuite -import org.scalatest.PrivateMethodTester -import org.scalatest.matchers.ShouldMatchers - -// TODO: Replace this with a test of MemoryStore -class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester with ShouldMatchers { - test("constructor test") { - val cache = new BoundedMemoryCache(60) - expect(60)(cache.getCapacity) - } - - test("caching") { - // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case - val oldArch = System.setProperty("os.arch", "amd64") - val oldOops = System.setProperty("spark.test.useCompressedOops", "true") - val initialize = PrivateMethod[Unit]('initialize) - SizeEstimator invokePrivate initialize() - - val cache = new BoundedMemoryCache(60) { - //TODO sorry about this, but there is not better way how to skip 'cacheTracker.dropEntry' - override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) { - logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size)) - } - } - - // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length - // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6. - // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html - // Work around to check for either. - - //should be OK - cache.put("1", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48))) - - //we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from - //cache because it's from the same dataset - expect(CachePutFailure())(cache.put("1", 1, "Meh")) - - //should be OK, dataset '1' can be evicted from cache - cache.put("2", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48))) - - //should fail, cache should obey it's capacity - expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string")) - - if (oldArch != null) { - System.setProperty("os.arch", oldArch) - } else { - System.clearProperty("os.arch") - } - - if (oldOops != null) { - System.setProperty("spark.test.useCompressedOops", oldOops) - } else { - System.clearProperty("spark.test.useCompressedOops") - } - } -} diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 0b5354774b..01351de4ae 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -624,6 +624,22 @@ public class JavaAPISuite implements Serializable { } }); Assert.assertEquals((Float) 25.0f, floatAccum.value()); + + // Test the setValue method + floatAccum.setValue(5.0f); + Assert.assertEquals((Float) 5.0f, floatAccum.value()); + } + + @Test + public void keyBy() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); + List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() { + public String call(Integer t) throws Exception { + return t.toString(); + } + }).collect(); + Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0)); + Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1)); } @Test diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index 5b4b198960..d3dd3a8fa4 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -1,12 +1,18 @@ package spark import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfter import akka.actor._ import spark.scheduler.MapStatus import spark.storage.BlockManagerId +import spark.util.AkkaUtils -class MapOutputTrackerSuite extends FunSuite { +class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter { + after { + System.clearProperty("spark.master.port") + } + test("compressSize") { assert(MapOutputTracker.compressSize(0L) === 0) assert(MapOutputTracker.compressSize(1L) === 1) @@ -71,6 +77,36 @@ class MapOutputTrackerSuite extends FunSuite { // The remaining reduce task might try to grab the output dispite the shuffle failure; // this should cause it to fail, and the scheduler will ignore the failure due to the // stage already being aborted. - intercept[Exception] { tracker.getServerStatuses(10, 1) } + intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) } + } + + test("remote fetch") { + System.clearProperty("spark.master.host") + val (actorSystem, boundPort) = + AkkaUtils.createActorSystem("test", "localhost", 0) + System.setProperty("spark.master.port", boundPort.toString) + val masterTracker = new MapOutputTracker(actorSystem, true) + val slaveTracker = new MapOutputTracker(actorSystem, false) + masterTracker.registerShuffle(10, 1) + masterTracker.incrementGeneration() + slaveTracker.updateGeneration(masterTracker.getGeneration) + intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } + + val compressedSize1000 = MapOutputTracker.compressSize(1000L) + val size1000 = MapOutputTracker.decompressSize(compressedSize1000) + masterTracker.registerMapOutput(10, 0, new MapStatus( + new BlockManagerId("hostA", 1000), Array(compressedSize1000))) + masterTracker.incrementGeneration() + slaveTracker.updateGeneration(masterTracker.getGeneration) + assert(slaveTracker.getServerStatuses(10, 0).toSeq === + Seq((new BlockManagerId("hostA", 1000), size1000))) + + masterTracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000)) + masterTracker.incrementGeneration() + slaveTracker.updateGeneration(masterTracker.getGeneration) + intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } + + // failure should be cached + intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } } } diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index f09b602a7b..eb3c8f238f 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -106,6 +106,11 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter { assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner) assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner) assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner) + + assert(grouped2.map(_ => 1).partitioner === None) + assert(grouped2.mapValues(_ => 1).partitioner === grouped2.partitioner) + assert(grouped2.flatMapValues(_ => Seq(1)).partitioner === grouped2.partitioner) + assert(grouped2.filter(_._1 > 4).partitioner === grouped2.partitioner) } test("partitioning Java arrays should fail") { diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index e5a59dc7d6..db217f8482 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -35,6 +35,8 @@ class RDDSuite extends FunSuite with BeforeAndAfter { assert(nums.flatMap(x => 1 to x).collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)) assert(nums.union(nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4)) assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4))) + assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4")) + assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4))) val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _))) assert(partitionSums.collect().toList === List(3, 7)) diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 8170100f1d..bebb8ebe86 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -216,6 +216,13 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { // Test that a shuffle on the file works, because this used to be a bug assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) } + + test("keys and values") { + sc = new SparkContext("local", "test") + val rdd = sc.parallelize(Array((1, "a"), (2, "b"))) + assert(rdd.keys.collect().toList === List(1, 2)) + assert(rdd.values.collect().toList === List("a", "b")) + } } object ShuffleSuite { diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala index 17f366212b..e235ef2f67 100644 --- a/core/src/test/scala/spark/SizeEstimatorSuite.scala +++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala @@ -3,7 +3,6 @@ package spark import org.scalatest.FunSuite import org.scalatest.BeforeAndAfterAll import org.scalatest.PrivateMethodTester -import org.scalatest.matchers.ShouldMatchers class DummyClass1 {} @@ -20,8 +19,17 @@ class DummyClass4(val d: DummyClass3) { val x: Int = 0 } +object DummyString { + def apply(str: String) : DummyString = new DummyString(str.toArray) +} +class DummyString(val arr: Array[Char]) { + override val hashCode: Int = 0 + // JDK-7 has an extra hash32 field http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/rev/11987e85555f + @transient val hash32: Int = 0 +} + class SizeEstimatorSuite - extends FunSuite with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers { + extends FunSuite with BeforeAndAfterAll with PrivateMethodTester { var oldArch: String = _ var oldOops: String = _ @@ -45,15 +53,13 @@ class SizeEstimatorSuite expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3))) } - // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length. - // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6. - // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html - // Work around to check for either. + // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors + // (Sun vs IBM). Use a DummyString class to make tests deterministic. test("strings") { - SizeEstimator.estimate("") should (equal (48) or equal (40)) - SizeEstimator.estimate("a") should (equal (56) or equal (48)) - SizeEstimator.estimate("ab") should (equal (56) or equal (48)) - SizeEstimator.estimate("abcdefgh") should (equal(64) or equal(56)) + expect(40)(SizeEstimator.estimate(DummyString(""))) + expect(48)(SizeEstimator.estimate(DummyString("a"))) + expect(48)(SizeEstimator.estimate(DummyString("ab"))) + expect(56)(SizeEstimator.estimate(DummyString("abcdefgh"))) } test("primitive arrays") { @@ -105,18 +111,16 @@ class SizeEstimatorSuite val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - expect(40)(SizeEstimator.estimate("")) - expect(48)(SizeEstimator.estimate("a")) - expect(48)(SizeEstimator.estimate("ab")) - expect(56)(SizeEstimator.estimate("abcdefgh")) + expect(40)(SizeEstimator.estimate(DummyString(""))) + expect(48)(SizeEstimator.estimate(DummyString("a"))) + expect(48)(SizeEstimator.estimate(DummyString("ab"))) + expect(56)(SizeEstimator.estimate(DummyString("abcdefgh"))) resetOrClear("os.arch", arch) } - // NOTE: The String class definition changed in JDK 7 to exclude the int fields count and length. - // This means that the size of strings will be lesser by 8 bytes in JDK 7 compared to JDK 6. - // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html - // Work around to check for either. + // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors + // (Sun vs IBM). Use a DummyString class to make tests deterministic. test("64-bit arch with no compressed oops") { val arch = System.setProperty("os.arch", "amd64") val oops = System.setProperty("spark.test.useCompressedOops", "false") @@ -124,10 +128,10 @@ class SizeEstimatorSuite val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - SizeEstimator.estimate("") should (equal (64) or equal (56)) - SizeEstimator.estimate("a") should (equal (72) or equal (64)) - SizeEstimator.estimate("ab") should (equal (72) or equal (64)) - SizeEstimator.estimate("abcdefgh") should (equal (80) or equal (72)) + expect(56)(SizeEstimator.estimate(DummyString(""))) + expect(64)(SizeEstimator.estimate(DummyString("a"))) + expect(64)(SizeEstimator.estimate(DummyString("ab"))) + expect(72)(SizeEstimator.estimate(DummyString("abcdefgh"))) resetOrClear("os.arch", arch) resetOrClear("spark.test.useCompressedOops", oops) diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala new file mode 100644 index 0000000000..ba6f8b588f --- /dev/null +++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala @@ -0,0 +1,42 @@ +package spark.scheduler + +import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfter +import spark.TaskContext +import spark.RDD +import spark.SparkContext +import spark.Split + +class TaskContextSuite extends FunSuite with BeforeAndAfter { + + var sc: SparkContext = _ + + after { + if (sc != null) { + sc.stop() + sc = null + } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") + } + + test("Calls executeOnCompleteCallbacks after failure") { + var completed = false + sc = new SparkContext("local", "test") + val rdd = new RDD[String](sc, List()) { + override def getSplits = Array[Split](StubSplit(0)) + override def compute(split: Split, context: TaskContext) = { + context.addOnCompleteCallback(() => completed = true) + sys.error("failed") + } + } + val func = (c: TaskContext, i: Iterator[String]) => i.next + val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0) + intercept[RuntimeException] { + task.run(0) + } + assert(completed === true) + } + + case class StubSplit(val index: Int) extends Split +}
\ No newline at end of file |