55 files changed, 1159 insertions, 365 deletions
diff --git a/assembly/pom.xml b/assembly/pom.xml index d62332137a..09df8c1fd7 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -26,7 +26,7 @@ </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-assembly</artifactId> + <artifactId>spark-assembly_2.9.3</artifactId> <name>Spark Project Assembly</name> <url>http://spark.incubator.apache.org/</url> @@ -41,27 +41,27 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_2.9.3</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-bagel</artifactId> + <artifactId>spark-bagel_2.9.3</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib</artifactId> + <artifactId>spark-mllib_2.9.3</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-repl</artifactId> + <artifactId>spark-repl_2.9.3</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming</artifactId> + <artifactId>spark-streaming_2.9.3</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -104,13 +104,13 @@ </goals> <configuration> <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource> </transformer> </transformers> <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> @@ -128,7 +128,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn</artifactId> + <artifactId>spark-yarn_2.9.3</artifactId> <version>${project.version}</version> </dependency> </dependencies> diff --git a/bagel/pom.xml b/bagel/pom.xml index c4ce006085..0e552c880f 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -26,7 +26,7 @@ </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-bagel</artifactId> + <artifactId>spark-bagel_2.9.3</artifactId> <packaging>jar</packaging> <name>Spark Project Bagel</name> <url>http://spark.incubator.apache.org/</url> @@ -34,7 +34,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_2.9.3</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -43,12 +43,12 @@ </dependency> <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.version}</artifactId> + <artifactId>scalatest_2.9.3</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_${scala.version}</artifactId> + <artifactId>scalacheck_2.9.3</artifactId> <scope>test</scope> </dependency> </dependencies> diff --git a/core/pom.xml b/core/pom.xml index 9c2d6046a9..d694508938 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -26,7 +26,7 @@ </parent> 
<groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_2.9.3</artifactId> <packaging>jar</packaging> <name>Spark Project Core</name> <url>http://spark.incubator.apache.org/</url> @@ -39,7 +39,6 @@ <dependency> <groupId>net.java.dev.jets3t</groupId> <artifactId>jets3t</artifactId> - <version>0.7.1</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> @@ -162,12 +161,12 @@ </dependency> <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.version}</artifactId> + <artifactId>scalatest_2.9.3</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_${scala.version}</artifactId> + <artifactId>scalacheck_2.9.3</artifactId> <scope>test</scope> </dependency> <dependency> @@ -202,14 +201,14 @@ <configuration> <exportAntProperties>true</exportAntProperties> <tasks> - <property name="spark.classpath" refid="maven.test.classpath"/> - <property environment="env"/> + <property name="spark.classpath" refid="maven.test.classpath" /> + <property environment="env" /> <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry."> <condition> <not> <or> - <isset property="env.SCALA_HOME"/> - <isset property="env.SCALA_LIBRARY_PATH"/> + <isset property="env.SCALA_HOME" /> + <isset property="env.SCALA_LIBRARY_PATH" /> </or> </not> </condition> diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 68b99ca125..4cf7eb96da 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -26,28 +26,29 @@ import org.apache.spark.rdd.RDD sure a node doesn't load two copies of an RDD at once. */ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { - private val loading = new HashSet[String] + + /** Keys of RDD splits that are being computed/loaded. */ + private val loading = new HashSet[String]() /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */ def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel) : Iterator[T] = { val key = "rdd_%d_%d".format(rdd.id, split.index) - logInfo("Cache key is " + key) + logDebug("Looking for partition " + key) blockManager.get(key) match { - case Some(cachedValues) => - // Partition is in cache, so just return its values - logInfo("Found partition in cache!") - return cachedValues.asInstanceOf[Iterator[T]] + case Some(values) => + // Partition is already materialized, so just return its values + return values.asInstanceOf[Iterator[T]] case None => // Mark the split as loading (unless someone else marks it first) loading.synchronized { if (loading.contains(key)) { - logInfo("Loading contains " + key + ", waiting...") + logInfo("Another thread is loading %s, waiting for it to finish...".format(key)) while (loading.contains(key)) { try {loading.wait()} catch {case _ : Throwable =>} } - logInfo("Loading no longer contains " + key + ", so returning cached result") + logInfo("Finished waiting for %s".format(key)) // See whether someone else has successfully loaded it. The main way this would fail // is for the RDD-level cache eviction policy if someone else has loaded the same RDD // partition but we didn't want to make space for it. 
However, that case is unlikely @@ -57,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { case Some(values) => return values.asInstanceOf[Iterator[T]] case None => - logInfo("Whoever was loading " + key + " failed; we'll try it ourselves") + logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key)) loading.add(key) } } else { @@ -66,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } try { // If we got here, we have to load the split - logInfo("Computing partition " + split) + logInfo("Partition %s not found, computing it".format(key)) val computedValues = rdd.computeOrReadCheckpoint(split, context) // Persist the result, so long as the task is not running locally if (context.runningLocally) { return computedValues } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 912ce752fb..febcf9c6ee 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary +import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.LocalSparkCluster import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ @@ -83,9 +84,11 @@ class SparkContext( val sparkHome: String = null, val jars: Seq[String] = Nil, val environment: Map[String, String] = Map(), - // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too. - // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host - val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map()) + // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) + // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set + // of data-local splits on host + val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = + scala.collection.immutable.Map()) extends Logging { // Ensure logging is initialized before we spawn any threads @@ -145,7 +148,7 @@ class SparkContext( } // Create and start the scheduler - private var taskScheduler: TaskScheduler = { + private[spark] var taskScheduler: TaskScheduler = { // Regular expression used for local[N] master format val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r // Regular expression for local[N, maxRetries], used in tests with failing tasks @@ -238,7 +241,8 @@ class SparkContext( val env = SparkEnv.get val conf = env.hadoop.newConfiguration() // Explicitly check for S3 environment variables - if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { + if (System.getenv("AWS_ACCESS_KEY_ID") != null && + System.getenv("AWS_SECRET_ACCESS_KEY") != null) { conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) @@ -337,6 +341,8 @@ class SparkContext( valueClass: Class[V], minSplits: Int = defaultMinSplits ): RDD[(K, V)] = { + // Add necessary security credentials to the JobConf before broadcasting it. 
+ SparkEnv.get.hadoop.addCredentials(conf) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } @@ -347,10 +353,27 @@ class SparkContext( keyClass: Class[K], valueClass: Class[V], minSplits: Int = defaultMinSplits - ) : RDD[(K, V)] = { - val conf = new JobConf(hadoopConfiguration) - FileInputFormat.setInputPaths(conf, path) - new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) + ): RDD[(K, V)] = { + // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. + val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) + hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) + } + + /** + * Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a Hadoop Configuration + * that has already been broadcast, assuming that it's safe to use it to construct a + * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be resued). + */ + def hadoopFile[K, V]( + path: String, + confBroadcast: Broadcast[SerializableWritable[Configuration]], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int + ): RDD[(K, V)] = { + new HadoopFileRDD( + this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) } /** diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 03bf268863..8466c2a004 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -46,6 +46,10 @@ private[spark] case class ExceptionFailure( metrics: Option[TaskMetrics]) extends TaskEndReason -private[spark] case class OtherFailure(message: String) extends TaskEndReason +/** + * The task finished successfully, but the result was lost from the executor's block manager before + * it was fetched. + */ +private[spark] case object TaskResultLost extends TaskEndReason -private[spark] case class TaskResultTooBigFailure() extends TaskEndReason +private[spark] case class OtherFailure(message: String) extends TaskEndReason diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala index b090c6edf3..2be4e323be 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala @@ -17,12 +17,13 @@ package org.apache.spark.api.python -import org.apache.spark.Partitioner import java.util.Arrays + +import org.apache.spark.Partitioner import org.apache.spark.util.Utils /** - * A [[org.apache.spark.Partitioner]] that performs handling of byte arrays, for use by the Python API. + * A [[org.apache.spark.Partitioner]] that performs handling of long-valued keys, for use by the Python API. * * Stores the unique id() of the Python-side partitioning function so that it is incorporated into * equality comparisons. Correctness requires that the id is a unique identifier for the @@ -30,6 +31,7 @@ import org.apache.spark.util.Utils * function). This can be ensured by using the Python id() function and maintaining a reference * to the Python partitioning function so that its id() is not reused. 
*/ + private[spark] class PythonPartitioner( override val numPartitions: Int, val pyPartitionFunctionId: Long) @@ -37,7 +39,9 @@ private[spark] class PythonPartitioner( override def getPartition(key: Any): Int = key match { case null => 0 - case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions) + // we don't trust the Python partition function to return valid partition ID's so + // let's do a modulo numPartitions in any case + case key: Long => Utils.nonNegativeMod(key.toInt, numPartitions) case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index ccd3833964..1f8ad688a6 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -187,14 +187,14 @@ private class PythonException(msg: String) extends Exception(msg) * This is used by PySpark's shuffle operations. */ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends - RDD[(Array[Byte], Array[Byte])](prev) { + RDD[(Long, Array[Byte])](prev) { override def getPartitions = prev.partitions override def compute(split: Partition, context: TaskContext) = prev.iterator(split, context).grouped(2).map { - case Seq(a, b) => (a, b) + case Seq(a, b) => (Utils.deserializeLongValue(a), b) case x => throw new SparkException("PairwiseRDD: unexpected value: " + x) } - val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this) + val asJavaPairRDD : JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this) } private[spark] object PythonRDD { diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 87a703427c..04d01c169d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -41,6 +41,7 @@ private[spark] object JsonProtocol { ("starttime" -> obj.startTime) ~ ("id" -> obj.id) ~ ("name" -> obj.desc.name) ~ + ("appuiurl" -> obj.appUiUrl) ~ ("cores" -> obj.desc.maxCores) ~ ("user" -> obj.desc.user) ~ ("memoryperslave" -> obj.desc.memoryPerSlave) ~ @@ -64,7 +65,7 @@ private[spark] object JsonProtocol { } def writeMasterState(obj: MasterStateResponse) = { - ("url" -> ("spark://" + obj.uri)) ~ + ("url" -> obj.uri) ~ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ ("cores" -> obj.workers.map(_.cores).sum) ~ ("coresused" -> obj.workers.map(_.coresUsed).sum) ~ diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 0a5f4c368f..993ba6bd3d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -16,6 +16,9 @@ */ package org.apache.spark.deploy + +import com.google.common.collect.MapMaker + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf @@ -24,11 +27,16 @@ import org.apache.hadoop.mapred.JobConf * Contains util methods to interact with Hadoop from spark. */ class SparkHadoopUtil { + // A general, soft-reference map for metadata needed during HadoopRDD split computation + // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). 
+ private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() - // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems + // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop + // subsystems def newConfiguration(): Configuration = new Configuration() - // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster + // Add any user credentials to the job conf which are necessary for running on a secure Hadoop + // cluster def addCredentials(conf: JobConf) {} def isYarnMode(): Boolean = { false } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index ceae3b8289..acdb8d0343 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import java.io.{File} +import java.io.File import java.lang.management.ManagementFactory import java.nio.ByteBuffer import java.util.concurrent._ @@ -27,11 +27,11 @@ import scala.collection.mutable.HashMap import org.apache.spark.scheduler._ import org.apache.spark._ +import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils - /** - * The Mesos executor for Spark. + * Spark executor used with Mesos and the standalone scheduler. */ private[spark] class Executor( executorId: String, @@ -167,12 +167,20 @@ private[spark] class Executor( // we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could // just change the relevants bytes in the byte buffer val accumUpdates = Accumulators.values - val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null)) - val serializedResult = ser.serialize(result) - logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit) - if (serializedResult.limit >= (akkaFrameSize - 1024)) { - context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure())) - return + val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null)) + val serializedDirectResult = ser.serialize(directResult) + logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit) + val serializedResult = { + if (serializedDirectResult.limit >= akkaFrameSize - 1024) { + logInfo("Storing result for " + taskId + " in local BlockManager") + val blockId = "taskresult_" + taskId + env.blockManager.putBytes( + blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) + ser.serialize(new IndirectTaskResult[Any](blockId)) + } else { + logInfo("Sending result for " + taskId + " directly to driver") + serializedDirectResult + } } context.statusUpdate(taskId, TaskState.FINISHED, serializedResult) logInfo("Finished task ID " + taskId) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 2cb6734e41..d3b3fffd40 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -19,6 +19,7 @@ package org.apache.spark.rdd import java.io.EOFException +import org.apache.hadoop.mapred.FileInputFormat import org.apache.hadoop.mapred.InputFormat import org.apache.hadoop.mapred.InputSplit import org.apache.hadoop.mapred.JobConf @@ -26,10 +27,47 @@ import 
org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils -import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, + TaskContext} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.util.NextIterator import org.apache.hadoop.conf.{Configuration, Configurable} +/** + * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file + * system, or S3). + * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same + * across multiple reads; the 'path' is the only variable that is different across new JobConfs + * created from the Configuration. + */ +class HadoopFileRDD[K, V]( + sc: SparkContext, + path: String, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int) + extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) { + + override def getJobConf(): JobConf = { + if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { + // getJobConf() has been called previously, so there is already a local cache of the JobConf + // needed by this RDD. + return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + } else { + // Create a new JobConf, set the input file/directory paths to read from, and cache the + // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through + // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple + // getJobConf() calls for this RDD in the local process. + // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. + val newJobConf = new JobConf(broadcastedConf.value.value) + FileInputFormat.setInputPaths(newJobConf, path) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + return newJobConf + } + } +} /** * A Spark split class that wraps around a Hadoop InputSplit. @@ -45,29 +83,80 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp } /** - * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file - * system, or S3, tables in HBase, etc). + * A base class that provides core functionality for reading data partitions stored in Hadoop. 
*/ class HadoopRDD[K, V]( sc: SparkContext, - @transient conf: JobConf, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minSplits: Int) extends RDD[(K, V)](sc, Nil) with Logging { - // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it - private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) + def this( + sc: SparkContext, + conf: JobConf, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int) = { + this( + sc, + sc.broadcast(new SerializableWritable(conf)) + .asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + inputFormatClass, + keyClass, + valueClass, + minSplits) + } + + protected val jobConfCacheKey = "rdd_%d_job_conf".format(id) + + protected val inputFormatCacheKey = "rdd_%d_input_format".format(id) + + // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. + protected def getJobConf(): JobConf = { + val conf: Configuration = broadcastedConf.value.value + if (conf.isInstanceOf[JobConf]) { + // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. + return conf.asInstanceOf[JobConf] + } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { + // getJobConf() has been called previously, so there is already a local cache of the JobConf + // needed by this RDD. + return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + } else { + // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the + // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). + // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. + val newJobConf = new JobConf(broadcastedConf.value.value) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + return newJobConf + } + } + + protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { + if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) { + return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]] + } + // Once an InputFormat for this RDD is created, cache it so that only one reflection call is + // done in each local process. 
+ val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) + .asInstanceOf[InputFormat[K, V]] + if (newInputFormat.isInstanceOf[Configurable]) { + newInputFormat.asInstanceOf[Configurable].setConf(conf) + } + HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat) + return newInputFormat + } override def getPartitions: Array[Partition] = { - val env = SparkEnv.get - env.hadoop.addCredentials(conf) - val inputFormat = createInputFormat(conf) + val jobConf = getJobConf() + val inputFormat = getInputFormat(jobConf) if (inputFormat.isInstanceOf[Configurable]) { - inputFormat.asInstanceOf[Configurable].setConf(conf) + inputFormat.asInstanceOf[Configurable].setConf(jobConf) } - val inputSplits = inputFormat.getSplits(conf, minSplits) + val inputSplits = inputFormat.getSplits(jobConf, minSplits) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) @@ -75,22 +164,14 @@ class HadoopRDD[K, V]( array } - def createInputFormat(conf: JobConf): InputFormat[K, V] = { - ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) - .asInstanceOf[InputFormat[K, V]] - } - override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] { val split = theSplit.asInstanceOf[HadoopPartition] logInfo("Input split: " + split.inputSplit) var reader: RecordReader[K, V] = null - val conf = confBroadcast.value.value - val fmt = createInputFormat(conf) - if (fmt.isInstanceOf[Configurable]) { - fmt.asInstanceOf[Configurable].setConf(conf) - } - reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) + val jobConf = getJobConf() + val inputFormat = getInputFormat(jobConf) + reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) // Register an on-task-completion callback to close the input stream. context.addOnCompleteCallback{ () => closeIfNeeded() } @@ -127,5 +208,18 @@ class HadoopRDD[K, V]( // Do nothing. Hadoop RDD should not be checkpointed. } - def getConf: Configuration = confBroadcast.value.value + def getConf: Configuration = getJobConf() +} + +private[spark] object HadoopRDD { + /** + * The three methods below are helpers for accessing the local map, a property of the SparkEnv of + * the local process. + */ + def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key) + + def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key) + + def putCachedMetadata(key: String, value: Any) = + SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8a55df4af0..4053b91134 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -552,7 +552,7 @@ class DAGScheduler( SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head) } catch { case e: NotSerializableException => - abortStage(stage, e.toString) + abortStage(stage, "Task not serializable: " + e.toString) running -= stage return } @@ -704,6 +704,9 @@ class DAGScheduler( case ExceptionFailure(className, description, stackTrace, metrics) => // Do nothing here, left up to the TaskScheduler to decide how to handle user failures + case TaskResultLost => + // Do nothing here; the TaskScheduler handles these failures and resubmits the task. 
+ case other => // Unrecognized failure - abort all jobs depending on this stage abortStage(stageIdToStage(task.stageId), task + " failed: " + other) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index c9a66b3a75..9eb8d48501 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -45,7 +45,7 @@ private[spark] class Pool( var priority = 0 var stageId = 0 var name = poolName - var parent:Schedulable = null + var parent: Pool = null var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { schedulingMode match { @@ -101,14 +101,14 @@ private[spark] class Pool( return sortedTaskSetQueue } - override def increaseRunningTasks(taskNum: Int) { + def increaseRunningTasks(taskNum: Int) { runningTasks += taskNum if (parent != null) { parent.increaseRunningTasks(taskNum) } } - override def decreaseRunningTasks(taskNum: Int) { + def decreaseRunningTasks(taskNum: Int) { runningTasks -= taskNum if (parent != null) { parent.decreaseRunningTasks(taskNum) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala index 857adaef5a..1c7ea2dccc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer * there are two type of Schedulable entities(Pools and TaskSetManagers) */ private[spark] trait Schedulable { - var parent: Schedulable + var parent: Pool // child queues def schedulableQueue: ArrayBuffer[Schedulable] def schedulingMode: SchedulingMode @@ -36,8 +36,6 @@ private[spark] trait Schedulable { def stageId: Int def name: String - def increaseRunningTasks(taskNum: Int): Unit - def decreaseRunningTasks(taskNum: Int): Unit def addSchedulable(schedulable: Schedulable): Unit def removeSchedulable(schedulable: Schedulable): Unit def getSchedulableByName(name: String): Schedulable diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 5c7e5bb977..db3954a9d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -26,12 +26,17 @@ import java.nio.ByteBuffer import org.apache.spark.util.Utils // Task result. Also contains updates to accumulator variables. -// TODO: Use of distributed cache to return result is a hack to get around -// what seems to be a bug with messages over 60KB in libprocess; fix it +private[spark] sealed trait TaskResult[T] + +/** A reference to a DirectTaskResult that has been stored in the worker's BlockManager. */ +private[spark] +case class IndirectTaskResult[T](val blockId: String) extends TaskResult[T] with Serializable + +/** A TaskResult that contains the task's return value and accumulator updates. 
*/ private[spark] -class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) - extends Externalizable -{ +class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) + extends TaskResult[T] with Externalizable { + def this() = this(null.asInstanceOf[T], null, null) override def writeExternal(out: ObjectOutput) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f192b0b7a4..90f6bcefac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -44,7 +44,5 @@ private[spark] trait TaskSetManager extends Schedulable { maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] - def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) - def error(message: String) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index a6dee604b7..1a844b7e7e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -18,6 +18,9 @@ package org.apache.spark.scheduler.cluster import java.lang.{Boolean => JBoolean} +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLong +import java.util.{TimerTask, Timer} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -27,9 +30,6 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode -import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicLong -import java.util.{TimerTask, Timer} /** * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call @@ -55,7 +55,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // Threshold above which we warn user initial TaskSet may be starved val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong - val activeTaskSets = new HashMap[String, TaskSetManager] + // ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized + // on this class. + val activeTaskSets = new HashMap[String, ClusterTaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String] val taskIdToExecutorId = new HashMap[Long, String] @@ -65,7 +67,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) @volatile private var hasLaunchedTask = false private val starvationTimer = new Timer(true) - // Incrementing Mesos task IDs + // Incrementing task IDs val nextTaskId = new AtomicLong(0) // Which executor IDs we have executors on @@ -96,6 +98,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val schedulingMode: SchedulingMode = SchedulingMode.withName( System.getProperty("spark.scheduler.mode", "FIFO")) + // This is a var so that we can reset it for testing purposes. 
+ private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) + override def setListener(listener: TaskSchedulerListener) { this.listener = listener } @@ -234,7 +239,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { - var taskSetToUpdate: Option[TaskSetManager] = None var failedExecutor: Option[String] = None var taskFailed = false synchronized { @@ -249,9 +253,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } taskIdToTaskSetId.get(tid) match { case Some(taskSetId) => - if (activeTaskSets.contains(taskSetId)) { - taskSetToUpdate = Some(activeTaskSets(taskSetId)) - } if (TaskState.isFinished(state)) { taskIdToTaskSetId.remove(tid) if (taskSetTaskIds.contains(taskSetId)) { @@ -262,6 +263,15 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (state == TaskState.FAILED) { taskFailed = true } + activeTaskSets.get(taskSetId).foreach { taskSet => + if (state == TaskState.FINISHED) { + taskSet.removeRunningTask(tid) + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { + taskSet.removeRunningTask(tid) + taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) + } + } case None => logInfo("Ignoring update from TID " + tid + " because its task set is gone") } @@ -269,10 +279,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) case e: Exception => logError("Exception in statusUpdate", e) } } - // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock - if (taskSetToUpdate != None) { - taskSetToUpdate.get.statusUpdate(tid, state, serializedData) - } + // Update the DAGScheduler without holding a lock on this, since that can deadlock if (failedExecutor != None) { listener.executorLost(failedExecutor.get) backend.reviveOffers() @@ -283,6 +290,25 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } + def handleSuccessfulTask( + taskSetManager: ClusterTaskSetManager, + tid: Long, + taskResult: DirectTaskResult[_]) = synchronized { + taskSetManager.handleSuccessfulTask(tid, taskResult) + } + + def handleFailedTask( + taskSetManager: ClusterTaskSetManager, + tid: Long, + taskState: TaskState, + reason: Option[TaskEndReason]) = synchronized { + taskSetManager.handleFailedTask(tid, taskState, reason) + if (taskState == TaskState.FINISHED) { + // The task finished successfully but the result was lost, so we should revive offers. + backend.reviveOffers() + } + } + def error(message: String) { synchronized { if (activeTaskSets.size > 0) { @@ -311,6 +337,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (jarServer != null) { jarServer.stop() } + if (taskResultGetter != null) { + taskResultGetter.stop() + } // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out. // TODO: Do something better ! 
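For reference, the executor and scheduler changes above replace the old TaskResultTooBigFailure path: a result that fits under the Akka frame size is still serialized and sent back directly, while a larger one is stored in the executor's BlockManager and only an IndirectTaskResult (a block id) is sent, to be fetched later by the new TaskResultGetter. The following standalone sketch models that size-based decision; it is illustrative only (names such as FakeBlockStore, SketchTaskResult and ResultPathSketch are not part of the patch), assuming the same "frame size minus 1 KB of headroom" threshold the patch uses.

import java.nio.ByteBuffer
import scala.collection.mutable

// Stand-ins for DirectTaskResult / IndirectTaskResult (illustrative names).
sealed trait SketchTaskResult
case class Direct(bytes: ByteBuffer) extends SketchTaskResult
case class Indirect(blockId: String) extends SketchTaskResult

// A toy block store standing in for the executor's BlockManager.
class FakeBlockStore {
  private val blocks = mutable.Map[String, ByteBuffer]()
  def putBytes(id: String, data: ByteBuffer): Unit = blocks(id) = data
  def getBytes(id: String): Option[ByteBuffer] = blocks.get(id)
}

object ResultPathSketch {
  // Results within (akkaFrameSize - 1024) bytes go straight back over Akka;
  // larger ones are parked under "taskresult_<tid>" and only the id is returned.
  def chooseResultPath(taskId: Long, serialized: ByteBuffer, akkaFrameSize: Int,
      store: FakeBlockStore): SketchTaskResult = {
    if (serialized.limit >= akkaFrameSize - 1024) {
      val blockId = "taskresult_" + taskId
      store.putBytes(blockId, serialized)
      Indirect(blockId)
    } else {
      Direct(serialized)
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new FakeBlockStore
    val frame = 10 * 1024 * 1024
    println(chooseResultPath(1L, ByteBuffer.allocate(512), frame, store))              // Direct(...)
    println(chooseResultPath(2L, ByteBuffer.allocate(20 * 1024 * 1024), frame, store)) // Indirect(taskresult_2)
  }
}

On the driver side, TaskResultGetter (added later in this patch) deserializes the message: a Direct result is handed to the task set manager immediately, while an Indirect one triggers a remote block fetch, and a missing block is reported as TaskResultLost.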
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index 411e49b021..194ab55102 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -28,7 +28,7 @@ import scala.math.min import scala.Some import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv, - SparkException, Success, TaskEndReason, TaskResultTooBigFailure, TaskState} + SparkException, Success, TaskEndReason, TaskResultLost, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ import org.apache.spark.util.{SystemClock, Clock} @@ -68,18 +68,20 @@ private[spark] class ClusterTaskSetManager( val tasks = taskSet.tasks val numTasks = tasks.length val copiesRunning = new Array[Int](numTasks) - val finished = new Array[Boolean](numTasks) + val successful = new Array[Boolean](numTasks) val numFailures = new Array[Int](numTasks) val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) - var tasksFinished = 0 + var tasksSuccessful = 0 var weight = 1 var minShare = 0 - var runningTasks = 0 var priority = taskSet.priority var stageId = taskSet.stageId var name = "TaskSet_"+taskSet.stageId.toString - var parent: Schedulable = null + var parent: Pool = null + + var runningTasks = 0 + private val runningTasksSet = new HashSet[Long] // Set of pending tasks for each executor. These collections are actually // treated as stacks, in which new tasks are added to the end of the @@ -220,7 +222,7 @@ private[spark] class ClusterTaskSetManager( while (!list.isEmpty) { val index = list.last list.trimEnd(1) - if (copiesRunning(index) == 0 && !finished(index)) { + if (copiesRunning(index) == 0 && !successful(index)) { return Some(index) } } @@ -240,7 +242,7 @@ private[spark] class ClusterTaskSetManager( private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) : Option[(Int, TaskLocality.Value)] = { - speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set + speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set if (!speculatableTasks.isEmpty) { // Check for process-local or preference-less tasks; note that tasks can be process-local @@ -341,7 +343,7 @@ private[spark] class ClusterTaskSetManager( maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { - if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) { + if (tasksSuccessful < numTasks && availableCpus >= CPUS_PER_TASK) { val curTime = clock.getTime() var allowedLocality = getAllowedLocalityLevel(curTime) @@ -372,7 +374,7 @@ private[spark] class ClusterTaskSetManager( val serializedTask = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) val timeTaken = clock.getTime() - startTime - increaseRunningTasks(1) + addRunningTask(taskId) logInfo("Serialized task %s:%d as %d bytes in %d ms".format( taskSet.id, index, serializedTask.limit, timeTaken)) val taskName = "task %s:%d".format(taskSet.id, index) @@ -414,94 +416,61 @@ private[spark] class ClusterTaskSetManager( index } - /** Called by cluster scheduler when one of our tasks changes state */ - override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { - SparkEnv.set(env) - state match { - case TaskState.FINISHED => - taskFinished(tid, state, 
serializedData) - case TaskState.LOST => - taskLost(tid, state, serializedData) - case TaskState.FAILED => - taskLost(tid, state, serializedData) - case TaskState.KILLED => - taskLost(tid, state, serializedData) - case _ => - } - } - - def taskStarted(task: Task[_], info: TaskInfo) { + private def taskStarted(task: Task[_], info: TaskInfo) { sched.listener.taskStarted(task, info) } - def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) { + /** + * Marks the task as successful and notifies the listener that a task has ended. + */ + def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = { val info = taskInfos(tid) - if (info.failed) { - // We might get two task-lost messages for the same task in coarse-grained Mesos mode, - // or even from Mesos itself when acks get delayed. - return - } val index = info.index info.markSuccessful() - decreaseRunningTasks(1) - if (!finished(index)) { - tasksFinished += 1 + removeRunningTask(tid) + if (!successful(index)) { logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format( - tid, info.duration, info.host, tasksFinished, numTasks)) - // Deserialize task result and pass it to the scheduler - try { - val result = ser.deserialize[TaskResult[_]](serializedData) - result.metrics.resultSize = serializedData.limit() - sched.listener.taskEnded( - tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) - } catch { - case cnf: ClassNotFoundException => - val loader = Thread.currentThread().getContextClassLoader - throw new SparkException("ClassNotFound with classloader: " + loader, cnf) - case ex => throw ex - } - // Mark finished and stop if we've finished all the tasks - finished(index) = true - if (tasksFinished == numTasks) { + tid, info.duration, info.host, tasksSuccessful, numTasks)) + sched.listener.taskEnded( + tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) + + // Mark successful and stop if all the tasks have succeeded. + tasksSuccessful += 1 + successful(index) = true + if (tasksSuccessful == numTasks) { sched.taskSetFinished(this) } } else { - logInfo("Ignoring task-finished event for TID " + tid + - " because task " + index + " is already finished") + logInfo("Ignorning task-finished event for TID " + tid + " because task " + + index + " has already completed successfully") } } - def taskLost(tid: Long, state: TaskState, serializedData: ByteBuffer) { + /** + * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the listener. + */ + def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) { val info = taskInfos(tid) if (info.failed) { - // We might get two task-lost messages for the same task in coarse-grained Mesos mode, - // or even from Mesos itself when acks get delayed. return } + removeRunningTask(tid) val index = info.index info.markFailed() - decreaseRunningTasks(1) - if (!finished(index)) { + if (!successful(index)) { logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index)) copiesRunning(index) -= 1 // Check if the problem is a map output fetch failure. In that case, this // task will never succeed on any node, so tell the scheduler about it. 
- if (serializedData != null && serializedData.limit() > 0) { - val reason = ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader) - reason match { + reason.foreach { + _ match { case fetchFailed: FetchFailed => logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress) sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null) - finished(index) = true - tasksFinished += 1 + successful(index) = true + tasksSuccessful += 1 sched.taskSetFinished(this) - decreaseRunningTasks(runningTasks) - return - - case taskResultTooBig: TaskResultTooBigFailure => - logInfo("Loss was due to task %s result exceeding Akka frame size; aborting job".format( - tid)) - abort("Task %s result exceeded Akka frame size".format(tid)) + removeAllRunningTasks() return case ef: ExceptionFailure => @@ -531,13 +500,16 @@ private[spark] class ClusterTaskSetManager( logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount)) } + case TaskResultLost => + logInfo("Lost result for TID %s on host %s".format(tid, info.host)) + sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null) + case _ => {} } } // On non-fetch failures, re-enqueue the task as pending for a max number of retries addPendingTask(index) - // Count failed attempts only on FAILED and LOST state (not on KILLED) - if (state == TaskState.FAILED || state == TaskState.LOST) { + if (state != TaskState.KILLED) { numFailures(index) += 1 if (numFailures(index) > MAX_TASK_FAILURES) { logError("Task %s:%d failed more than %d times; aborting job".format( @@ -561,22 +533,36 @@ private[spark] class ClusterTaskSetManager( causeOfFailure = message // TODO: Kill running tasks if we were not terminated due to a Mesos error sched.listener.taskSetFailed(taskSet, message) - decreaseRunningTasks(runningTasks) + removeAllRunningTasks() sched.taskSetFinished(this) } - override def increaseRunningTasks(taskNum: Int) { - runningTasks += taskNum - if (parent != null) { - parent.increaseRunningTasks(taskNum) + /** If the given task ID is not in the set of running tasks, adds it. + * + * Used to keep track of the number of running tasks, for enforcing scheduling policies. + */ + def addRunningTask(tid: Long) { + if (runningTasksSet.add(tid) && parent != null) { + parent.increaseRunningTasks(1) + } + runningTasks = runningTasksSet.size + } + + /** If the given task ID is in the set of running tasks, removes it. */ + def removeRunningTask(tid: Long) { + if (runningTasksSet.remove(tid) && parent != null) { + parent.decreaseRunningTasks(1) } + runningTasks = runningTasksSet.size } - override def decreaseRunningTasks(taskNum: Int) { - runningTasks -= taskNum + private def removeAllRunningTasks() { + val numRunningTasks = runningTasksSet.size + runningTasksSet.clear() if (parent != null) { - parent.decreaseRunningTasks(taskNum) + parent.decreaseRunningTasks(numRunningTasks) } + runningTasks = 0 } override def getSchedulableByName(name: String): Schedulable = { @@ -612,10 +598,10 @@ private[spark] class ClusterTaskSetManager( if (tasks(0).isInstanceOf[ShuffleMapTask]) { for ((tid, info) <- taskInfos if info.executorId == execId) { val index = taskInfos(tid).index - if (finished(index)) { - finished(index) = false + if (successful(index)) { + successful(index) = false copiesRunning(index) -= 1 - tasksFinished -= 1 + tasksSuccessful -= 1 addPendingTask(index) // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our // stage finishes when a total of tasks.size tasks finish. 
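The hunks above also change how running-task counts are maintained: instead of callers incrementing and decrementing a shared counter, ClusterTaskSetManager now tracks task IDs in a set (addRunningTask / removeRunningTask / removeAllRunningTasks) and forwards deltas to its parent Pool, which makes duplicate notifications harmless. A minimal sketch of that bookkeeping follows; class names (PoolSketch, TaskSetBookkeeping) are illustrative and not taken from the patch.

import scala.collection.mutable

// A stripped-down stand-in for the scheduler Pool's running-task counter.
class PoolSketch {
  var runningTasks = 0
  def increaseRunningTasks(n: Int): Unit = runningTasks += n
  def decreaseRunningTasks(n: Int): Unit = runningTasks -= n
}

class TaskSetBookkeeping(parent: Option[PoolSketch]) {
  private val runningTasksSet = mutable.HashSet[Long]()
  def runningTasks: Int = runningTasksSet.size

  // Adding an id counts it at most once, even if the scheduler reports it twice.
  def addRunningTask(tid: Long): Unit =
    if (runningTasksSet.add(tid)) parent.foreach(_.increaseRunningTasks(1))

  def removeRunningTask(tid: Long): Unit =
    if (runningTasksSet.remove(tid)) parent.foreach(_.decreaseRunningTasks(1))

  def removeAllRunningTasks(): Unit = {
    val n = runningTasksSet.size
    runningTasksSet.clear()
    parent.foreach(_.decreaseRunningTasks(n))
  }
}

object BookkeepingDemo {
  def main(args: Array[String]): Unit = {
    val pool = new PoolSketch
    val ts = new TaskSetBookkeeping(Some(pool))
    ts.addRunningTask(1L); ts.addRunningTask(1L); ts.addRunningTask(2L)
    println((ts.runningTasks, pool.runningTasks)) // (2,2): duplicate add is ignored
    ts.removeRunningTask(1L)
    println((ts.runningTasks, pool.runningTasks)) // (1,1)
  }
}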
@@ -625,7 +611,7 @@ private[spark] class ClusterTaskSetManager( } // Also re-enqueue any tasks that were running on the node for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { - taskLost(tid, TaskState.KILLED, null) + handleFailedTask(tid, TaskState.KILLED, None) } } @@ -638,13 +624,13 @@ private[spark] class ClusterTaskSetManager( */ override def checkSpeculatableTasks(): Boolean = { // Can't speculate if we only have one task, or if all tasks have finished. - if (numTasks == 1 || tasksFinished == numTasks) { + if (numTasks == 1 || tasksSuccessful == numTasks) { return false } var foundTasks = false val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation) - if (tasksFinished >= minFinishedForSpeculation) { + if (tasksSuccessful >= minFinishedForSpeculation) { val time = clock.getTime() val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray Arrays.sort(durations) @@ -655,7 +641,7 @@ private[spark] class ClusterTaskSetManager( logDebug("Task length threshold for speculation: " + threshold) for ((tid, info) <- taskInfos) { val index = info.index - if (!finished(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && + if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && !speculatableTasks.contains(index)) { logInfo( "Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format( @@ -669,7 +655,7 @@ private[spark] class ClusterTaskSetManager( } override def hasPendingTasks(): Boolean = { - numTasks > 0 && tasksFinished < numTasks + numTasks > 0 && tasksSuccessful < numTasks } private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala new file mode 100644 index 0000000000..feec8ecfe4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import java.nio.ByteBuffer +import java.util.concurrent.{LinkedBlockingDeque, ThreadFactory, ThreadPoolExecutor, TimeUnit} + +import org.apache.spark._ +import org.apache.spark.TaskState.TaskState +import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult} +import org.apache.spark.serializer.SerializerInstance + +/** + * Runs a thread pool that deserializes and remotely fetches (if necessary) task results. 
+ */ +private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler) + extends Logging { + private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt + private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt + private val getTaskResultExecutor = new ThreadPoolExecutor( + MIN_THREADS, + MAX_THREADS, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingDeque[Runnable], + new ResultResolverThreadFactory) + + class ResultResolverThreadFactory extends ThreadFactory { + private var counter = 0 + private var PREFIX = "Result resolver thread" + + override def newThread(r: Runnable): Thread = { + val thread = new Thread(r, "%s-%s".format(PREFIX, counter)) + counter += 1 + thread.setDaemon(true) + return thread + } + } + + protected val serializer = new ThreadLocal[SerializerInstance] { + override def initialValue(): SerializerInstance = { + return sparkEnv.closureSerializer.newInstance() + } + } + + def enqueueSuccessfulTask( + taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) { + getTaskResultExecutor.execute(new Runnable { + override def run() { + try { + val result = serializer.get().deserialize[TaskResult[_]](serializedData) match { + case directResult: DirectTaskResult[_] => directResult + case IndirectTaskResult(blockId) => + logDebug("Fetching indirect task result for TID %s".format(tid)) + val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId) + if (!serializedTaskResult.isDefined) { + /* We won't be able to get the task result if the machine that ran the task failed + * between when the task ended and when we tried to fetch the result, or if the + * block manager had to flush the result. */ + scheduler.handleFailedTask( + taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost)) + return + } + val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( + serializedTaskResult.get) + sparkEnv.blockManager.master.removeBlock(blockId) + deserializedResult + } + result.metrics.resultSize = serializedData.limit() + scheduler.handleSuccessfulTask(taskSetManager, tid, result) + } catch { + case cnf: ClassNotFoundException => + val loader = Thread.currentThread.getContextClassLoader + taskSetManager.abort("ClassNotFound with classloader: " + loader) + case ex => + taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex)) + } + } + }) + } + + def enqueueFailedTask(taskSetManager: ClusterTaskSetManager, tid: Long, taskState: TaskState, + serializedData: ByteBuffer) { + var reason: Option[TaskEndReason] = None + getTaskResultExecutor.execute(new Runnable { + override def run() { + try { + if (serializedData != null && serializedData.limit() > 0) { + reason = Some(serializer.get().deserialize[TaskEndReason]( + serializedData, getClass.getClassLoader)) + } + } catch { + case cnd: ClassNotFoundException => + // Log an error but keep going here -- the task failed, so not catastropic if we can't + // deserialize the reason. 
+ val loader = Thread.currentThread.getContextClassLoader + logError( + "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) + case ex => {} + } + scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) + } + }) + } + + def stop() { + getTaskResultExecutor.shutdownNow() + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index e29438f4ed..4d1bb1c639 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -91,7 +91,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: var rootPool: Pool = null val schedulingMode: SchedulingMode = SchedulingMode.withName( System.getProperty("spark.scheduler.mode", "FIFO")) - val activeTaskSets = new HashMap[String, TaskSetManager] + val activeTaskSets = new HashMap[String, LocalTaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String] val taskSetTaskIds = new HashMap[String, HashSet[Long]] @@ -210,7 +210,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: deserializedTask.metrics.get.executorRunTime = serviceTime.toInt deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt - val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null)) + val taskResult = new DirectTaskResult( + result, accumUpdates, deserializedTask.metrics.getOrElse(null)) val serializedResult = ser.serialize(taskResult) localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult) } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala index a2fda4c124..c2e2399ccb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala @@ -21,16 +21,16 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState} +import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success, TaskState} import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.{Schedulable, Task, TaskDescription, TaskInfo, TaskLocality, - TaskResult, TaskSet, TaskSetManager} +import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Pool, Schedulable, Task, + TaskDescription, TaskInfo, TaskLocality, TaskResult, TaskSet, TaskSetManager} private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) extends TaskSetManager with Logging { - var parent: Schedulable = null + var parent: Pool = null var weight: Int = 1 var minShare: Int = 0 var runningTasks: Int = 0 @@ -49,14 +49,14 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas val numFailures = new Array[Int](numTasks) val MAX_TASK_FAILURES = sched.maxFailures - override def increaseRunningTasks(taskNum: Int): Unit = { + def increaseRunningTasks(taskNum: Int): Unit = { runningTasks += taskNum if (parent != null) { parent.increaseRunningTasks(taskNum) } } - override def decreaseRunningTasks(taskNum: Int): Unit = { + def 
decreaseRunningTasks(taskNum: Int): Unit = { runningTasks -= taskNum if (parent != null) { parent.decreaseRunningTasks(taskNum) @@ -132,7 +132,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas return None } - override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { + def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { SparkEnv.set(env) state match { case TaskState.FINISHED => @@ -152,7 +152,12 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas val index = info.index val task = taskSet.tasks(index) info.markSuccessful() - val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) + val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) match { + case directResult: DirectTaskResult[_] => directResult + case IndirectTaskResult(blockId) => { + throw new SparkException("Expect only DirectTaskResults when using LocalScheduler") + } + } result.metrics.resultSize = serializedData.limit() sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics) numFinished += 1 diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 24ef204aa1..6c500bad92 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -38,8 +38,6 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging def newKryoOutput() = new KryoOutput(bufferSize) - def newKryoInput() = new KryoInput(bufferSize) - def newKryo(): Kryo = { val instantiator = new ScalaKryoInstantiator val kryo = instantiator.newKryo() @@ -118,8 +116,10 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { val kryo = ks.newKryo() - val output = ks.newKryoOutput() - val input = ks.newKryoInput() + + // Make these lazy vals to avoid creating a buffer unless we use them + lazy val output = ks.newKryoOutput() + lazy val input = new KryoInput() def serialize[T](t: T): ByteBuffer = { output.clear() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 60fdc5f2ee..37d0ddb17b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -484,7 +484,7 @@ private[spark] class BlockManager( for (loc <- locations) { logDebug("Getting remote block " + blockId + " from " + loc) val data = BlockManagerWorker.syncGetBlock( - GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) + GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) if (data != null) { return Some(dataDeserialize(blockId, data)) } @@ -495,10 +495,45 @@ private[spark] class BlockManager( } /** + * Get block from remote block managers as serialized bytes. + */ + def getRemoteBytes(blockId: String): Option[ByteBuffer] = { + // TODO: As with getLocalBytes, this is very similar to getRemote and perhaps should be + // refactored. 
+ if (blockId == null) { + throw new IllegalArgumentException("Block Id is null") + } + logDebug("Getting remote block " + blockId + " as bytes") + + val locations = master.getLocations(blockId) + for (loc <- locations) { + logDebug("Getting remote block " + blockId + " from " + loc) + val data = BlockManagerWorker.syncGetBlock( + GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) + if (data != null) { + return Some(data) + } + logDebug("The value of block " + blockId + " is null") + } + logDebug("Block " + blockId + " not found") + return None + } + + /** * Get a block from the block manager (either local or remote). */ def get(blockId: String): Option[Iterator[Any]] = { - getLocal(blockId).orElse(getRemote(blockId)) + val local = getLocal(blockId) + if (local.isDefined) { + logInfo("Found block %s locally".format(blockId)) + return local + } + val remote = getRemote(blockId) + if (remote.isDefined) { + logInfo("Found block %s remotely".format(blockId)) + return remote + } + None } /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 886f071503..f384875cc9 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -70,6 +70,19 @@ private[spark] object Utils extends Logging { return ois.readObject.asInstanceOf[T] } + /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */ + def deserializeLongValue(bytes: Array[Byte]) : Long = { + // Note: we assume that we are given a Long value encoded in network (big-endian) byte order + var result = bytes(7) & 0xFFL + result = result + ((bytes(6) & 0xFFL) << 8) + result = result + ((bytes(5) & 0xFFL) << 16) + result = result + ((bytes(4) & 0xFFL) << 24) + result = result + ((bytes(3) & 0xFFL) << 32) + result = result + ((bytes(2) & 0xFFL) << 40) + result = result + ((bytes(1) & 0xFFL) << 48) + result + ((bytes(0) & 0xFFL) << 56) + } + /** Serialize via nested stream using specific serializer */ def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = { val osWrapper = ser.serializeStream(new OutputStream { diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 7a856d4081..cd2bf9a8ff 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -319,19 +319,6 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter } } } - - test("job should fail if TaskResult exceeds Akka frame size") { - // We must use local-cluster mode since results are returned differently - // when running under LocalScheduler: - sc = new SparkContext("local-cluster[1,1,512]", "test") - val akkaFrameSize = - sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt - val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)} - val exception = intercept[SparkException] { - rdd.reduce((x, y) => x) - } - exception.getMessage should endWith("result exceeded Akka frame size") - } } object DistributedSuite { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9ed591e494..2f933246b0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -32,8 +32,6 @@ import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency} import org.apache.spark.{FetchFailed, Success, TaskEndReason} import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster} -import org.apache.spark.scheduler.Pool -import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index aac7c207cb..41a161e08a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -29,7 +29,9 @@ import org.apache.spark.SparkContext._ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers { - test("local metrics") { + // TODO: This test has a race condition since the DAGScheduler now reports results + // asynchronously. It needs to be updated for that patch. + ignore("local metrics") { sc = new SparkContext("local[4]", "test") val listener = new SaveStageInfo sc.addSparkListener(listener) @@ -43,6 +45,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)} d.count + Thread.sleep(1000) listener.stageInfos.size should be (1) val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1") @@ -54,6 +57,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc d4.collectAsMap + Thread.sleep(1000) listener.stageInfos.size should be (4) listener.stageInfos.foreach {stageInfo => //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala index 1b50ce06b3..95d3553d91 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala @@ -43,16 +43,16 @@ class FakeTaskSetManager( stageId = initStageId name = "TaskSet_"+stageId override val numTasks = initNumTasks - tasksFinished = 0 + tasksSuccessful = 0 - override def increaseRunningTasks(taskNum: Int) { + def increaseRunningTasks(taskNum: Int) { runningTasks += taskNum if (parent != null) { parent.increaseRunningTasks(taskNum) } } - override def decreaseRunningTasks(taskNum: Int) { + def decreaseRunningTasks(taskNum: Int) { runningTasks -= taskNum if (parent != null) { parent.decreaseRunningTasks(taskNum) @@ -79,7 +79,7 @@ class FakeTaskSetManager( maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { - if (tasksFinished + runningTasks < numTasks) { + if (tasksSuccessful + runningTasks < numTasks) { increaseRunningTasks(1) return Some(new TaskDescription(0, execId, "task 0:0", 0, null)) } @@ -92,8 +92,8 @@ class FakeTaskSetManager( def taskFinished() { decreaseRunningTasks(1) - tasksFinished +=1 - if (tasksFinished == numTasks) { + tasksSuccessful +=1 + if (tasksSuccessful == numTasks) { parent.removeSchedulable(this) } } @@ -114,7 +114,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val taskSetQueue = rootPool.getSortedTaskSetQueue() /* Just for Test*/ for (manager <- taskSetQueue) { - 
logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks)) + logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format( + manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks)) } for (taskSet <- taskSetQueue) { taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match { diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala index ff70a2cdf0..80d0c5a5e9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala @@ -40,6 +40,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* val startedTasks = new ArrayBuffer[Long] val endedTasks = new mutable.HashMap[Long, TaskEndReason] val finishedManagers = new ArrayBuffer[TaskSetManager] + val taskSetsFailed = new ArrayBuffer[String] val executors = new mutable.HashMap[String, String] ++ liveExecutors @@ -63,7 +64,9 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* def executorLost(execId: String) {} - def taskSetFailed(taskSet: TaskSet, reason: String) {} + def taskSetFailed(taskSet: TaskSet, reason: String) { + taskSetsFailed += taskSet.id + } } def removeExecutor(execId: String): Unit = executors -= execId @@ -101,7 +104,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None) // Tell it the task has finished - manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0)) + manager.handleSuccessfulTask(0, createTaskResult(0)) assert(sched.endedTasks(0) === Success) assert(sched.finishedManagers.contains(manager)) } @@ -125,14 +128,14 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None) // Finish the first two tasks - manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0)) - manager.statusUpdate(1, TaskState.FINISHED, createTaskResult(1)) + manager.handleSuccessfulTask(0, createTaskResult(0)) + manager.handleSuccessfulTask(1, createTaskResult(1)) assert(sched.endedTasks(0) === Success) assert(sched.endedTasks(1) === Success) assert(!sched.finishedManagers.contains(manager)) // Finish the last task - manager.statusUpdate(2, TaskState.FINISHED, createTaskResult(2)) + manager.handleSuccessfulTask(2, createTaskResult(2)) assert(sched.endedTasks(2) === Success) assert(sched.finishedManagers.contains(manager)) } @@ -253,6 +256,47 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None) } + test("task result lost") { + sc = new SparkContext("local", "test") + val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val taskSet = createTaskSet(1) + val clock = new FakeClock + val manager = new ClusterTaskSetManager(sched, taskSet, clock) + + assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) + + // Tell it the task has finished but the result was lost. 
+ manager.handleFailedTask(0, TaskState.FINISHED, Some(TaskResultLost)) + assert(sched.endedTasks(0) === TaskResultLost) + + // Re-offer the host -- now we should get task 0 again. + assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) + } + + test("repeated failures lead to task set abortion") { + sc = new SparkContext("local", "test") + val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val taskSet = createTaskSet(1) + val clock = new FakeClock + val manager = new ClusterTaskSetManager(sched, taskSet, clock) + + // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted + // after the last failure. + (1 to manager.MAX_TASK_FAILURES).foreach { index => + val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY) + assert(offerResult != None, + "Expect resource offer on iteration %s to return a task".format(index)) + assert(offerResult.get.index === 0) + manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost)) + if (index < manager.MAX_TASK_FAILURES) { + assert(!sched.taskSetsFailed.contains(taskSet.id)) + } else { + assert(sched.taskSetsFailed.contains(taskSet.id)) + } + } + } + + + /** * Utility method to create a TaskSet, potentially setting a particular sequence of preferred * locations for each task (given as varargs) if this sequence is not empty. @@ -267,7 +311,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo new TaskSet(tasks, 0, 0, 0, null) } - def createTaskResult(id: Int): ByteBuffer = { - ByteBuffer.wrap(Utils.serialize(new TaskResult[Int](id, mutable.Map.empty, new TaskMetrics))) + def createTaskResult(id: Int): DirectTaskResult[Int] = { + new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala new file mode 100644 index 0000000000..119ba30090 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import java.nio.ByteBuffer + +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} + +import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv} +import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult} + +/** + * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter. + * + * Used to test the case where a BlockManager evicts the task result (or dies) before the + * TaskResult is retrieved.
+ */ +class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler) + extends TaskResultGetter(sparkEnv, scheduler) { + var removedResult = false + + override def enqueueSuccessfulTask( + taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) { + if (!removedResult) { + // Only remove the result once, since we'd like to test the case where the task eventually + // succeeds. + serializer.get().deserialize[TaskResult[_]](serializedData) match { + case IndirectTaskResult(blockId) => + sparkEnv.blockManager.master.removeBlock(blockId) + case directResult: DirectTaskResult[_] => + taskSetManager.abort("Internal error: expect only indirect results") + } + serializedData.rewind() + removedResult = true + } + super.enqueueSuccessfulTask(taskSetManager, tid, serializedData) + } +} + +/** + * Tests related to handling task results (both direct and indirect). + */ +class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll + with LocalSparkContext { + + override def beforeAll { + // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small + // as we can make it) so the tests don't take too long. + System.setProperty("spark.akka.frameSize", "1") + } + + before { + // Use local-cluster mode because results are returned differently when running with the + // LocalScheduler. + sc = new SparkContext("local-cluster[1,1,512]", "test") + } + + override def afterAll { + System.clearProperty("spark.akka.frameSize") + } + + test("handling results smaller than Akka frame size") { + val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x) + assert(result === 2) + } + + test("handling results larger than Akka frame size") { + val akkaFrameSize = + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt + val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) + assert(result === 1.to(akkaFrameSize).toArray) + + val RESULT_BLOCK_ID = "taskresult_0" + assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0, + "Expect result to be removed from the block manager.") + } + + test("task retried if result missing from block manager") { + // If this test hangs, it's probably because no resource offers were made after the task + // failed. + val scheduler: ClusterScheduler = sc.taskScheduler match { + case clusterScheduler: ClusterScheduler => + clusterScheduler + case _ => + assert(false, "Expect local cluster to use ClusterScheduler") + throw new ClassCastException + } + scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler) + val akkaFrameSize = + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt + val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) + assert(result === 1.to(akkaFrameSize).toArray) + + // Make sure two tasks were run (one failed one, and a second retried one). 
+ assert(scheduler.nextTaskId.get() === 2) + } +} + diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 07c9f2382b..8f0ec6683b 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -26,7 +26,12 @@ class UISuite extends FunSuite { test("jetty port increases under contention") { val startPort = 4040 val server = new Server(startPort) - server.start() + + Try { server.start() } match { + case Success(s) => + case Failure(e) => + // Either case server port is busy hence setup for test complete + } val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq()) val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq()) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index e2859caf58..4684c8c972 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import com.google.common.base.Charsets import com.google.common.io.Files import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream, File} +import java.nio.{ByteBuffer, ByteOrder} import org.scalatest.FunSuite import org.apache.commons.io.FileUtils import scala.util.Random @@ -135,5 +136,15 @@ class UtilsSuite extends FunSuite { FileUtils.deleteDirectory(tmpDir2) } + + test("deserialize long value") { + val testval : Long = 9730889947L + val bbuf = ByteBuffer.allocate(8) + assert(bbuf.hasArray) + bbuf.order(ByteOrder.BIG_ENDIAN) + bbuf.putLong(testval) + assert(bbuf.array.length === 8) + assert(Utils.deserializeLongValue(bbuf.array) === testval) + } } diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index 238ad26de0..0c1d657cde 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -6,7 +6,7 @@ <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> - <title>{{ page.title }} - Spark {{site.SPARK_VERSION}} Documentation</title> + <title>{{ page.title }} - Spark {{site.SPARK_VERSION_SHORT}} Documentation</title> <meta name="description" content=""> <link rel="stylesheet" href="css/bootstrap.min.css"> @@ -109,7 +109,7 @@ </ul> </li> </ul> - <!--<p class="navbar-text pull-right"><span class="version-text">v{{site.SPARK_VERSION}}</span></p>--> + <!--<p class="navbar-text pull-right"><span class="version-text">v{{site.SPARK_VERSION_SHORT}}</span></p>--> </div> </div> </div> diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index f991d86c8d..c1ff9c417c 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -144,10 +144,9 @@ Available algorithms for clustering: # Collaborative Filtering -[Collaborative -filtering](http://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) +[Collaborative filtering](http://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) is commonly used for recommender systems. These techniques aim to fill in the -missing entries of a user-product association matrix. MLlib currently supports +missing entries of a user-item association matrix. MLlib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. 
In particular, we implement the [alternating least squares @@ -158,7 +157,24 @@ following parameters: * *numBlocks* is the number of blocks used to parallelize computation (set to -1 to auto-configure). * *rank* is the number of latent factors in our model. * *iterations* is the number of iterations to run. -* *lambda* specifies the regularization parameter in ALS. +* *lambda* specifies the regularization parameter in ALS. +* *implicitPrefs* specifies whether to use the *explicit feedback* ALS variant or one adapted for *implicit feedback* data +* *alpha* is a parameter applicable to the implicit feedback variant of ALS that governs the *baseline* confidence in preference observations + +## Explicit vs Implicit Feedback + +The standard approach to matrix factorization based collaborative filtering treats +the entries in the user-item matrix as *explicit* preferences given by the user to the item. + +It is common in many real-world use cases to only have access to *implicit feedback* +(e.g. views, clicks, purchases, likes, shares etc.). The approach used in MLlib to deal with +such data is taken from +[Collaborative Filtering for Implicit Feedback Datasets](http://research.yahoo.com/pub/2433). +Essentially instead of trying to model the matrix of ratings directly, this approach treats the data as +a combination of binary preferences and *confidence values*. The ratings are then related +to the level of confidence in observed user preferences, rather than explicit ratings given to items. +The model then tries to find latent factors that can be used to predict the expected preference of a user +for an item. Available algorithms for collaborative filtering: diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index c611db0af4..30128ec45d 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -50,6 +50,7 @@ The command to launch the YARN Client is as follows: --master-memory <MEMORY_FOR_MASTER> \ --worker-memory <MEMORY_PER_WORKER> \ --worker-cores <CORES_PER_WORKER> \ + --name <application_name> \ --queue <queue_name> For example: diff --git a/ec2/README b/ec2/README index 0add81312c..433da37b4c 100644 --- a/ec2/README +++ b/ec2/README @@ -1,4 +1,4 @@ This folder contains a script, spark-ec2, for launching Spark clusters on Amazon EC2.
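To make the implicit-feedback mode documented in the MLlib guide changes above concrete, here is a minimal usage sketch. It relies only on APIs visible in this patch (`Rating`, `ALS.train`, `ALS.trainImplicit`, `MatrixFactorizationModel.userFeatures`); the master URL, input file name, rank, iteration count, and output path are illustrative assumptions rather than values taken from the patch.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Sketch only: assumes ratings.csv holds "user,product,rating" lines,
// the same format that ALS.main parses in this patch.
val sc = new SparkContext("local[4]", "ALSExample")
val ratings = sc.textFile("ratings.csv").map { line =>
  val fields = line.split(',')
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}

// Explicit ratings: the existing entry point (rank 10, 15 iterations, default lambda).
val explicitModel = ALS.train(ratings, 10, 15)

// Implicit feedback (view/click/purchase counts): the new trainImplicit variant with
// lambda = 0.01, auto-configured blocks (-1), and alpha = 1.0 as the baseline confidence.
val implicitModel = ALS.trainImplicit(ratings, 10, 15, 0.01, -1, 1.0)

// Both calls return a MatrixFactorizationModel; its factor RDDs can be saved as in ALS.main.
implicitModel.userFeatures.map { case (id, vec) => id + "," + vec.mkString(" ") }
  .saveAsTextFile("als-output/userFeatures")
```

In the implicit case the third column is read as an interaction strength (for example a view count) rather than a rating, matching the confidence interpretation described in the guide text above.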
Usage instructions are available online at: -http://spark-project.org/docs/latest/ec2-scripts.html +http://spark.incubator.apache.org/docs/latest/ec2-scripts.html diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 1190ed47f6..65868b76b9 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -70,7 +70,7 @@ def parse_args(): "slaves across multiple (an additional $0.01/Gb for bandwidth" + "between zones applies)") parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use") - parser.add_option("-v", "--spark-version", default="0.7.3", + parser.add_option("-v", "--spark-version", default="0.8.0", help="Version of Spark to use: 'X.Y.Z' or a specific git hash") parser.add_option("--spark-git-repo", default="https://github.com/mesos/spark", @@ -155,7 +155,7 @@ def is_active(instance): # Return correct versions of Spark and Shark, given the supplied Spark version def get_spark_shark_version(opts): - spark_shark_map = {"0.7.3": "0.7.0"} + spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0"} version = opts.spark_version.replace("v", "") if version not in spark_shark_map: print >> stderr, "Don't know about Spark version: %s" % version diff --git a/examples/pom.xml b/examples/pom.xml index b9cc6f5e0a..b8c020a321 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -26,33 +26,41 @@ </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-examples</artifactId> + <artifactId>spark-examples_2.9.3</artifactId> <packaging>jar</packaging> <name>Spark Project Examples</name> <url>http://spark.incubator.apache.org/</url> + <repositories> + <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 --> + <repository> + <id>lib</id> + <url>file://${project.basedir}/lib</url> + </repository> + </repositories> + <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_2.9.3</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming</artifactId> + <artifactId>spark-streaming_2.9.3</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib</artifactId> + <artifactId>spark-mllib_2.9.3</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-bagel</artifactId> + <artifactId>spark-bagel_2.9.3</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> @@ -72,6 +80,12 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka</artifactId> + <version>0.7.2-spark</version> <!-- Comes from our in-project repository --> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> </dependency> @@ -82,12 +96,12 @@ </dependency> <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.version}</artifactId> + <artifactId>scalatest_2.9.3</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_${scala.version}</artifactId> + <artifactId>scalacheck_2.9.3</artifactId> <scope>test</scope> </dependency> <dependency> @@ -161,7 +175,7 @@ </goals> <configuration> <transformers> - <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> diff --git a/make-distribution.sh b/make-distribution.sh index bffb19843c..32bbdb90a5 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -95,7 +95,7 @@ cp $FWDIR/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/jars/" # Copy other things mkdir "$DISTDIR"/conf -cp "$FWDIR/conf/*.template" "$DISTDIR"/conf +cp "$FWDIR"/conf/*.template "$DISTDIR"/conf cp -r "$FWDIR/bin" "$DISTDIR" cp -r "$FWDIR/python" "$DISTDIR" cp "$FWDIR/spark-class" "$DISTDIR" diff --git a/mllib/pom.xml b/mllib/pom.xml index 4ef4f0ae4e..f472082ad1 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -26,7 +26,7 @@ </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib</artifactId> + <artifactId>spark-mllib_2.9.3</artifactId> <packaging>jar</packaging> <name>Spark Project ML Library</name> <url>http://spark.incubator.apache.org/</url> @@ -34,7 +34,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_2.9.3</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -48,12 +48,12 @@ </dependency> <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.version}</artifactId> + <artifactId>scalatest_2.9.3</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_${scala.version}</artifactId> + <artifactId>scalacheck_2.9.3</artifactId> <scope>test</scope> </dependency> <dependency> diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index be002d02bc..36853acab5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -21,7 +21,8 @@ import scala.collection.mutable.{ArrayBuffer, BitSet} import scala.util.Random import scala.util.Sorting -import org.apache.spark.{HashPartitioner, Partitioner, SparkContext} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD import org.apache.spark.serializer.KryoRegistrator @@ -61,6 +62,12 @@ case class Rating(val user: Int, val product: Int, val rating: Double) /** * Alternating Least Squares matrix factorization. * + * ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices, + * `X` and `Y`, i.e. `Xt * Y = R`. Typically these approximations are called 'factor' matrices. + * The general approach is iterative. During each iteration, one of the factor matrices is held + * constant, while the other is solved for using least squares. The newly-solved factor matrix is + * then held constant while solving for the other factor matrix. 
+ * * This is a blocked implementation of the ALS factorization algorithm that groups the two sets * of factors (referred to as "users" and "products") into blocks and reduces communication by only * sending one copy of each user vector to each product block on each iteration, and only for the @@ -70,11 +77,21 @@ case class Rating(val user: Int, val product: Int, val rating: Double) * vectors it receives from each user block it will depend on). This allows us to send only an * array of feature vectors between each user block and product block, and have the product block * find the users' ratings and update the products based on these messages. + * + * For implicit preference data, the algorithm used is based on + * "Collaborative Filtering for Implicit Feedback Datasets", available at + * [[http://research.yahoo.com/pub/2433]], adapted for the blocked approach used here. + * + * Essentially instead of finding the low-rank approximations to the rating matrix `R`, + * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if r > 0 + * and 0 if r = 0. The ratings then act as 'confidence' values related to strength of indicated user + * preferences rather than explicit ratings given to items. */ -class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double) - extends Serializable +class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double, + var implicitPrefs: Boolean, var alpha: Double) + extends Serializable with Logging { - def this() = this(-1, 10, 10, 0.01) + def this() = this(-1, 10, 10, 0.01, false, 1.0) /** * Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured @@ -103,6 +120,16 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l this } + def setImplicitPrefs(implicitPrefs: Boolean): ALS = { + this.implicitPrefs = implicitPrefs + this + } + + def setAlpha(alpha: Double): ALS = { + this.alpha = alpha + this + } + /** * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples. * Returns a MatrixFactorizationModel with feature vectors for each user and product. 
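The scaladoc above states the model in terms of a preference matrix `P` and per-rating confidence; a rough single-user sketch of the resulting least-squares update may help connect that description to the blocked `computeYtY`/`updateFeatures`/`updateBlock` code later in this file. It is illustrative only: the helper name `solveUser` and the assumption that the full product-factor matrix `Y` fits in local memory are not part of the patch, which keeps factors blocked and distributed and broadcasts `YtY` once per iteration.

```scala
import org.jblas.{DoubleMatrix, Solve}

// Solve for one user's factor vector x_u under the implicit-feedback model:
//   x_u = (Y^T Y + sum_p (alpha * r_up) y_p y_p^T + lambda * I)^-1  sum_{r_up > 0} (1 + alpha * r_up) y_p
// where p_up = 1 when r_up > 0 and the confidence is c_up = 1 + alpha * r_up.
def solveUser(Y: DoubleMatrix, ratings: Array[Double], lambda: Double, alpha: Double): Array[Double] = {
  val rank = Y.columns
  // Y^T Y is the user-independent part; the patch computes it once per iteration
  // (computeYtY) and broadcasts it, since only the confidence correction is per-user.
  val A = Y.transpose().mmul(Y).addi(DoubleMatrix.eye(rank).muli(lambda))
  val b = DoubleMatrix.zeros(rank)
  for (p <- 0 until ratings.length if ratings(p) > 0) {
    val y = Y.getRow(p).transpose()          // rank x 1 factor vector of product p
    val c = alpha * ratings(p)               // extra confidence beyond the baseline 1
    A.addi(y.mmul(y.transpose()).muli(c))    // A += (c_up - 1) * y_p * y_p^T
    b.addi(y.mul(1.0 + c))                   // b += c_up * p_up * y_p, with p_up = 1
  }
  Solve.solvePositive(A, b).data             // same solver call the patch uses
}
```

For explicit feedback the same normal equations are used with unit confidence and the observed rating on the right-hand side, which is what the `case false` branches in `updateBlock` compute.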
@@ -147,19 +174,24 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l } } - for (iter <- 0 until iterations) { + for (iter <- 1 to iterations) { // perform ALS update - products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda) - users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda) + logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) + // YtY / XtX is an Option[DoubleMatrix] and is only required for the implicit feedback model + val YtY = computeYtY(users) + val YtYb = ratings.context.broadcast(YtY) + products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, + alpha, YtYb) + logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) + val XtX = computeYtY(products) + val XtXb = ratings.context.broadcast(XtX) + users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, + alpha, XtXb) } // Flatten and cache the two final RDDs to un-block them - val usersOut = users.join(userOutLinks).flatMap { case (b, (factors, outLinkBlock)) => - for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i)) - } - val productsOut = products.join(productOutLinks).flatMap { case (b, (factors, outLinkBlock)) => - for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i)) - } + val usersOut = unblockFactors(users, userOutLinks) + val productsOut = unblockFactors(products, productOutLinks) usersOut.persist() productsOut.persist() @@ -168,6 +200,40 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l } /** + * Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors + * for each user (or product), in a distributed fashion. Here `reduceByKeyLocally` is used as + * the driver program requires `YtY` to broadcast it to the slaves + * @param factors the (block-distributed) user or product factor vectors + * @return Option[YtY] - whose value is only used in the implicit preference model + */ + def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = { + if (implicitPrefs) { + Option( + factors.flatMapValues{ case factorArray => + factorArray.map{ vector => + val x = new DoubleMatrix(vector) + x.mmul(x.transpose()) + } + }.reduceByKeyLocally((a, b) => a.addi(b)) + .values + .reduce((a, b) => a.addi(b)) + ) + } else { + None + } + } + + /** + * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs + */ + def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])], + outLinks: RDD[(Int, OutLinkBlock)]) = { + blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) => + for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i)) + } + } + + /** * Make the out-links table for a block of the users (or products) dataset given the list of * (user, product, rating) values for the users in that block (or the opposite for products). 
*/ @@ -251,7 +317,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l userInLinks: RDD[(Int, InLinkBlock)], partitioner: Partitioner, rank: Int, - lambda: Double) + lambda: Double, + alpha: Double, + YtY: Broadcast[Option[DoubleMatrix]]) : RDD[(Int, Array[Array[Double]])] = { val numBlocks = products.partitions.size @@ -265,7 +333,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) } }.groupByKey(partitioner) .join(userInLinks) - .mapValues{ case (messages, inLinkBlock) => updateBlock(messages, inLinkBlock, rank, lambda) } + .mapValues{ case (messages, inLinkBlock) => + updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY) + } } /** @@ -273,7 +343,7 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l * it received from each product and its InLinkBlock. */ def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, - rank: Int, lambda: Double) + rank: Int, lambda: Double, alpha: Double, YtY: Broadcast[Option[DoubleMatrix]]) : Array[Array[Double]] = { // Sort the incoming block factor messages by block ID and make them an array @@ -298,8 +368,14 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l fillXtX(x, tempXtX) val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p) for (i <- 0 until us.length) { - userXtX(us(i)).addi(tempXtX) - SimpleBlas.axpy(rs(i), x, userXy(us(i))) + implicitPrefs match { + case false => + userXtX(us(i)).addi(tempXtX) + SimpleBlas.axpy(rs(i), x, userXy(us(i))) + case true => + userXtX(us(i)).addi(tempXtX.mul(alpha * rs(i))) + SimpleBlas.axpy(1 + alpha * rs(i), x, userXy(us(i))) + } } } } @@ -311,7 +387,10 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l // Add regularization (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda) // Solve the resulting matrix, which is symmetric and positive-definite - Solve.solvePositive(fullXtX, userXy(index)).data + implicitPrefs match { + case false => Solve.solvePositive(fullXtX, userXy(index)).data + case true => Solve.solvePositive(fullXtX.add(YtY.value.get), userXy(index)).data + } } } @@ -381,7 +460,7 @@ object ALS { blocks: Int) : MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda).run(ratings) + new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings) } /** @@ -419,6 +498,68 @@ object ALS { train(ratings, rank, iterations, 0.01, -1) } + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' given by users + * to some products, in the form of (userID, productID, preference) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. This is done using + * a level of parallelism given by `blocks`. 
+ * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param alpha confidence parameter (only applies when implicitPrefs = true) + */ + def trainImplicit( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + alpha: Double) + : MatrixFactorizationModel = + { + new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings) + } + + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' given by users to + * some products, in the form of (userID, productID, preference) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. The level of + * parallelism is determined automatically based on the number of partitions in `ratings`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + */ + def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double) + : MatrixFactorizationModel = + { + trainImplicit(ratings, rank, iterations, lambda, -1, alpha) + } + + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' ratings given by + * users to some products, in the form of (userID, productID, rating) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. The level of + * parallelism is determined automatically based on the number of partitions in `ratings`.
+ * Model parameters `alpha` and `lambda` are set to reasonable default values + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + */ + def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int) + : MatrixFactorizationModel = + { + trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) + } + private class ALSRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[Rating]) @@ -426,29 +567,37 @@ object ALS { } def main(args: Array[String]) { - if (args.length != 5 && args.length != 6) { - println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]") + if (args.length < 5 || args.length > 9) { + println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> " + + "[<lambda>] [<implicitPrefs>] [<alpha>] [<blocks>]") System.exit(1) } val (master, ratingsFile, rank, iters, outputDir) = (args(0), args(1), args(2).toInt, args(3).toInt, args(4)) - val blocks = if (args.length == 6) args(5).toInt else -1 + val lambda = if (args.length >= 6) args(5).toDouble else 0.01 + val implicitPrefs = if (args.length >= 7) args(6).toBoolean else false + val alpha = if (args.length >= 8) args(7).toDouble else 1 + val blocks = if (args.length == 9) args(8).toInt else -1 + System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName) System.setProperty("spark.kryo.referenceTracking", "false") System.setProperty("spark.kryoserializer.buffer.mb", "8") System.setProperty("spark.locality.wait", "10000") + val sc = new SparkContext(master, "ALS") val ratings = sc.textFile(ratingsFile).map { line => val fields = line.split(',') Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) } - val model = ALS.train(ratings, rank, iters, 0.01, blocks) + val model = new ALS(rank = rank, iterations = iters, lambda = lambda, + numBlocks = blocks, implicitPrefs = implicitPrefs, alpha = alpha).run(ratings) + model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") } .saveAsTextFile(outputDir + "/userFeatures") model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") } .saveAsTextFile(outputDir + "/productFeatures") println("Final user/product features written to " + outputDir) - System.exit(0) + sc.stop() } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java index 3323f6cee2..eafee060cd 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -19,6 +19,7 @@ package org.apache.spark.mllib.recommendation; import java.io.Serializable; import java.util.List; +import java.lang.Math; import scala.Tuple2; @@ -48,7 +49,7 @@ public class JavaALSSuite implements Serializable { } void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, - DoubleMatrix trueRatings, double matchThreshold) { + DoubleMatrix trueRatings, double matchThreshold, boolean implicitPrefs, DoubleMatrix truePrefs) { DoubleMatrix predictedU = new DoubleMatrix(users, features); List<scala.Tuple2<Object, double[]>> userFeatures = model.userFeatures().toJavaRDD().collect(); for (int i = 0; i < features; ++i) { @@ -68,12 +69,32 @@ public class JavaALSSuite 
implements Serializable { DoubleMatrix predictedRatings = predictedU.mmul(predictedP.transpose()); - for (int u = 0; u < users; ++u) { - for (int p = 0; p < products; ++p) { - double prediction = predictedRatings.get(u, p); - double correct = trueRatings.get(u, p); - Assert.assertTrue(Math.abs(prediction - correct) < matchThreshold); + if (!implicitPrefs) { + for (int u = 0; u < users; ++u) { + for (int p = 0; p < products; ++p) { + double prediction = predictedRatings.get(u, p); + double correct = trueRatings.get(u, p); + Assert.assertTrue(String.format("Prediction=%2.4f not below match threshold of %2.2f", + prediction, matchThreshold), Math.abs(prediction - correct) < matchThreshold); + } } + } else { + // For implicit prefs we use the confidence-weighted RMSE to test (ref Mahout's implicit ALS tests) + double sqErr = 0.0; + double denom = 0.0; + for (int u = 0; u < users; ++u) { + for (int p = 0; p < products; ++p) { + double prediction = predictedRatings.get(u, p); + double truePref = truePrefs.get(u, p); + double confidence = 1.0 + /* alpha = */ 1.0 * trueRatings.get(u, p); + double err = confidence * (truePref - prediction) * (truePref - prediction); + sqErr += err; + denom += 1.0; + } + } + double rmse = Math.sqrt(sqErr / denom); + Assert.assertTrue(String.format("Confidence-weighted RMSE=%2.4f above threshold of %2.2f", + rmse, matchThreshold), Math.abs(rmse) < matchThreshold); } } @@ -81,30 +102,62 @@ public class JavaALSSuite implements Serializable { public void runALSUsingStaticMethods() { int features = 1; int iterations = 15; - int users = 10; - int products = 10; - scala.Tuple2<List<Rating>, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( - users, products, features, 0.7); + int users = 50; + int products = 100; + scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7, false); JavaRDD<Rating> data = sc.parallelize(testData._1()); MatrixFactorizationModel model = ALS.train(data.rdd(), features, iterations); - validatePrediction(model, users, products, features, testData._2(), 0.3); + validatePrediction(model, users, products, features, testData._2(), 0.3, false, testData._3()); } @Test public void runALSUsingConstructor() { int features = 2; int iterations = 15; - int users = 20; - int products = 30; - scala.Tuple2<List<Rating>, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( - users, products, features, 0.7); + int users = 100; + int products = 200; + scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7, false); JavaRDD<Rating> data = sc.parallelize(testData._1()); MatrixFactorizationModel model = new ALS().setRank(features) .setIterations(iterations) .run(data.rdd()); - validatePrediction(model, users, products, features, testData._2(), 0.3); + validatePrediction(model, users, products, features, testData._2(), 0.3, false, testData._3()); + } + + @Test + public void runImplicitALSUsingStaticMethods() { + int features = 1; + int iterations = 15; + int users = 80; + int products = 160; + scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7, true); + + JavaRDD<Rating> data = sc.parallelize(testData._1()); + MatrixFactorizationModel model = ALS.trainImplicit(data.rdd(), features, iterations); + validatePrediction(model, users, products, features, testData._2(), 0.4, true, testData._3()); + } + + @Test + public 
void runImplicitALSUsingConstructor() { + int features = 2; + int iterations = 15; + int users = 100; + int products = 200; + scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7, true); + + JavaRDD<Rating> data = sc.parallelize(testData._1()); + + MatrixFactorizationModel model = new ALS().setRank(features) + .setIterations(iterations) + .setImplicitPrefs(true) + .run(data.rdd()); + validatePrediction(model, users, products, features, testData._2(), 0.4, true, testData._3()); } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 347ef238f4..fafc5ec5f2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -34,16 +34,19 @@ object ALSSuite { users: Int, products: Int, features: Int, - samplingRate: Double): (java.util.List[Rating], DoubleMatrix) = { - val (sampledRatings, trueRatings) = generateRatings(users, products, features, samplingRate) - (seqAsJavaList(sampledRatings), trueRatings) + samplingRate: Double, + implicitPrefs: Boolean): (java.util.List[Rating], DoubleMatrix, DoubleMatrix) = { + val (sampledRatings, trueRatings, truePrefs) = + generateRatings(users, products, features, samplingRate, implicitPrefs) + (seqAsJavaList(sampledRatings), trueRatings, truePrefs) } def generateRatings( users: Int, products: Int, features: Int, - samplingRate: Double): (Seq[Rating], DoubleMatrix) = { + samplingRate: Double, + implicitPrefs: Boolean = false): (Seq[Rating], DoubleMatrix, DoubleMatrix) = { val rand = new Random(42) // Create a random matrix with uniform values from -1 to 1 @@ -52,14 +55,20 @@ object ALSSuite { val userMatrix = randomMatrix(users, features) val productMatrix = randomMatrix(features, products) - val trueRatings = userMatrix.mmul(productMatrix) + val (trueRatings, truePrefs) = implicitPrefs match { + case true => + val raw = new DoubleMatrix(users, products, Array.fill(users * products)(rand.nextInt(10).toDouble): _*) + val prefs = new DoubleMatrix(users, products, raw.data.map(v => if (v > 0) 1.0 else 0.0): _*) + (raw, prefs) + case false => (userMatrix.mmul(productMatrix), null) + } val sampledRatings = { for (u <- 0 until users; p <- 0 until products if rand.nextDouble() < samplingRate) yield Rating(u, p, trueRatings.get(u, p)) } - (sampledRatings, trueRatings) + (sampledRatings, trueRatings, truePrefs) } } @@ -78,11 +87,19 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { } test("rank-1 matrices") { - testALS(10, 20, 1, 15, 0.7, 0.3) + testALS(50, 100, 1, 15, 0.7, 0.3) } test("rank-2 matrices") { - testALS(20, 30, 2, 15, 0.7, 0.3) + testALS(100, 200, 2, 15, 0.7, 0.3) + } + + test("rank-1 matrices implicit") { + testALS(80, 160, 1, 15, 0.7, 0.4, true) + } + + test("rank-2 matrices implicit") { + testALS(100, 200, 2, 15, 0.7, 0.4, true) } /** @@ -96,11 +113,14 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { * @param matchThreshold max difference allowed to consider a predicted rating correct */ def testALS(users: Int, products: Int, features: Int, iterations: Int, - samplingRate: Double, matchThreshold: Double) + samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false) { - val (sampledRatings, trueRatings) = ALSSuite.generateRatings(users, products, - features, samplingRate) - val model = 
ALS.train(sc.parallelize(sampledRatings), features, iterations) + val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products, + features, samplingRate, implicitPrefs) + val model = implicitPrefs match { + case false => ALS.train(sc.parallelize(sampledRatings), features, iterations) + case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations) + } val predictedU = new DoubleMatrix(users, features) for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) { @@ -112,12 +132,31 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { } val predictedRatings = predictedU.mmul(predictedP.transpose) - for (u <- 0 until users; p <- 0 until products) { - val prediction = predictedRatings.get(u, p) - val correct = trueRatings.get(u, p) - if (math.abs(prediction - correct) > matchThreshold) { - fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format( - u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP)) + if (!implicitPrefs) { + for (u <- 0 until users; p <- 0 until products) { + val prediction = predictedRatings.get(u, p) + val correct = trueRatings.get(u, p) + if (math.abs(prediction - correct) > matchThreshold) { + fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format( + u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP)) + } + } + } else { + // For implicit prefs we use the confidence-weighted RMSE to test (ref Mahout's tests) + var sqErr = 0.0 + var denom = 0.0 + for (u <- 0 until users; p <- 0 until products) { + val prediction = predictedRatings.get(u, p) + val truePref = truePrefs.get(u, p) + val confidence = 1 + 1.0 * trueRatings.get(u, p) + val err = confidence * (truePref - prediction) * (truePref - prediction) + sqErr += err + denom += 1 + } + val rmse = math.sqrt(sqErr / denom) + if (math.abs(rmse) > matchThreshold) { + fail("Model failed to predict RMSE: %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format( + rmse, truePrefs, predictedRatings, predictedU, predictedP)) } } } @@ -40,6 +40,7 @@ <connection>scm:git:git@github.com:apache/incubator-spark.git</connection> <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-spark.git</developerConnection> <url>scm:git:git@github.com:apache/incubator-spark.git</url> + <tag>HEAD</tag> </scm> <developers> <developer> @@ -322,7 +323,7 @@ <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.version}</artifactId> + <artifactId>scalatest_2.9.3</artifactId> <version>1.9.1</version> <scope>test</scope> </dependency> @@ -334,7 +335,7 @@ </dependency> <dependency> <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_${scala.version}</artifactId> + <artifactId>scalacheck_2.9.3</artifactId> <version>1.10.0</version> <scope>test</scope> </dependency> @@ -603,7 +604,7 @@ <junitxml>.</junitxml> <filereports>${project.build.directory}/SparkTestSuite.txt</filereports> <argLine>-Xms64m -Xmx3g</argLine> - <stderr/> + <stderr /> </configuration> <executions> <execution> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index aef246d8a9..eb4b96eb47 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -97,6 +97,9 @@ object SparkBuild extends Build { // Only allow one test at a time, even across projects, since they run in the same JVM concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), + // also check the local Maven repository ~/.m2 + resolvers ++= Seq(Resolver.file("Local 
@@ -40,6 +40,7 @@
     <connection>scm:git:git@github.com:apache/incubator-spark.git</connection>
     <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-spark.git</developerConnection>
     <url>scm:git:git@github.com:apache/incubator-spark.git</url>
+    <tag>HEAD</tag>
   </scm>
   <developers>
     <developer>
@@ -322,7 +323,7 @@
 
       <dependency>
         <groupId>org.scalatest</groupId>
-        <artifactId>scalatest_${scala.version}</artifactId>
+        <artifactId>scalatest_2.9.3</artifactId>
         <version>1.9.1</version>
         <scope>test</scope>
       </dependency>
@@ -334,7 +335,7 @@
       </dependency>
       <dependency>
         <groupId>org.scalacheck</groupId>
-        <artifactId>scalacheck_${scala.version}</artifactId>
+        <artifactId>scalacheck_2.9.3</artifactId>
         <version>1.10.0</version>
         <scope>test</scope>
       </dependency>
@@ -603,7 +604,7 @@
             <junitxml>.</junitxml>
             <filereports>${project.build.directory}/SparkTestSuite.txt</filereports>
             <argLine>-Xms64m -Xmx3g</argLine>
-            <stderr/>
+            <stderr />
           </configuration>
           <executions>
             <execution>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aef246d8a9..eb4b96eb47 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -97,6 +97,9 @@ object SparkBuild extends Build {
     // Only allow one test at a time, even across projects, since they run in the same JVM
     concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
 
+    // also check the local Maven repository ~/.m2
+    resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
+
     // Shared between both core and streaming.
     resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
 
@@ -153,6 +156,7 @@ object SparkBuild extends Build {
   */
 
+
 
   libraryDependencies ++= Seq(
     "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
     "org.scalatest" %% "scalatest" % "1.9.1" % "test",
@@ -175,6 +179,7 @@ object SparkBuild extends Build {
 
   val slf4jVersion = "1.7.2"
 
+  val excludeCglib = ExclusionRule(organization = "org.sonatype.sisu.inject")
   val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson")
   val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
   val excludeAsm = ExclusionRule(organization = "asm")
@@ -207,7 +212,7 @@ object SparkBuild extends Build {
     "org.apache.mesos" % "mesos" % "0.13.0",
     "io.netty" % "netty-all" % "4.0.0.Beta2",
     "org.apache.derby" % "derby" % "10.4.2.0" % "test",
-    "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
+    "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
     "net.java.dev.jets3t" % "jets3t" % "0.7.1",
     "org.apache.avro" % "avro" % "1.7.4",
     "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
@@ -245,6 +250,7 @@ object SparkBuild extends Build {
         exclude("log4j","log4j")
         exclude("org.apache.cassandra.deps", "avro")
         excludeAll(excludeSnappy)
+        excludeAll(excludeCglib)
     )
   ) ++ assemblySettings ++ extraAssemblySettings
 
@@ -287,10 +293,10 @@ object SparkBuild extends Build {
 
   def yarnEnabledSettings = Seq(
     libraryDependencies ++= Seq(
       // Exclude rule required for all ?
-      "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
-      "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
-      "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
-      "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm)
+      "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+      "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+      "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+      "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib)
     )
   )
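The two SparkBuild.scala additions are standard sbt idioms: a file-based resolver pointed at the local ~/.m2 repository, and an ExclusionRule that is threaded through the Hadoop dependencies via excludeAll. A minimal standalone build-definition fragment using the same idioms is sketched below; the hadoop-client version there is a placeholder and is not taken from this patch.

    // Illustrative sbt build-definition fragment; the version number is a placeholder.
    resolvers += Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))

    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.0.4" excludeAll(
      ExclusionRule(organization = "org.sonatype.sisu.inject"))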
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 58e1849cad..39c402b412 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -29,7 +29,7 @@ from threading import Thread
 
 from pyspark import cloudpickle
 from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
-    read_from_pickle_file
+    read_from_pickle_file, pack_long
 from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
@@ -690,11 +690,13 @@ class RDD(object):
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
         def add_shuffle_key(split, iterator):
+
             buckets = defaultdict(list)
+
             for (k, v) in iterator:
                 buckets[partitionFunc(k) % numPartitions].append((k, v))
             for (split, items) in buckets.iteritems():
-                yield str(split)
+                yield pack_long(split)
                 yield dump_pickle(Batch(items))
         keyed = PipelinedRDD(self, add_shuffle_key)
         keyed._bypass_serializer = True
@@ -831,8 +833,8 @@ class RDD(object):
         >>> sorted(x.subtractByKey(y).collect())
         [('b', 4), ('b', 5)]
         """
-        filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
-        map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
+        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
+        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
         return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
 
     def subtract(self, other, numPartitions=None):
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index fecacd1241..54fed1c9c7 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -67,6 +67,10 @@ def write_long(value, stream):
     stream.write(struct.pack("!q", value))
 
 
+def pack_long(value):
+    return struct.pack("!q", value)
+
+
 def read_int(stream):
     length = stream.read(4)
     if length == "":
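The serializers.py half of this change is the key detail: pack_long encodes the bucket number as eight big-endian bytes (struct format "!q") rather than a decimal string, so every shuffle key now has a fixed-width binary form. As a point of reference, that byte layout is the same one java.io.DataInputStream.readLong expects; the Scala sketch below is not code from the patch, it only demonstrates the round-trip of that encoding on the JVM side.

    // Reads a value produced by Python's struct.pack("!q", value):
    // eight bytes, big-endian, which matches DataInputStream.readLong.
    import java.io.{ByteArrayInputStream, DataInputStream}

    def readPackedLong(bytes: Array[Byte]): Long =
      new DataInputStream(new ByteArrayInputStream(bytes)).readLong()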
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index 05aadc7bdf..f6bf94be6b 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -26,7 +26,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-repl-bin</artifactId>
+  <artifactId>spark-repl-bin_2.9.3</artifactId>
   <packaging>pom</packaging>
   <name>Spark Project REPL binary packaging</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -40,18 +40,18 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_2.9.3</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-bagel</artifactId>
+      <artifactId>spark-bagel_2.9.3</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
    <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-repl</artifactId>
+      <artifactId>spark-repl_2.9.3</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
@@ -89,7 +89,7 @@
             </goals>
             <configuration>
               <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                 <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                   <resource>reference.conf</resource>
                 </transformer>
diff --git a/repl/pom.xml b/repl/pom.xml
index 2826c0743c..49d86621dd 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -26,7 +26,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-repl</artifactId>
+  <artifactId>spark-repl_2.9.3</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project REPL</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -39,18 +39,18 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_2.9.3</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-bagel</artifactId>
+      <artifactId>spark-bagel_2.9.3</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-mllib</artifactId>
+      <artifactId>spark-mllib_2.9.3</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
@@ -76,12 +76,12 @@
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.version}</artifactId>
+      <artifactId>scalatest_2.9.3</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
-      <artifactId>scalacheck_${scala.version}</artifactId>
+      <artifactId>scalacheck_2.9.3</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>
@@ -101,14 +101,14 @@
           <configuration>
             <exportAntProperties>true</exportAntProperties>
             <tasks>
-              <property name="spark.classpath" refid="maven.test.classpath"/>
-              <property environment="env"/>
+              <property name="spark.classpath" refid="maven.test.classpath" />
+              <property environment="env" />
               <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
                 <condition>
                   <not>
                     <or>
-                      <isset property="env.SCALA_HOME"/>
-                      <isset property="env.SCALA_LIBRARY_PATH"/>
+                      <isset property="env.SCALA_HOME" />
+                      <isset property="env.SCALA_LIBRARY_PATH" />
                     </or>
                   </not>
                 </condition>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index b260a72abb..3b25fb49fb 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -26,7 +26,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming</artifactId>
+  <artifactId>spark-streaming_2.9.3</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project Streaming</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -42,7 +42,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_2.9.3</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -58,6 +58,7 @@
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka</artifactId>
       <version>0.7.2-spark</version> <!-- Comes from our in-project repository -->
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.flume</groupId>
@@ -91,12 +92,12 @@
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.version}</artifactId>
+      <artifactId>scalatest_2.9.3</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
-      <artifactId>scalacheck_${scala.version}</artifactId>
+      <artifactId>scalacheck_2.9.3</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/tools/pom.xml b/tools/pom.xml
index 29f0014128..f1c489beea 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -25,7 +25,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-tools</artifactId>
+  <artifactId>spark-tools_2.9.3</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project Tools</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -33,17 +33,17 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_2.9.3</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming</artifactId>
+      <artifactId>spark-streaming_2.9.3</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.version}</artifactId>
+      <artifactId>scalatest_2.9.3</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 427fcdf545..3bc619df07 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -25,7 +25,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-yarn</artifactId>
+  <artifactId>spark-yarn_2.9.3</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project YARN Support</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -33,7 +33,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_2.9.3</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -97,7 +97,7 @@
             </goals>
             <configuration>
               <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                 <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                   <resource>reference.conf</resource>
                 </transformer>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 3362010106..076dd3c9b0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -106,7 +106,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     logInfo("Setting up application submission context for ASM")
     val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
     appContext.setApplicationId(appId)
-    appContext.setApplicationName("Spark")
+    appContext.setApplicationName(args.appName)
     return appContext
   }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index cd651904d2..c56dbd99ba 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -32,6 +32,7 @@ class ClientArguments(val args: Array[String]) {
   var numWorkers = 2
   var amQueue = System.getProperty("QUEUE", "default")
   var amMemory: Int = 512
+  var appName: String = "Spark"
   // TODO
   var inputFormatInfo: List[InputFormatInfo] = null
 
@@ -78,6 +79,10 @@ class ClientArguments(val args: Array[String]) {
           amQueue = value
           args = tail
 
+        case ("--name") :: value :: tail =>
+          appName = value
+          args = tail
+
         case Nil =>
           if (userJar == null || userClass == null) {
             printUsageAndExit(1)
@@ -108,6 +113,7 @@ class ClientArguments(val args: Array[String]) {
       " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
       " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
       " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+      " --name NAME The name of your application (Default: Spark)\n" +
      " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')"
     )
     System.exit(exitCode)
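To round out the --name change: ClientArguments parses a plain Array[String], so the new flag can be exercised directly. The sketch below is a hypothetical invocation; the jar path, class name, and application name are placeholder values for illustration and do not come from the patch.

    // Hypothetical usage; --jar and --class are required by the parser
    // (it calls printUsageAndExit otherwise), and the values here are made up.
    val parsed = new ClientArguments(Array(
      "--jar", "my-app.jar",
      "--class", "com.example.MyApp",
      "--name", "My Spark App"))
    // parsed.appName is now "My Spark App"; the Client passes args.appName to
    // appContext.setApplicationName, replacing the previously hard-coded "Spark".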