diff options
Diffstat (limited to 'core')
109 files changed, 738 insertions, 627 deletions
diff --git a/core/pom.xml b/core/pom.xml index 38f4be1280..043f6cf68d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -26,7 +26,7 @@ </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.9.3</artifactId> + <artifactId>spark-core_2.10</artifactId> <packaging>jar</packaging> <name>Spark Project Core</name> <url>http://spark.incubator.apache.org/</url> @@ -81,12 +81,8 @@ <artifactId>asm</artifactId> </dependency> <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </dependency> - <dependency> <groupId>com.twitter</groupId> - <artifactId>chill_2.9.3</artifactId> + <artifactId>chill_${scala.binary.version}</artifactId> <version>0.3.1</version> </dependency> <dependency> @@ -96,19 +92,11 @@ </dependency> <dependency> <groupId>${akka.group}</groupId> - <artifactId>akka-actor</artifactId> - </dependency> - <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-remote</artifactId> + <artifactId>akka-remote_${scala.binary.version}</artifactId> </dependency> <dependency> <groupId>${akka.group}</groupId> - <artifactId>akka-slf4j</artifactId> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scalap</artifactId> + <artifactId>akka-slf4j_${scala.binary.version}</artifactId> </dependency> <dependency> <groupId>org.scala-lang</groupId> @@ -116,7 +104,7 @@ </dependency> <dependency> <groupId>net.liftweb</groupId> - <artifactId>lift-json_2.9.2</artifactId> + <artifactId>lift-json_${scala.binary.version}</artifactId> </dependency> <dependency> <groupId>it.unimi.dsi</groupId> @@ -164,13 +152,18 @@ <scope>test</scope> </dependency> <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_2.9.3</artifactId> + <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_2.9.3</artifactId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> <dependency> @@ -190,8 +183,8 @@ </dependency> </dependencies> <build> - <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java index 20a7a3aa8c..edd0fc56f8 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java @@ -19,8 +19,6 @@ package org.apache.spark.network.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.oio.OioEventLoopGroup; import io.netty.channel.socket.oio.OioSocketChannel; diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java index 666432474d..a99af348ce 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java @@ -20,7 +20,6 @@ package org.apache.spark.network.netty; import java.net.InetSocketAddress; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.oio.OioEventLoopGroup; diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index b4d0b7017c..10fae5af9f 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -21,12 +21,11 @@ import java.io._ import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.mutable.HashSet +import scala.concurrent.Await +import scala.concurrent.duration._ import akka.actor._ -import akka.dispatch._ import akka.pattern.ask -import akka.util.Duration - import org.apache.spark.scheduler.MapStatus import org.apache.spark.storage.BlockManagerId @@ -55,9 +54,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster private[spark] class MapOutputTracker extends Logging { private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") - + // Set to the MapOutputTrackerActor living on the driver - var trackerActor: ActorRef = _ + var trackerActor: Either[ActorRef, ActorSelection] = _ protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]] @@ -73,8 +72,18 @@ private[spark] class MapOutputTracker extends Logging { // throw a SparkException if this fails. private def askTracker(message: Any): Any = { try { - val future = trackerActor.ask(message)(timeout) - return Await.result(future, timeout) + /* + The difference between ActorRef and ActorSelection is well explained here: + http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor + In spark a map output tracker can be either started on Driver where it is created which + is an ActorRef or it can be on executor from where it is looked up which is an + actorSelection. + */ + val future = trackerActor match { + case Left(a: ActorRef) => a.ask(message)(timeout) + case Right(b: ActorSelection) => b.ask(message)(timeout) + } + Await.result(future, timeout) } catch { case e: Exception => throw new SparkException("Error communicating with MapOutputTracker", e) @@ -117,7 +126,7 @@ private[spark] class MapOutputTracker extends Logging { fetching += shuffleId } } - + if (fetchedStatuses == null) { // We won the race to fetch the output locs; do so logInfo("Doing the fetch; tracker actor = " + trackerActor) @@ -144,7 +153,7 @@ private[spark] class MapOutputTracker extends Logging { else{ throw new FetchFailedException(null, shuffleId, -1, reduceId, new Exception("Missing all output locations for shuffle " + shuffleId)) - } + } } else { statuses.synchronized { return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses) @@ -312,7 +321,7 @@ private[spark] object MapOutputTracker { statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { assert (statuses != null) statuses.map { - status => + status => if (status == null) { throw new FetchFailedException(null, shuffleId, -1, reduceId, new Exception("Missing an output location for shuffle " + shuffleId)) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 0e2c987a59..bcec41c439 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -17,8 +17,10 @@ package org.apache.spark -import org.apache.spark.util.Utils +import scala.reflect.ClassTag + import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils /** * An object that defines how the elements in a key-value pair RDD are partitioned by key. @@ -72,7 +74,7 @@ class HashPartitioner(partitions: Int) extends Partitioner { case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) } - + override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions @@ -85,7 +87,7 @@ class HashPartitioner(partitions: Int) extends Partitioner { * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges. * Determines the ranges by sampling the RDD passed in. */ -class RangePartitioner[K <% Ordered[K]: ClassManifest, V]( +class RangePartitioner[K <% Ordered[K]: ClassTag, V]( partitions: Int, @transient rdd: RDD[_ <: Product2[K,V]], private val ascending: Boolean = true) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 66006bf212..a0f794edfd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -26,6 +26,7 @@ import scala.collection.Map import scala.collection.generic.Growable import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap +import scala.reflect.{ClassTag, classTag} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -267,19 +268,19 @@ class SparkContext( // Methods for creating RDDs /** Distribute a local Scala collection to form an RDD. */ - def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { + def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } /** Distribute a local Scala collection to form an RDD. */ - def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { + def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { parallelize(seq, numSlices) } /** Distribute a local Scala collection to form an RDD, with one or more * location preferences (hostnames of Spark nodes) for each object. * Create a new partition for each collection item. */ - def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = { + def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = { val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs) } @@ -332,7 +333,7 @@ class SparkContext( } /** - * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, + * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys, * values and the InputFormat so that users don't need to pass them directly. Instead, callers * can just write, for example, * {{{ @@ -340,17 +341,17 @@ class SparkContext( * }}} */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int) - (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]) + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]) : RDD[(K, V)] = { hadoopFile(path, - fm.erasure.asInstanceOf[Class[F]], - km.erasure.asInstanceOf[Class[K]], - vm.erasure.asInstanceOf[Class[V]], + fm.runtimeClass.asInstanceOf[Class[F]], + km.runtimeClass.asInstanceOf[Class[K]], + vm.runtimeClass.asInstanceOf[Class[V]], minSplits) } /** - * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, + * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys, * values and the InputFormat so that users don't need to pass them directly. Instead, callers * can just write, for example, * {{{ @@ -358,17 +359,17 @@ class SparkContext( * }}} */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) - (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = hadoopFile[K, V, F](path, defaultMinSplits) /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String) - (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = { + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { newAPIHadoopFile( path, - fm.erasure.asInstanceOf[Class[F]], - km.erasure.asInstanceOf[Class[K]], - vm.erasure.asInstanceOf[Class[V]]) + fm.runtimeClass.asInstanceOf[Class[F]], + km.runtimeClass.asInstanceOf[Class[K]], + vm.runtimeClass.asInstanceOf[Class[V]]) } /** @@ -426,11 +427,11 @@ class SparkContext( * IntWritable). The most natural thing would've been to have implicit objects for the * converters, but then we couldn't have an object for every subclass of Writable (you can't * have a parameterized singleton object). We use functions instead to create a new converter - * for the appropriate type. In addition, we pass the converter a ClassManifest of its type to + * for the appropriate type. In addition, we pass the converter a ClassTag of its type to * allow it to figure out the Writable class to use in the subclass case. */ def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits) - (implicit km: ClassManifest[K], vm: ClassManifest[V], + (implicit km: ClassTag[K], vm: ClassTag[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]) : RDD[(K, V)] = { val kc = kcf() @@ -449,7 +450,7 @@ class SparkContext( * slow if you use the default serializer (Java serialization), though the nice thing about it is * that there's very little effort required to save arbitrary objects. */ - def objectFile[T: ClassManifest]( + def objectFile[T: ClassTag]( path: String, minSplits: Int = defaultMinSplits ): RDD[T] = { @@ -458,17 +459,17 @@ class SparkContext( } - protected[spark] def checkpointFile[T: ClassManifest]( + protected[spark] def checkpointFile[T: ClassTag]( path: String ): RDD[T] = { new CheckpointRDD[T](this, path) } /** Build the union of a list of RDDs. */ - def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds) + def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds) /** Build the union of a list of RDDs passed as variable-length arguments. */ - def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] = + def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = new UnionRDD(this, Seq(first) ++ rest) // Methods for creating shared variables @@ -711,7 +712,7 @@ class SparkContext( * flag specifies whether the scheduler can run the computation on the driver rather than * shipping it out to the cluster, for short actions like first(). */ - def runJob[T, U: ClassManifest]( + def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], @@ -732,7 +733,7 @@ class SparkContext( * allowLocal flag specifies whether the scheduler can run the computation on the driver rather * than shipping it out to the cluster, for short actions like first(). */ - def runJob[T, U: ClassManifest]( + def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], @@ -747,7 +748,7 @@ class SparkContext( * Run a job on a given set of partitions of an RDD, but take a function of type * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`. */ - def runJob[T, U: ClassManifest]( + def runJob[T, U: ClassTag]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int], @@ -759,21 +760,21 @@ class SparkContext( /** * Run a job on all partitions in an RDD and return the results in an array. */ - def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = { + def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.size, false) } /** * Run a job on all partitions in an RDD and return the results in an array. */ - def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { + def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.size, false) } /** * Run a job on all partitions in an RDD and pass the results to a handler function. */ - def runJob[T, U: ClassManifest]( + def runJob[T, U: ClassTag]( rdd: RDD[T], processPartition: (TaskContext, Iterator[T]) => U, resultHandler: (Int, U) => Unit) @@ -784,7 +785,7 @@ class SparkContext( /** * Run a job on all partitions in an RDD and pass the results to a handler function. */ - def runJob[T, U: ClassManifest]( + def runJob[T, U: ClassTag]( rdd: RDD[T], processPartition: Iterator[T] => U, resultHandler: (Int, U) => Unit) @@ -930,16 +931,16 @@ object SparkContext { // TODO: Add AccumulatorParams for other types, e.g. lists and strings - implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) = + implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) = new PairRDDFunctions(rdd) - implicit def rddToAsyncRDDActions[T: ClassManifest](rdd: RDD[T]) = new AsyncRDDActions(rdd) + implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) - implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest]( + implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( rdd: RDD[(K, V)]) = new SequenceFileRDDFunctions(rdd) - implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( + implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag]( rdd: RDD[(K, V)]) = new OrderedRDDFunctions[K, V, (K, V)](rdd) @@ -964,16 +965,16 @@ object SparkContext { implicit def stringToText(s: String) = new Text(s) - private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = { + private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]): ArrayWritable = { def anyToWritable[U <% Writable](u: U): Writable = u - new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], + new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]], arr.map(x => anyToWritable(x)).toArray) } // Helper objects for converting common types to Writable - private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = { - val wClass = classManifest[W].erasure.asInstanceOf[Class[W]] + private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = { + val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]] new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) } @@ -992,7 +993,7 @@ object SparkContext { implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString) implicit def writableWritableConverter[T <: Writable]() = - new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T]) + new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T]) /** * Find the JAR from which a given class was loaded, to make it easy for users to pass @@ -1147,12 +1148,12 @@ object SparkContext { /** * A class encapsulating how to convert some type T to Writable. It stores both the Writable class * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion. - * The getter for the writable class takes a ClassManifest[T] in case this is a generic object + * The getter for the writable class takes a ClassTag[T] in case this is a generic object * that doesn't know the type of T when it is created. This sounds strange but is necessary to * support converting subclasses of Writable to themselves (writableWritableConverter). */ private[spark] class WritableConverter[T]( - val writableClass: ClassManifest[T] => Class[_ <: Writable], + val writableClass: ClassTag[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ff2df8fb6a..826f5c2d8c 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -20,7 +20,7 @@ package org.apache.spark import collection.mutable import serializer.Serializer -import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem} +import akka.actor._ import akka.remote.RemoteActorRefProvider import org.apache.spark.broadcast.BroadcastManager @@ -74,7 +74,8 @@ class SparkEnv ( actorSystem.shutdown() // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut // down, but let's call it anyway in case it gets fixed in a later release - actorSystem.awaitTermination() + // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it. + //actorSystem.awaitTermination() } def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = { @@ -151,17 +152,17 @@ object SparkEnv extends Logging { val closureSerializer = serializerManager.get( System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")) - def registerOrLookup(name: String, newActor: => Actor): ActorRef = { + def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = { if (isDriver) { logInfo("Registering " + name) - actorSystem.actorOf(Props(newActor), name = name) + Left(actorSystem.actorOf(Props(newActor), name = name)) } else { val driverHost: String = System.getProperty("spark.driver.host", "localhost") val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt Utils.checkHost(driverHost, "Expected hostname") - val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name) + val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name) logInfo("Connecting to " + name + ": " + url) - actorSystem.actorFor(url) + Right(actorSystem.actorSelection(url)) } } diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala index 19ce8369d9..0bf1e4a5e2 100644 --- a/core/src/main/scala/org/apache/spark/TaskState.scala +++ b/core/src/main/scala/org/apache/spark/TaskState.scala @@ -19,8 +19,7 @@ package org.apache.spark import org.apache.mesos.Protos.{TaskState => MesosTaskState} -private[spark] object TaskState - extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED", "KILLED", "LOST") { +private[spark] object TaskState extends Enumeration { val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index 9f02a9b7d3..da30cf619a 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -17,12 +17,15 @@ package org.apache.spark.api.java +import scala.reflect.ClassTag + import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.util.StatCounter import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.storage.StorageLevel + import java.lang.Double import org.apache.spark.Partitioner @@ -30,7 +33,7 @@ import scala.collection.JavaConverters._ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] { - override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]] + override val classTag: ClassTag[Double] = implicitly[ClassTag[Double]] override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x)) @@ -44,7 +47,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): JavaDoubleRDD = fromRDD(srdd.cache()) - /** + /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. Can only be called once on each RDD. */ @@ -108,7 +111,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav /** * Return an RDD with the elements from `this` that are not in `other`. - * + * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 2142fd7327..363667fa86 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -22,6 +22,7 @@ import java.util.Comparator import scala.Tuple2 import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec @@ -43,13 +44,13 @@ import org.apache.spark.rdd.OrderedRDDFunctions import org.apache.spark.storage.StorageLevel -class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K], - implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { +class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K], + implicit val vClassTag: ClassTag[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) - override val classManifest: ClassManifest[(K, V)] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + override val classTag: ClassTag[(K, V)] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]] import JavaPairRDD._ @@ -58,7 +59,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache()) - /** + /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. Can only be called once on each RDD. */ @@ -138,14 +139,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif override def first(): (K, V) = rdd.first() // Pair RDD functions - + /** - * Generic function to combine the elements for each key using a custom set of aggregation - * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a - * "combined type" C * Note that V and C can be different -- for example, one might group an - * RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three + * Generic function to combine the elements for each key using a custom set of aggregation + * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a + * "combined type" C * Note that V and C can be different -- for example, one might group an + * RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three * functions: - * + * * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) * - `mergeCombiners`, to combine two C's into a single one. @@ -157,8 +158,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner): JavaPairRDD[K, C] = { - implicit val cm: ClassManifest[C] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] + implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] fromRDD(rdd.combineByKey( createCombiner, mergeValue, @@ -195,14 +195,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** Count the number of elements for each key, and return the result to the master as a Map. */ def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey()) - /** + /** * (Experimental) Approximate version of countByKey that can return a partial result if it does * not finish within a timeout. */ def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] = rdd.countByKeyApprox(timeout).map(mapAsJavaMap) - /** + /** * (Experimental) Approximate version of countByKey that can return a partial result if it does * not finish within a timeout. */ @@ -258,7 +258,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** * Return an RDD with the elements from `this` that are not in `other`. - * + * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ @@ -315,15 +315,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } - /** + /** * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing * partitioner/parallelism level. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = { - implicit val cm: ClassManifest[C] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] + implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd))) } @@ -414,8 +413,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * this also retains the original RDD's partitioning. */ def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = { - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] fromRDD(rdd.mapValues(f)) } @@ -426,8 +424,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = { import scala.collection.JavaConverters._ def fn = (x: V) => f.apply(x).asScala - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] fromRDD(rdd.flatMapValues(fn)) } @@ -592,6 +589,20 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif } /** + * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling + * `collect` or `save` on the resulting RDD will return or output an ordered list of records + * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in + * order of the keys). + */ + def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = { + class KeyOrdering(val a: K) extends Ordered[K] { + override def compare(b: K) = comp.compare(a, b) + } + implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x) + fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions)) + } + + /** * Return an RDD with the keys of each tuple. */ def keys(): JavaRDD[K] = JavaRDD.fromRDD[K](rdd.map(_._1)) @@ -603,22 +614,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif } object JavaPairRDD { - def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassManifest[K], - vcm: ClassManifest[T]): RDD[(K, JList[T])] = + def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassTag[K], + vcm: ClassTag[T]): RDD[(K, JList[T])] = rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList _) - def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassManifest[K], - vcm: ClassManifest[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd).mapValues((x: (Seq[V], - Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2))) + def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassTag[K], + vcm: ClassTag[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd) + .mapValues((x: (Seq[V], Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2))) def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1], - Seq[W2]))])(implicit kcm: ClassManifest[K]) : RDD[(K, (JList[V], JList[W1], + Seq[W2]))])(implicit kcm: ClassTag[K]) : RDD[(K, (JList[V], JList[W1], JList[W2]))] = rddToPairRDDFunctions(rdd).mapValues( (x: (Seq[V], Seq[W1], Seq[W2])) => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3))) - def fromRDD[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = + def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd) implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd @@ -626,10 +637,8 @@ object JavaPairRDD { /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */ def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmk: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val cmv: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] new JavaPairRDD[K, V](rdd.rdd) } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 3b359a8fd6..c47657f512 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -17,12 +17,14 @@ package org.apache.spark.api.java +import scala.reflect.ClassTag + import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel -class JavaRDD[T](val rdd: RDD[T])(implicit val classManifest: ClassManifest[T]) extends +class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends JavaRDDLike[T, JavaRDD[T]] { override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd) @@ -127,8 +129,7 @@ JavaRDDLike[T, JavaRDD[T]] { object JavaRDD { - implicit def fromRDD[T: ClassManifest](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd) + implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd) implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd } - diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 7a3568c5ef..9e912d3adb 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -20,6 +20,7 @@ package org.apache.spark.api.java import java.util.{List => JList, Comparator} import scala.Tuple2 import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec @@ -35,7 +36,7 @@ import org.apache.spark.storage.StorageLevel trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def wrapRDD(rdd: RDD[T]): This - implicit val classManifest: ClassManifest[T] + implicit val classTag: ClassTag[T] def rdd: RDD[T] @@ -71,7 +72,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. */ - def mapPartitionsWithIndex[R: ClassManifest]( + def mapPartitionsWithIndex[R: ClassTag]( f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R] = new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))), @@ -87,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to all elements of this RDD. */ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType()) } @@ -118,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType()) } @@ -158,18 +159,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * elements (a, b) where a is in `this` and b is in `other`. */ def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = - JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest, - other.classManifest) + JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classTag))(classTag, other.classTag) /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = { - implicit val kcm: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val vcm: ClassManifest[JList[T]] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]] + implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val vcm: ClassTag[JList[T]] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]] JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm) } @@ -178,10 +177,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * mapping to that key. */ def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = { - implicit val kcm: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val vcm: ClassManifest[JList[T]] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]] + implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val vcm: ClassTag[JList[T]] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]] JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm) } @@ -209,7 +207,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * a map on the other). */ def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = { - JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest) + JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, other.classTag) } /** @@ -224,7 +222,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator( f.apply(asJavaIterator(x), asJavaIterator(y)).iterator()) JavaRDD.fromRDD( - rdd.zipPartitions(other.rdd)(fn)(other.classManifest, f.elementType()))(f.elementType()) + rdd.zipPartitions(other.rdd)(fn)(other.classTag, f.elementType()))(f.elementType()) } // Actions (launch a job to return a value to the user program) @@ -356,7 +354,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Creates tuples of the elements in this RDD by applying `f`. */ def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = { - implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] JavaPairRDD.fromRDD(rdd.keyBy(f)) } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 8869e072bf..acf328aa6a 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -21,6 +21,7 @@ import java.util.{Map => JMap} import scala.collection.JavaConversions import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.InputFormat @@ -82,8 +83,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork /** Distribute a local Scala collection to form an RDD. */ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices) } @@ -94,10 +94,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork /** Distribute a local Scala collection to form an RDD. */ def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int) : JavaPairRDD[K, V] = { - implicit val kcm: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val vcm: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val vcm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)) } @@ -132,16 +130,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork valueClass: Class[V], minSplits: Int ): JavaPairRDD[K, V] = { - implicit val kcm = ClassManifest.fromClass(keyClass) - implicit val vcm = ClassManifest.fromClass(valueClass) + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits)) } /**Get an RDD for a Hadoop SequenceFile. */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): JavaPairRDD[K, V] = { - implicit val kcm = ClassManifest.fromClass(keyClass) - implicit val vcm = ClassManifest.fromClass(valueClass) + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass)) } @@ -153,8 +151,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * that there's very little effort required to save arbitrary objects. */ def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] sc.objectFile(path, minSplits)(cm) } @@ -166,8 +163,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * that there's very little effort required to save arbitrary objects. */ def objectFile[T](path: String): JavaRDD[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] sc.objectFile(path)(cm) } @@ -183,8 +179,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork valueClass: Class[V], minSplits: Int ): JavaPairRDD[K, V] = { - implicit val kcm = ClassManifest.fromClass(keyClass) - implicit val vcm = ClassManifest.fromClass(valueClass) + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits)) } @@ -199,8 +195,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork keyClass: Class[K], valueClass: Class[V] ): JavaPairRDD[K, V] = { - implicit val kcm = ClassManifest.fromClass(keyClass) - implicit val vcm = ClassManifest.fromClass(valueClass) + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass)) } @@ -212,8 +208,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork valueClass: Class[V], minSplits: Int ): JavaPairRDD[K, V] = { - implicit val kcm = ClassManifest.fromClass(keyClass) - implicit val vcm = ClassManifest.fromClass(valueClass) + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)) } @@ -224,8 +220,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork keyClass: Class[K], valueClass: Class[V] ): JavaPairRDD[K, V] = { - implicit val kcm = ClassManifest.fromClass(keyClass) - implicit val vcm = ClassManifest.fromClass(valueClass) + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)) } @@ -240,8 +236,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork kClass: Class[K], vClass: Class[V], conf: Configuration): JavaPairRDD[K, V] = { - implicit val kcm = ClassManifest.fromClass(kClass) - implicit val vcm = ClassManifest.fromClass(vClass) + implicit val kcm: ClassTag[K] = ClassTag(kClass) + implicit val vcm: ClassTag[V] = ClassTag(vClass) new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf)) } @@ -254,15 +250,15 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork fClass: Class[F], kClass: Class[K], vClass: Class[V]): JavaPairRDD[K, V] = { - implicit val kcm = ClassManifest.fromClass(kClass) - implicit val vcm = ClassManifest.fromClass(vClass) + implicit val kcm: ClassTag[K] = ClassTag(kClass) + implicit val vcm: ClassTag[V] = ClassTag(vClass) new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)) } /** Build the union of two or more RDDs. */ override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = { val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) - implicit val cm: ClassManifest[T] = first.classManifest + implicit val cm: ClassTag[T] = first.classTag sc.union(rdds)(cm) } @@ -270,9 +266,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]]) : JavaPairRDD[K, V] = { val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) - implicit val cm: ClassManifest[(K, V)] = first.classManifest - implicit val kcm: ClassManifest[K] = first.kManifest - implicit val vcm: ClassManifest[V] = first.vManifest + implicit val cm: ClassTag[(K, V)] = first.classTag + implicit val kcm: ClassTag[K] = first.kClassTag + implicit val vcm: ClassTag[V] = first.vClassTag new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm) } @@ -405,8 +401,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork } protected def checkpointFile[T](path: String): JavaRDD[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] new JavaRDD(sc.checkpointFile(path)) } } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java index c9cbce5624..2090efd3b9 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java @@ -17,7 +17,6 @@ package org.apache.spark.api.java; -import java.util.Arrays; import java.util.ArrayList; import java.util.List; diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala index 2dfda8b09a..bdb01f7670 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala +++ b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala @@ -17,9 +17,11 @@ package org.apache.spark.api.java.function +import scala.reflect.ClassTag + /** * A function that returns zero or more output records from each input record. */ abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] { - def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]] + def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]] } diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala index 528e1c0a7c..aae1349c5e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala +++ b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala @@ -17,9 +17,11 @@ package org.apache.spark.api.java.function +import scala.reflect.ClassTag + /** * A function that takes two inputs and returns zero or more output records. */ abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] { - def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]] + def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]] } diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java index ce368ee01b..537439ef53 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java +++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.java @@ -17,8 +17,8 @@ package org.apache.spark.api.java.function; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import java.io.Serializable; @@ -29,8 +29,8 @@ import java.io.Serializable; * when mapping RDDs of other types. */ public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable { - public ClassManifest<R> returnType() { - return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<R> returnType() { + return ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java index 44ad559d48..a2d1214fb4 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java +++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java @@ -17,8 +17,8 @@ package org.apache.spark.api.java.function; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import java.io.Serializable; @@ -28,8 +28,8 @@ import java.io.Serializable; public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R> implements Serializable { - public ClassManifest<R> returnType() { - return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<R> returnType() { + return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java index ac6178924a..fb1deceab5 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java +++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java @@ -17,8 +17,8 @@ package org.apache.spark.api.java.function; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import scala.runtime.AbstractFunction2; import java.io.Serializable; @@ -29,8 +29,8 @@ import java.io.Serializable; public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R> implements Serializable { - public ClassManifest<R> returnType() { - return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<R> returnType() { + return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java index 6d76a8f970..ca485b3cc2 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java +++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java @@ -18,8 +18,8 @@ package org.apache.spark.api.java.function; import scala.Tuple2; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import java.io.Serializable; @@ -33,11 +33,11 @@ public abstract class PairFlatMapFunction<T, K, V> extends WrappedFunction1<T, Iterable<Tuple2<K, V>>> implements Serializable { - public ClassManifest<K> keyType() { - return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<K> keyType() { + return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class); } - public ClassManifest<V> valueType() { - return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<V> valueType() { + return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java index ede7ceefb5..cbe2306026 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java +++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java @@ -18,8 +18,8 @@ package org.apache.spark.api.java.function; import scala.Tuple2; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import java.io.Serializable; @@ -31,11 +31,11 @@ import java.io.Serializable; public abstract class PairFunction<T, K, V> extends WrappedFunction1<T, Tuple2<K, V>> implements Serializable { - public ClassManifest<K> keyType() { - return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<K> keyType() { + return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class); } - public ClassManifest<V> valueType() { - return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<V> valueType() { + return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 132e4fb0d2..a659cc06c2 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -22,6 +22,7 @@ import java.net._ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections} import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} import org.apache.spark.broadcast.Broadcast @@ -29,8 +30,7 @@ import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils - -private[spark] class PythonRDD[T: ClassManifest]( +private[spark] class PythonRDD[T: ClassTag]( parent: RDD[T], command: Array[Byte], envVars: JMap[String, String], @@ -148,7 +148,7 @@ private[spark] class PythonRDD[T: ClassManifest]( case eof: EOFException => { throw new SparkException("Python worker exited unexpectedly (crashed)", eof) } - case e => throw e + case e: Throwable => throw e } } @@ -200,7 +200,7 @@ private[spark] object PythonRDD { } } catch { case eof: EOFException => {} - case e => throw e + case e: Throwable => throw e } JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism)) } @@ -236,7 +236,7 @@ private[spark] object PythonRDD { } def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = { - implicit val cm : ClassManifest[T] = rdd.elementClassManifest + implicit val cm : ClassTag[T] = rdd.elementClassTag rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 67d45723ba..f291266fcf 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -64,7 +64,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String startDaemon() new Socket(daemonHost, daemonPort) } - case e => throw e + case e: Throwable => throw e } } } @@ -198,7 +198,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } }.start() } catch { - case e => { + case e: Throwable => { stopDaemon() throw e } diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala index fcfea96ad6..37dfa7fec0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala @@ -17,8 +17,7 @@ package org.apache.spark.deploy -private[spark] object ExecutorState - extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") { +private[spark] object ExecutorState extends Enumeration { val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index a724900943..59d12a3e6f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -34,11 +34,11 @@ import scala.collection.mutable.ArrayBuffer */ private[spark] class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging { - + private val localHostname = Utils.localHostName() private val masterActorSystems = ArrayBuffer[ActorSystem]() private val workerActorSystems = ArrayBuffer[ActorSystem]() - + def start(): Array[String] = { logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") @@ -61,10 +61,13 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I def stop() { logInfo("Shutting down local Spark cluster.") // Stop the workers before the master so they don't get upset that it disconnected + // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors! + // This is unfortunate, but for now we just comment it out. workerActorSystems.foreach(_.shutdown()) - workerActorSystems.foreach(_.awaitTermination()) - + //workerActorSystems.foreach(_.awaitTermination()) masterActorSystems.foreach(_.shutdown()) - masterActorSystems.foreach(_.awaitTermination()) + //masterActorSystems.foreach(_.awaitTermination()) + masterActorSystems.clear() + workerActorSystems.clear() } } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala index 77422f61ec..4d95efa73a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala @@ -19,17 +19,15 @@ package org.apache.spark.deploy.client import java.util.concurrent.TimeoutException +import scala.concurrent.duration._ +import scala.concurrent.Await + import akka.actor._ -import akka.actor.Terminated +import akka.pattern.AskTimeoutException import akka.pattern.ask -import akka.util.Duration -import akka.util.duration._ -import akka.remote.RemoteClientDisconnected -import akka.remote.RemoteClientLifeCycleEvent -import akka.remote.RemoteClientShutdown -import akka.dispatch.Await - -import org.apache.spark.Logging +import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent} + +import org.apache.spark.{SparkException, Logging} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -51,18 +49,19 @@ private[spark] class Client( val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 + var masterAddress: Address = null var actor: ActorRef = null var appId: String = null var registered = false var activeMasterUrl: String = null class ClientActor extends Actor with Logging { - var master: ActorRef = null - var masterAddress: Address = null + var master: ActorSelection = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times var alreadyDead = false // To avoid calling listener.dead() multiple times override def preStart() { + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) try { registerWithMaster() } catch { @@ -76,7 +75,7 @@ private[spark] class Client( def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") - val actor = context.actorFor(Master.toAkkaUrl(masterUrl)) + val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) actor ! RegisterApplication(appDescription) } } @@ -84,6 +83,7 @@ private[spark] class Client( def registerWithMaster() { tryRegisterAllMasters() + import context.dispatcher var retries = 0 lazy val retryTimer: Cancellable = context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { @@ -102,10 +102,13 @@ private[spark] class Client( def changeMaster(url: String) { activeMasterUrl = url - master = context.actorFor(Master.toAkkaUrl(url)) - masterAddress = master.path.address - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing + master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) + masterAddress = activeMasterUrl match { + case Master.sparkUrlRegex(host, port) => + Address("akka.tcp", Master.systemName, host, port.toInt) + case x => + throw new SparkException("Invalid spark URL: " + x) + } } override def receive = { @@ -135,21 +138,12 @@ private[spark] class Client( case MasterChanged(masterUrl, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterUrl) - context.unwatch(master) changeMaster(masterUrl) alreadyDisconnected = false sender ! MasterChangeAcknowledged(appId) - case Terminated(actor_) if actor_ == master => - logWarning("Connection to master failed; waiting for master to reconnect...") - markDisconnected() - - case RemoteClientDisconnected(transport, address) if address == masterAddress => - logWarning("Connection to master failed; waiting for master to reconnect...") - markDisconnected() - - case RemoteClientShutdown(transport, address) if address == masterAddress => - logWarning("Connection to master failed; waiting for master to reconnect...") + case DisassociatedEvent(_, address, _) if address == masterAddress => + logWarning(s"Connection to $address failed; waiting for master to reconnect...") markDisconnected() case StopClient => diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala index fedf879eff..67e6c5d66a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala @@ -17,8 +17,7 @@ package org.apache.spark.deploy.master -private[spark] object ApplicationState - extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED", "UNKNOWN") { +private[spark] object ApplicationState extends Enumeration { type ApplicationState = Value diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala index c0849ef324..043945a211 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala @@ -65,7 +65,7 @@ private[spark] class FileSystemPersistenceEngine( (apps, workers) } - private def serializeIntoFile(file: File, value: Serializable) { + private def serializeIntoFile(file: File, value: AnyRef) { val created = file.createNewFile() if (!created) { throw new IllegalStateException("Could not create file: " + file) } @@ -77,13 +77,13 @@ private[spark] class FileSystemPersistenceEngine( out.close() } - def deserializeFromFile[T <: Serializable](file: File)(implicit m: Manifest[T]): T = { + def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = { val fileData = new Array[Byte](file.length().asInstanceOf[Int]) val dis = new DataInputStream(new FileInputStream(file)) dis.readFully(fileData) dis.close() - val clazz = m.erasure.asInstanceOf[Class[T]] + val clazz = m.runtimeClass.asInstanceOf[Class[T]] val serializer = serialization.serializerFor(clazz) serializer.fromBinary(fileData).asInstanceOf[T] } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index cd916672ac..c627dd3806 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -17,19 +17,20 @@ package org.apache.spark.deploy.master -import java.util.Date import java.text.SimpleDateFormat +import java.util.concurrent.TimeUnit +import java.util.Date import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.concurrent.duration.{Duration, FiniteDuration} import akka.actor._ -import akka.actor.Terminated -import akka.dispatch.Await import akka.pattern.ask -import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} +import akka.remote._ import akka.serialization.SerializationExtension -import akka.util.duration._ -import akka.util.{Duration, Timeout} +import akka.util.Timeout import org.apache.spark.{Logging, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} @@ -37,9 +38,11 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{Utils, AkkaUtils} private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { + import context.dispatcher + val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000 val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt @@ -93,7 +96,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act override def preStart() { logInfo("Starting Spark master at " + masterUrl) // Listen for remote client disconnection events, since they don't go through Akka's watch() - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi.start() masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) @@ -113,13 +116,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act new BlackHolePersistenceEngine() } - leaderElectionAgent = context.actorOf(Props( - RECOVERY_MODE match { + leaderElectionAgent = RECOVERY_MODE match { case "ZOOKEEPER" => - new ZooKeeperLeaderElectionAgent(self, masterUrl) + context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl)) case _ => - new MonarchyLeaderAgent(self) - })) + context.actorOf(Props(classOf[MonarchyLeaderAgent], self)) + } } override def preRestart(reason: Throwable, message: Option[Any]) { @@ -142,9 +144,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act RecoveryState.ALIVE else RecoveryState.RECOVERING - logInfo("I have been elected leader! New state: " + state) - if (state == RecoveryState.RECOVERING) { beginRecovery(storedApps, storedWorkers) context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } @@ -156,7 +156,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act System.exit(0) } - case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => { + case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( host, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { @@ -164,9 +164,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } else if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { - val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress) + val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, + sender, workerWebUiPort, publicAddress) registerWorker(worker) - context.watch(sender) // This doesn't work with remote actors but helps for testing persistenceEngine.addWorker(worker) sender ! RegisteredWorker(masterUrl, masterWebUiUrl) schedule() @@ -181,7 +181,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val app = createApplication(description, sender) registerApplication(app) logInfo("Registered app " + description.name + " with ID " + app.id) - context.watch(sender) // This doesn't work with remote actors but helps for testing persistenceEngine.addApplication(app) sender ! RegisteredApplication(app.id, masterUrl) schedule() @@ -257,23 +256,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (canCompleteRecovery) { completeRecovery() } } - case Terminated(actor) => { - // The disconnected actor could've been either a worker or an app; remove whichever of - // those we have an entry for in the corresponding actor hashmap - actorToWorker.get(actor).foreach(removeWorker) - actorToApp.get(actor).foreach(finishApplication) - if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() } - } - - case RemoteClientDisconnected(transport, address) => { - // The disconnected client could've been either a worker or an app; remove whichever it was - addressToWorker.get(address).foreach(removeWorker) - addressToApp.get(address).foreach(finishApplication) - if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() } - } - - case RemoteClientShutdown(transport, address) => { + case DisassociatedEvent(_, address, _) => { // The disconnected client could've been either a worker or an app; remove whichever it was + logInfo(s"$address got disassociated, removing it.") addressToWorker.get(address).foreach(removeWorker) addressToApp.get(address).foreach(finishApplication) if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() } @@ -530,9 +515,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } private[spark] object Master { - private val systemName = "sparkMaster" + val systemName = "sparkMaster" private val actorName = "Master" - private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r + val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) @@ -540,11 +525,11 @@ private[spark] object Master { actorSystem.awaitTermination() } - /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */ + /** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */ def toAkkaUrl(sparkUrl: String): String = { sparkUrl match { case sparkUrlRegex(host, port) => - "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName) + "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName) case _ => throw new SparkException("Invalid master URL: " + sparkUrl) } @@ -552,9 +537,9 @@ private[spark] object Master { def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = { val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) - val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName) - val timeoutDuration = Duration.create( - System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName) + val timeoutDuration: FiniteDuration = Duration.create( + System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS) implicit val timeout = Timeout(timeoutDuration) val respFuture = actor ? RequestWebUIPort // ask pattern val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse] diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala index b91be821f0..256a5a7c28 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala @@ -17,9 +17,7 @@ package org.apache.spark.deploy.master -private[spark] object RecoveryState - extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") { - +private[spark] object RecoveryState extends Enumeration { type MasterState = Value val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala index c8d34f25e2..0b36ef6005 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala @@ -17,9 +17,7 @@ package org.apache.spark.deploy.master -private[spark] object WorkerState - extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED", "UNKNOWN") { - +private[spark] object WorkerState extends Enumeration { type WorkerState = Value val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index a0233a7271..825344b3bb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -70,15 +70,15 @@ class ZooKeeperPersistenceEngine(serialization: Serialization) (apps, workers) } - private def serializeIntoFile(path: String, value: Serializable) { + private def serializeIntoFile(path: String, value: AnyRef) { val serializer = serialization.findSerializerFor(value) val serialized = serializer.toBinary(value) zk.create(path, serialized, CreateMode.PERSISTENT) } - def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = { + def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = { val fileData = zk.getData("/spark/master_status/" + filename) - val clazz = m.erasure.asInstanceOf[Class[T]] + val clazz = m.runtimeClass.asInstanceOf[Class[T]] val serializer = serialization.serializerFor(clazz) serializer.fromBinary(fileData).asInstanceOf[T] } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index f4e574d15d..3b983c19eb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -19,9 +19,10 @@ package org.apache.spark.deploy.master.ui import scala.xml.Node -import akka.dispatch.Await import akka.pattern.ask -import akka.util.duration._ + +import scala.concurrent.Await +import scala.concurrent.duration._ import javax.servlet.http.HttpServletRequest diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index d7a57229b0..65e7a14e7a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import akka.dispatch.Await +import scala.concurrent.Await import akka.pattern.ask -import akka.util.duration._ +import scala.concurrent.duration._ import net.liftweb.json.JsonAST.JValue diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index f4df729e87..a211ce2b42 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.master.ui -import akka.util.Duration +import scala.concurrent.duration._ import javax.servlet.http.HttpServletRequest diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 216d9d44ac..87531b6719 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -17,23 +17,31 @@ package org.apache.spark.deploy.worker +import java.io.File import java.text.SimpleDateFormat import java.util.Date -import java.io.File import scala.collection.mutable.HashMap +import scala.concurrent.duration._ import akka.actor._ -import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} -import akka.util.duration._ +import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{Utils, AkkaUtils} +import org.apache.spark.deploy.DeployMessages.WorkerStateResponse +import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed +import org.apache.spark.deploy.DeployMessages.KillExecutor +import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged +import org.apache.spark.deploy.DeployMessages.Heartbeat +import org.apache.spark.deploy.DeployMessages.RegisteredWorker +import org.apache.spark.deploy.DeployMessages.LaunchExecutor +import org.apache.spark.deploy.DeployMessages.RegisterWorker /** * @param masterUrls Each url should look like spark://host:port. @@ -47,6 +55,7 @@ private[spark] class Worker( masterUrls: Array[String], workDirPath: String = null) extends Actor with Logging { + import context.dispatcher Utils.checkHost(host, "Expected hostname") assert (port > 0) @@ -63,7 +72,8 @@ private[spark] class Worker( var masterIndex = 0 val masterLock: Object = new Object() - var master: ActorRef = null + var master: ActorSelection = null + var masterAddress: Address = null var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" @volatile var registered = false @@ -114,7 +124,7 @@ private[spark] class Worker( logInfo("Spark home: " + sparkHome) createWorkDir() webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) - + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi.start() registerWithMaster() @@ -126,9 +136,13 @@ private[spark] class Worker( masterLock.synchronized { activeMasterUrl = url activeMasterWebUiUrl = uiUrl - master = context.actorFor(Master.toAkkaUrl(activeMasterUrl)) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing + master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) + masterAddress = activeMasterUrl match { + case Master.sparkUrlRegex(_host, _port) => + Address("akka.tcp", Master.systemName, _host, _port.toInt) + case x => + throw new SparkException("Invalid spark URL: " + x) + } connected = true } } @@ -136,7 +150,7 @@ private[spark] class Worker( def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") - val actor = context.actorFor(Master.toAkkaUrl(masterUrl)) + val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress) } @@ -175,7 +189,6 @@ private[spark] class Worker( case MasterChanged(masterUrl, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterUrl) - context.unwatch(master) changeMaster(masterUrl, masterWebUiUrl) val execs = executors.values. @@ -234,13 +247,8 @@ private[spark] class Worker( } } - case Terminated(actor_) if actor_ == master => - masterDisconnected() - - case RemoteClientDisconnected(transport, address) if address == master.path.address => - masterDisconnected() - - case RemoteClientShutdown(transport, address) if address == master.path.address => + case x: DisassociatedEvent if x.remoteAddress == masterAddress => + logInfo(s"$x Disassociated !") masterDisconnected() case RequestWorkerState => { @@ -280,8 +288,8 @@ private[spark] object Worker { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) - val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory, - masterUrls, workDir)), name = "Worker") + actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, + masterUrls, workDir), name = "Worker") (actorSystem, boundPort) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index d2d3617498..1a768d501f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -21,9 +21,10 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import akka.dispatch.Await +import scala.concurrent.duration._ +import scala.concurrent.Await + import akka.pattern.ask -import akka.util.duration._ import net.liftweb.json.JsonAST.JValue diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 800f1cafcc..6c18a3c245 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -17,20 +17,19 @@ package org.apache.spark.deploy.worker.ui -import akka.util.{Duration, Timeout} +import java.io.File -import java.io.{FileInputStream, File} +import scala.concurrent.duration._ +import akka.util.Timeout import javax.servlet.http.HttpServletRequest -import org.eclipse.jetty.server.{Handler, Server} - +import org.apache.spark.Logging import org.apache.spark.deploy.worker.Worker -import org.apache.spark.{Logging} -import org.apache.spark.ui.JettyUtils +import org.apache.spark.ui.{JettyUtils, UIUtils} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils +import org.eclipse.jetty.server.{Handler, Server} /** * Web UI server for the standalone worker. diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 8332631838..debbdd4c44 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -19,15 +19,14 @@ package org.apache.spark.executor import java.nio.ByteBuffer -import akka.actor.{ActorRef, Actor, Props, Terminated} -import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} +import akka.actor._ +import akka.remote._ import org.apache.spark.Logging import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{Utils, AkkaUtils} - private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, @@ -40,14 +39,13 @@ private[spark] class CoarseGrainedExecutorBackend( Utils.checkHostPort(hostPort, "Expected hostport") var executor: Executor = null - var driver: ActorRef = null + var driver: ActorSelection = null override def preStart() { logInfo("Connecting to driver: " + driverUrl) - driver = context.actorFor(driverUrl) + driver = context.actorSelection(driverUrl) driver ! RegisterExecutor(executorId, hostPort, cores) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(driver) // Doesn't work with remote actors, but useful for testing + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } override def receive = { @@ -77,8 +75,8 @@ private[spark] class CoarseGrainedExecutorBackend( executor.killTask(taskId) } - case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => - logError("Driver terminated or disconnected! Shutting down.") + case x: DisassociatedEvent => + logError(s"Driver $x disassociated! Shutting down.") System.exit(1) case StopExecutor => @@ -99,12 +97,13 @@ private[spark] object CoarseGrainedExecutorBackend { // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor // before getting started with all our system properties, etc - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0, + indestructible = true) // set it val sparkHostPort = hostname + ":" + boundPort System.setProperty("spark.hostPort", sparkHostPort) - val actor = actorSystem.actorOf( - Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)), + actorSystem.actorOf( + Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), name = "Executor") actorSystem.awaitTermination() } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 5c9bb9db1c..0b0a60ee60 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -121,7 +121,7 @@ private[spark] class Executor( // Akka's message frame size. If task result is bigger than this, we use the block manager // to send the result back. private val akkaFrameSize = { - env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size") + env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size") } // Start worker thread pool diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 9c2fee4023..703bc6a9ca 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -31,11 +31,11 @@ import scala.collection.mutable.SynchronizedMap import scala.collection.mutable.SynchronizedQueue import scala.collection.mutable.ArrayBuffer -import akka.dispatch.{Await, Promise, ExecutionContext, Future} -import akka.util.Duration -import akka.util.duration._ -import org.apache.spark.util.Utils +import scala.concurrent.{Await, Promise, ExecutionContext, Future} +import scala.concurrent.duration.Duration +import scala.concurrent.duration._ +import org.apache.spark.util.Utils private[spark] class ConnectionManager(port: Int) extends Logging { diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala index 8d9ad9604d..4f5742d29b 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala @@ -25,8 +25,8 @@ import scala.io.Source import java.nio.ByteBuffer import java.net.InetAddress -import akka.dispatch.Await -import akka.util.duration._ +import scala.concurrent.Await +import scala.concurrent.duration._ private[spark] object ConnectionManagerTest extends Logging{ def main(args: Array[String]) { diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index faaf837be0..d1c74a5063 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.ArrayBuffer import scala.concurrent.ExecutionContext.Implicits.global +import scala.reflect.ClassTag import org.apache.spark.{ComplexFutureAction, FutureAction, Logging} @@ -28,7 +29,7 @@ import org.apache.spark.{ComplexFutureAction, FutureAction, Logging} * A set of asynchronous RDD actions available through an implicit conversion. * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions. */ -class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with Logging { +class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging { /** * Returns a future for counting the number of elements in the RDD. diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index 44ea573a7c..424354ae16 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.rdd +import scala.reflect.ClassTag + import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext} import org.apache.spark.storage.{BlockId, BlockManager} @@ -25,7 +27,7 @@ private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends P } private[spark] -class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[BlockId]) +class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId]) extends RDD[T](sc, Nil) { @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get) diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 0de22f0e06..87b950ba43 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -18,6 +18,7 @@ package org.apache.spark.rdd import java.io.{ObjectOutputStream, IOException} +import scala.reflect.ClassTag import org.apache.spark._ @@ -43,7 +44,7 @@ class CartesianPartition( } private[spark] -class CartesianRDD[T: ClassManifest, U:ClassManifest]( +class CartesianRDD[T: ClassTag, U: ClassTag]( sc: SparkContext, var rdd1 : RDD[T], var rdd2 : RDD[U]) diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index d3033ea4a6..a712ef1c27 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -17,15 +17,13 @@ package org.apache.spark.rdd +import java.io.IOException + +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.{NullWritable, BytesWritable} -import org.apache.hadoop.util.ReflectionUtils -import org.apache.hadoop.fs.Path -import java.io.{File, IOException, EOFException} -import java.text.NumberFormat private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} @@ -33,7 +31,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} * This RDD represents a RDD checkpoint file (similar to HadoopRDD). */ private[spark] -class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String) +class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String) extends RDD[T](sc, Nil) { @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index c5de6362a9..98da35763b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -22,6 +22,7 @@ import java.io.{ObjectOutputStream, IOException} import scala.collection.mutable import scala.Some import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag /** * Class that captures a coalesced RDD by essentially keeping track of parent partitions @@ -68,7 +69,7 @@ case class CoalescedRDDPartition( * @param maxPartitions number of desired partitions in the coalesced RDD * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance */ -class CoalescedRDD[T: ClassManifest]( +class CoalescedRDD[T: ClassTag]( @transient var prev: RDD[T], maxPartitions: Int, balanceSlack: Double = 0.10) diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index 02d75eccc5..688c310ee9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -90,12 +90,13 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = { // Compute the minimum and the maxium val (max: Double, min: Double) = self.mapPartitions { items => - Iterator(items.foldRight(-1/0.0, Double.NaN)((e: Double, x: Pair[Double, Double]) => + Iterator(items.foldRight(Double.NegativeInfinity, + Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) => (x._1.max(e), x._2.min(e)))) }.reduce { (maxmin1, maxmin2) => (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2)) } - if (max.isNaN() || max.isInfinity || min.isInfinity ) { + if (min.isNaN || max.isNaN || max.isInfinity || min.isInfinity ) { throw new UnsupportedOperationException( "Histogram on either an empty RDD or RDD containing +/-infinity or NaN") } diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala index c8900d1a93..a84e5f9fd8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala @@ -17,13 +17,14 @@ package org.apache.spark.rdd -import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext} +import scala.reflect.ClassTag +import org.apache.spark.{Partition, SparkContext, TaskContext} /** * An RDD that is empty, i.e. has no element in it. */ -class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) { +class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) { override def getPartitions: Array[Partition] = Array.empty diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala index 5312dc0b59..e74c83b90b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala @@ -18,8 +18,9 @@ package org.apache.spark.rdd import org.apache.spark.{OneToOneDependency, Partition, TaskContext} +import scala.reflect.ClassTag -private[spark] class FilteredRDD[T: ClassManifest]( +private[spark] class FilteredRDD[T: ClassTag]( prev: RDD[T], f: T => Boolean) extends RDD[T](prev) { diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala index cbdf6d84c0..4d1878fc14 100644 --- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala @@ -18,10 +18,11 @@ package org.apache.spark.rdd import org.apache.spark.{Partition, TaskContext} +import scala.reflect.ClassTag private[spark] -class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( +class FlatMappedRDD[U: ClassTag, T: ClassTag]( prev: RDD[T], f: T => TraversableOnce[U]) extends RDD[U](prev) { diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala index 829545d7b0..1a694475f6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala @@ -18,8 +18,9 @@ package org.apache.spark.rdd import org.apache.spark.{Partition, TaskContext} +import scala.reflect.ClassTag -private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T]) +private[spark] class GlommedRDD[T: ClassTag](prev: RDD[T]) extends RDD[Array[T]](prev) { override def getPartitions: Array[Partition] = firstParent[T].partitions diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index aca0146884..8df8718f3b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -19,6 +19,8 @@ package org.apache.spark.rdd import java.sql.{Connection, ResultSet} +import scala.reflect.ClassTag + import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} import org.apache.spark.util.NextIterator @@ -45,7 +47,7 @@ private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) e * This should only call getInt, getString, etc; the RDD takes care of calling next. * The default maps a ResultSet to an array of Object. */ -class JdbcRDD[T: ClassManifest]( +class JdbcRDD[T: ClassTag]( sc: SparkContext, getConnection: () => Connection, sql: String, diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala index ae70d55951..db15baf503 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala @@ -18,9 +18,9 @@ package org.apache.spark.rdd import org.apache.spark.{Partition, TaskContext} +import scala.reflect.ClassTag - -private[spark] class MapPartitionsRDD[U: ClassManifest, T: ClassManifest]( +private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false) diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala index e8be1c4816..8d7c288593 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala @@ -17,10 +17,12 @@ package org.apache.spark.rdd +import scala.reflect.ClassTag + import org.apache.spark.{Partition, TaskContext} private[spark] -class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U) +class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U) extends RDD[U](prev) { override def getPartitions: Array[Partition] = firstParent[T].partitions diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala index 697be8b997..d5691f2267 100644 --- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala @@ -17,7 +17,9 @@ package org.apache.spark.rdd -import org.apache.spark.{RangePartitioner, Logging} +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, RangePartitioner} /** * Extra functions available on RDDs of (key, value) pairs where the key is sortable through @@ -25,9 +27,9 @@ import org.apache.spark.{RangePartitioner, Logging} * use these functions. They will work with any key type that has a `scala.math.Ordered` * implementation. */ -class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, - V: ClassManifest, - P <: Product2[K, V] : ClassManifest]( +class OrderedRDDFunctions[K <% Ordered[K]: ClassTag, + V: ClassTag, + P <: Product2[K, V] : ClassTag]( self: RDD[P]) extends Logging with Serializable { diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 93b78e1232..48168e152e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -25,6 +25,7 @@ import java.util.{HashMap => JHashMap} import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ +import scala.reflect.{ClassTag, classTag} import org.apache.hadoop.mapred._ import org.apache.hadoop.io.compress.CompressionCodec @@ -50,7 +51,7 @@ import org.apache.spark.Partitioner.defaultPartitioner * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions. */ -class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) +class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Logging with SparkHadoopMapReduceUtil with Serializable { @@ -415,7 +416,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) - val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) + val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag) prfs.mapValues { case Seq(vs, ws) => (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) } @@ -431,7 +432,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner) - val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) + val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag) prfs.mapValues { case Seq(vs, w1s, w2s) => (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) } @@ -488,15 +489,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ - def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] = + def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size))) /** Return an RDD with the pairs from `this` whose keys are not in `other`. */ - def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] = + def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] = subtractByKey(other, new HashPartitioner(numPartitions)) /** Return an RDD with the pairs from `this` whose keys are not in `other`. */ - def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = + def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = new SubtractedRDD[K, V, W](self, other, p) /** @@ -525,8 +526,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class * supporting the key and value types K and V in this RDD. */ - def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) { - saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) { + saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -535,16 +536,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * supplied codec. */ def saveAsHadoopFile[F <: OutputFormat[K, V]]( - path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) { - saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec) + path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) { + saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec) } /** * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat` * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD. */ - def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) { - saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) { + saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -698,11 +699,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) */ def values: RDD[V] = self.map(_._2) - private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure + private[spark] def getKeyClass() = implicitly[ClassTag[K]].runtimeClass - private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure + private[spark] def getValueClass() = implicitly[ClassTag[V]].runtimeClass } -private[spark] object Manifests { - val seqSeqManifest = classManifest[Seq[Seq[_]]] +private[spark] object ClassTags { + val seqSeqClassTag = classTag[Seq[Seq[_]]] } diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index cd96250389..09d0a8189d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -20,13 +20,15 @@ package org.apache.spark.rdd import scala.collection.immutable.NumericRange import scala.collection.mutable.ArrayBuffer import scala.collection.Map +import scala.reflect.ClassTag + import org.apache.spark._ import java.io._ import scala.Serializable import org.apache.spark.serializer.JavaSerializer import org.apache.spark.util.Utils -private[spark] class ParallelCollectionPartition[T: ClassManifest]( +private[spark] class ParallelCollectionPartition[T: ClassTag]( var rddId: Long, var slice: Int, var values: Seq[T]) @@ -78,7 +80,7 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest]( } } -private[spark] class ParallelCollectionRDD[T: ClassManifest]( +private[spark] class ParallelCollectionRDD[T: ClassTag]( @transient sc: SparkContext, @transient data: Seq[T], numSlices: Int, @@ -109,7 +111,7 @@ private object ParallelCollectionRDD { * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes * it efficient to run Spark over RDDs representing large sets of numbers. */ - def slice[T: ClassManifest](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = { + def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = { if (numSlices < 1) { throw new IllegalArgumentException("Positive number of slices required") } diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala index 574dd4233f..ea8885b36e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.rdd +import scala.reflect.ClassTag + import org.apache.spark.{NarrowDependency, SparkEnv, Partition, TaskContext} @@ -49,7 +51,7 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo * and the execution DAG has a filter on the key, we can avoid launching tasks * on partitions that don't have the range covering the key. */ -class PartitionPruningRDD[T: ClassManifest]( +class PartitionPruningRDD[T: ClassTag]( @transient prev: RDD[T], @transient partitionFilterFunc: Int => Boolean) extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) { @@ -69,6 +71,6 @@ object PartitionPruningRDD { * when its type T is not known at compile time. */ def create[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean) = { - new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassManifest) + new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassTag) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index d5304ab0ae..1dbbe39898 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -24,6 +24,7 @@ import scala.collection.Map import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.io.Source +import scala.reflect.ClassTag import org.apache.spark.{SparkEnv, Partition, TaskContext} import org.apache.spark.broadcast.Broadcast @@ -33,7 +34,7 @@ import org.apache.spark.broadcast.Broadcast * An RDD that pipes the contents of each parent partition through an external command * (printing them one per line) and returns the output as a collection of strings. */ -class PipedRDD[T: ClassManifest]( +class PipedRDD[T: ClassTag]( prev: RDD[T], command: Seq[String], envVars: Map[String, String], diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 893708f8f2..ea45566ad1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -23,6 +23,9 @@ import scala.collection.Map import scala.collection.JavaConversions.mapAsScalaMap import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap +import scala.reflect.{classTag, ClassTag} + import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.io.NullWritable @@ -69,7 +72,7 @@ import org.apache.spark._ * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details * on RDD internals. */ -abstract class RDD[T: ClassManifest]( +abstract class RDD[T: ClassTag]( @transient private var sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { @@ -243,13 +246,13 @@ abstract class RDD[T: ClassManifest]( /** * Return a new RDD by applying a function to all elements of this RDD. */ - def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) + def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) /** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ - def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] = + def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = new FlatMappedRDD(this, sc.clean(f)) /** @@ -374,25 +377,25 @@ abstract class RDD[T: ClassManifest]( * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of * elements (a, b) where a is in `this` and b is in `other`. */ - def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other) + def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other) /** * Return an RDD of grouped items. */ - def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = + def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, defaultPartitioner(this)) /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] = + def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] = groupBy(f, new HashPartitioner(numPartitions)) /** * Return an RDD of grouped items. */ - def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = { + def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = { val cleanF = sc.clean(f) this.map(t => (cleanF(t), t)).groupByKey(p) } @@ -439,7 +442,7 @@ abstract class RDD[T: ClassManifest]( /** * Return a new RDD by applying a function to each partition of this RDD. */ - def mapPartitions[U: ClassManifest]( + def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter) new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) @@ -449,7 +452,7 @@ abstract class RDD[T: ClassManifest]( * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. */ - def mapPartitionsWithIndex[U: ClassManifest]( + def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter) new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) @@ -459,7 +462,7 @@ abstract class RDD[T: ClassManifest]( * Return a new RDD by applying a function to each partition of this RDD. This is a variant of * mapPartitions that also passes the TaskContext into the closure. */ - def mapPartitionsWithContext[U: ClassManifest]( + def mapPartitionsWithContext[U: ClassTag]( f: (TaskContext, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter) @@ -471,7 +474,7 @@ abstract class RDD[T: ClassManifest]( * of the original partition. */ @deprecated("use mapPartitionsWithIndex", "0.7.0") - def mapPartitionsWithSplit[U: ClassManifest]( + def mapPartitionsWithSplit[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { mapPartitionsWithIndex(f, preservesPartitioning) } @@ -481,7 +484,7 @@ abstract class RDD[T: ClassManifest]( * additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ - def mapWith[A: ClassManifest, U: ClassManifest] + def mapWith[A: ClassTag, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) (f: (T, A) => U): RDD[U] = { mapPartitionsWithIndex((index, iter) => { @@ -495,7 +498,7 @@ abstract class RDD[T: ClassManifest]( * additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ - def flatMapWith[A: ClassManifest, U: ClassManifest] + def flatMapWith[A: ClassTag, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) (f: (T, A) => Seq[U]): RDD[U] = { mapPartitionsWithIndex((index, iter) => { @@ -509,7 +512,7 @@ abstract class RDD[T: ClassManifest]( * This additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ - def foreachWith[A: ClassManifest](constructA: Int => A)(f: (T, A) => Unit) { + def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) { mapPartitionsWithIndex { (index, iter) => val a = constructA(index) iter.map(t => {f(t, a); t}) @@ -521,7 +524,7 @@ abstract class RDD[T: ClassManifest]( * additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ - def filterWith[A: ClassManifest](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = { + def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = { mapPartitionsWithIndex((index, iter) => { val a = constructA(index) iter.filter(t => p(t, a)) @@ -534,7 +537,7 @@ abstract class RDD[T: ClassManifest]( * partitions* and the *same number of elements in each partition* (e.g. one was made through * a map on the other). */ - def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other) + def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other) /** * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by @@ -542,32 +545,27 @@ abstract class RDD[T: ClassManifest]( * *same number of partitions*, but does *not* require them to have the same number * of elements in each partition. */ - def zipPartitions[B: ClassManifest, V: ClassManifest] - (rdd2: RDD[B], preservesPartitioning: Boolean) - (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = - new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning) - - def zipPartitions[B: ClassManifest, V: ClassManifest] + def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B]) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false) - def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest] + def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean) (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning) - def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest] + def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C]) (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false) - def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest] + def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean) (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning) - def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest] + def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]) (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false) @@ -605,7 +603,7 @@ abstract class RDD[T: ClassManifest]( /** * Return an RDD that contains all matching values by applying `f`. */ - def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = { + def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = { filter(f.isDefinedAt).map(f) } @@ -695,7 +693,7 @@ abstract class RDD[T: ClassManifest]( * allowed to modify and return their first argument instead of creating a new U to avoid memory * allocation. */ - def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = { + def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = { // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) val cleanSeqOp = sc.clean(seqOp) @@ -744,7 +742,7 @@ abstract class RDD[T: ClassManifest]( * combine step happens locally on the master, equivalent to running a single reduce task. */ def countByValue(): Map[T, Long] = { - if (elementClassManifest.erasure.isArray) { + if (elementClassTag.runtimeClass.isArray) { throw new SparkException("countByValue() does not support arrays") } // TODO: This should perhaps be distributed by default. @@ -775,7 +773,7 @@ abstract class RDD[T: ClassManifest]( timeout: Long, confidence: Double = 0.95 ): PartialResult[Map[T, BoundedDouble]] = { - if (elementClassManifest.erasure.isArray) { + if (elementClassTag.runtimeClass.isArray) { throw new SparkException("countByValueApprox() does not support arrays") } val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) => @@ -942,12 +940,12 @@ abstract class RDD[T: ClassManifest]( /** Record user function generating this RDD. */ @transient private[spark] val origin = Utils.formatSparkCallSite - private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T] + private[spark] def elementClassTag: ClassTag[T] = classTag[T] private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None /** Returns the first parent RDD */ - protected[spark] def firstParent[U: ClassManifest] = { + protected[spark] def firstParent[U: ClassTag] = { dependencies.head.rdd.asInstanceOf[RDD[U]] } @@ -1009,7 +1007,7 @@ abstract class RDD[T: ClassManifest]( origin) def toJavaRDD() : JavaRDD[T] = { - new JavaRDD(this)(elementClassManifest) + new JavaRDD(this)(elementClassTag) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala index 6009a41570..3b56e45aa9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala @@ -17,6 +17,8 @@ package org.apache.spark.rdd +import scala.reflect.ClassTag + import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration @@ -38,7 +40,7 @@ private[spark] object CheckpointState extends Enumeration { * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations * of the checkpointed RDD. */ -private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T]) +private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T]) extends Logging with Serializable { import CheckpointState._ diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala index 2c5253ae30..d433670cc2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.rdd +import scala.reflect.ClassTag import java.util.Random import cern.jet.random.Poisson @@ -29,9 +30,9 @@ class SampledRDDPartition(val prev: Partition, val seed: Int) extends Partition override val index: Int = prev.index } -class SampledRDD[T: ClassManifest]( +class SampledRDD[T: ClassTag]( prev: RDD[T], - withReplacement: Boolean, + withReplacement: Boolean, frac: Double, seed: Int) extends RDD[T](prev) { diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala index 5fe4676029..2d1bd5b481 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.spark.rdd +import scala.reflect.{ ClassTag, classTag} + import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.SequenceFileOutputFormat import org.apache.hadoop.io.compress.CompressionCodec @@ -32,15 +33,15 @@ import org.apache.spark.Logging * * Import `org.apache.spark.SparkContext._` at the top of their program to use these functions. */ -class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest]( +class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag]( self: RDD[(K, V)]) extends Logging with Serializable { - private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { + private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = { val c = { - if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { - classManifest[T].erasure + if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) { + classTag[T].runtimeClass } else { // We get the type of the Writable class by looking at the apply method which converts // from T to Writable. Since we have two apply methods we filter out the one which diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index a5d751a7bd..3682c84598 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -17,8 +17,10 @@ package org.apache.spark.rdd -import org.apache.spark.{Dependency, Partitioner, SparkEnv, ShuffleDependency, Partition, TaskContext} +import scala.reflect.ClassTag +import org.apache.spark.{Dependency, Partition, Partitioner, ShuffleDependency, + SparkEnv, TaskContext} private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { override val index = idx @@ -32,7 +34,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { * @tparam K the key class. * @tparam V the value class. */ -class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest]( +class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag]( @transient var prev: RDD[P], part: Partitioner) extends RDD[P](prev.context, Nil) { diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala index 7af4d803e7..aab30b1bb4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala @@ -18,8 +18,11 @@ package org.apache.spark.rdd import java.util.{HashMap => JHashMap} + import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + import org.apache.spark.Partitioner import org.apache.spark.Dependency import org.apache.spark.TaskContext @@ -45,7 +48,7 @@ import org.apache.spark.OneToOneDependency * you can use `rdd1`'s partitioner/partition size and not worry about running * out of memory because of the size of `rdd2`. */ -private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest]( +private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag]( @transient var rdd1: RDD[_ <: Product2[K, V]], @transient var rdd2: RDD[_ <: Product2[K, W]], part: Partitioner) diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index ae8a9f36a6..08a41ac558 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -18,10 +18,13 @@ package org.apache.spark.rdd import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + import org.apache.spark.{Dependency, RangeDependency, SparkContext, Partition, TaskContext} + import java.io.{ObjectOutputStream, IOException} -private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int) +private[spark] class UnionPartition[T: ClassTag](idx: Int, rdd: RDD[T], splitIndex: Int) extends Partition { var split: Partition = rdd.partitions(splitIndex) @@ -40,7 +43,7 @@ private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], spl } } -class UnionRDD[T: ClassManifest]( +class UnionRDD[T: ClassTag]( sc: SparkContext, @transient var rdds: Seq[RDD[T]]) extends RDD[T](sc, Nil) { // Nil since we implement getDependencies diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index a97d2a01c8..83be3c6eb4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -19,6 +19,7 @@ package org.apache.spark.rdd import org.apache.spark.{OneToOneDependency, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} +import scala.reflect.ClassTag private[spark] class ZippedPartitionsPartition( idx: Int, @@ -38,7 +39,7 @@ private[spark] class ZippedPartitionsPartition( } } -abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( +abstract class ZippedPartitionsBaseRDD[V: ClassTag]( sc: SparkContext, var rdds: Seq[RDD[_]], preservesPartitioning: Boolean = false) @@ -71,7 +72,7 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( } } -class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]( +class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]( sc: SparkContext, f: (Iterator[A], Iterator[B]) => Iterator[V], var rdd1: RDD[A], @@ -92,7 +93,7 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest] } class ZippedPartitionsRDD3 - [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest]( + [A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag]( sc: SparkContext, f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], var rdd1: RDD[A], @@ -117,7 +118,7 @@ class ZippedPartitionsRDD3 } class ZippedPartitionsRDD4 - [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest]( + [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag]( sc: SparkContext, f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], var rdd1: RDD[A], diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala index 567b67dfee..fb5b070c18 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala @@ -18,10 +18,12 @@ package org.apache.spark.rdd import org.apache.spark.{OneToOneDependency, SparkContext, Partition, TaskContext} + import java.io.{ObjectOutputStream, IOException} +import scala.reflect.ClassTag -private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest]( +private[spark] class ZippedPartition[T: ClassTag, U: ClassTag]( idx: Int, @transient rdd1: RDD[T], @transient rdd2: RDD[U] @@ -42,7 +44,7 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest]( } } -class ZippedRDD[T: ClassManifest, U: ClassManifest]( +class ZippedRDD[T: ClassTag, U: ClassTag]( sc: SparkContext, var rdd1: RDD[T], var rdd2: RDD[U]) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f9cd021dd3..963d15b76d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -21,9 +21,11 @@ import java.io.NotSerializableException import java.util.Properties import java.util.concurrent.atomic.AtomicInteger -import akka.actor._ -import akka.util.duration._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} +import scala.concurrent.duration._ +import scala.reflect.ClassTag + +import akka.actor._ import org.apache.spark._ import org.apache.spark.rdd.RDD @@ -104,7 +106,7 @@ class DAGScheduler( // The time, in millis, to wait for fetch failure events to stop coming in after one is detected; // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one // as more failure events come in - val RESUBMIT_TIMEOUT = 50L + val RESUBMIT_TIMEOUT = 50.milliseconds // The time, in millis, to wake up between polls of the completion queue in order to potentially // resubmit failed stages @@ -177,13 +179,14 @@ class DAGScheduler( var resubmissionTask: Cancellable = _ override def preStart() { + import context.dispatcher /** * A message is sent to the actor itself periodically to remind the actor to resubmit failed * stages. In this way, stage resubmission can be done within the same thread context of * other event processing logic to avoid unnecessary synchronization overhead. */ resubmissionTask = context.system.scheduler.schedule( - RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages) + RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages) } /** @@ -460,7 +463,7 @@ class DAGScheduler( waiter } - def runJob[T, U: ClassManifest]( + def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala index 0a786deb16..3832ee7ff6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala @@ -22,7 +22,7 @@ package org.apache.spark.scheduler * to order tasks amongst a Schedulable's sub-queues * "NONE" is used when the a Schedulable has no sub-queues. */ -object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") { +object SchedulingMode extends Enumeration { type SchedulingMode = Value val FAIR,FIFO,NONE = Value diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala index 47b0f387aa..35de13c385 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala @@ -18,9 +18,7 @@ package org.apache.spark.scheduler -private[spark] object TaskLocality - extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") -{ +private[spark] object TaskLocality extends Enumeration { // process local is expected to be used ONLY within tasksetmanager for now. val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 4d82430b97..66ab8ea4cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -24,8 +24,7 @@ import java.util.{TimerTask, Timer} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet - -import akka.util.duration._ +import scala.concurrent.duration._ import org.apache.spark._ import org.apache.spark.TaskState.TaskState @@ -122,7 +121,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (System.getProperty("spark.speculation", "false").toBoolean) { logInfo("Starting speculative execution thread") - + import sc.env.actorSystem.dispatcher sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, SPECULATION_INTERVAL milliseconds) { checkSpeculatableTasks() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index 94961790df..bf494aa64d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -529,10 +529,10 @@ private[spark] class ClusterTaskSetManager( addPendingTask(index) if (state != TaskState.KILLED) { numFailures(index) += 1 - if (numFailures(index) > MAX_TASK_FAILURES) { - logError("Task %s:%d failed more than %d times; aborting job".format( + if (numFailures(index) >= MAX_TASK_FAILURES) { + logError("Task %s:%d failed %d times; aborting job".format( taskSet.id, index, MAX_TASK_FAILURES)) - abort("Task %s:%d failed more than %d times".format(taskSet.id, index, MAX_TASK_FAILURES)) + abort("Task %s:%d failed %d times".format(taskSet.id, index, MAX_TASK_FAILURES)) } } } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d0ba5bf55d..f5e8766f6d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -20,13 +20,12 @@ package org.apache.spark.scheduler.cluster import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.concurrent.Await +import scala.concurrent.duration._ import akka.actor._ -import akka.dispatch.Await import akka.pattern.ask -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} -import akka.util.Duration -import akka.util.duration._ +import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import org.apache.spark.{SparkException, Logging, TaskState} import org.apache.spark.scheduler.TaskDescription @@ -53,15 +52,15 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac private val executorAddress = new HashMap[String, Address] private val executorHost = new HashMap[String, String] private val freeCores = new HashMap[String, Int] - private val actorToExecutorId = new HashMap[ActorRef, String] private val addressToExecutorId = new HashMap[Address, String] override def preStart() { // Listen for remote client disconnection events, since they don't go through Akka's watch() - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) // Periodically revive offers to allow delay scheduling to work val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong + import context.dispatcher context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers) } @@ -73,12 +72,10 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac } else { logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor(sparkProperties) - context.watch(sender) executorActor(executorId) = sender executorHost(executorId) = Utils.parseHostPort(hostPort)._1 freeCores(executorId) = cores executorAddress(executorId) = sender.path.address - actorToExecutorId(sender) = executorId addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) makeOffers() @@ -118,14 +115,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac removeExecutor(executorId, reason) sender ! true - case Terminated(actor) => - actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated")) + case DisassociatedEvent(_, address, _) => + addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated")) - case RemoteClientDisconnected(transport, address) => - addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected")) - - case RemoteClientShutdown(transport, address) => - addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown")) } // Make fake resource offers on all executors @@ -153,7 +145,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac if (executorActor.contains(executorId)) { logInfo("Executor " + executorId + " disconnected, so removing it") val numCores = freeCores(executorId) - actorToExecutorId -= executorActor(executorId) addressToExecutorId -= executorAddress(executorId) executorActor -= executorId executorHost -= executorId diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index e000531a26..e8fecec4a6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -36,7 +36,7 @@ private[spark] class SimrSchedulerBackend( override def start() { super.start() - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index cefa970bb9..7127a72d6d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -42,7 +42,7 @@ private[spark] class SparkDeploySchedulerBackend( super.start() // The endpoint for executors to talk to us - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala index 2064d97b49..e68c527713 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala @@ -71,7 +71,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche case cnf: ClassNotFoundException => val loader = Thread.currentThread.getContextClassLoader taskSetManager.abort("ClassNotFound with classloader: " + loader) - case ex => + case ex: Throwable => taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex)) } } @@ -95,7 +95,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche val loader = Thread.currentThread.getContextClassLoader logError( "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) - case ex => {} + case ex: Throwable => {} } scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index cd521e0f2b..84fe3094cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -120,7 +120,7 @@ private[spark] class CoarseMesosSchedulerBackend( } val command = CommandInfo.newBuilder() .setEnvironment(environment) - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 702aca8323..19a025a329 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -24,9 +24,9 @@ import scala.collection.mutable.{HashMap, ArrayBuffer} import scala.util.Random import akka.actor.{ActorSystem, Cancellable, Props} -import akka.dispatch.{Await, Future} -import akka.util.Duration -import akka.util.duration._ +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream} @@ -924,4 +924,3 @@ private[spark] object BlockManager extends Logging { blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host)) } } - diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 94038649b3..e05b842476 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -17,16 +17,17 @@ package org.apache.spark.storage -import akka.actor.ActorRef -import akka.dispatch.{Await, Future} +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global + +import akka.actor._ import akka.pattern.ask -import akka.util.Duration import org.apache.spark.{Logging, SparkException} import org.apache.spark.storage.BlockManagerMessages._ - -private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging { +private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging { val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt @@ -156,7 +157,10 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi while (attempts < AKKA_RETRY_ATTEMPTS) { attempts += 1 try { - val future = driverActor.ask(message)(timeout) + val future = driverActor match { + case Left(a: ActorRef) => a.ask(message)(timeout) + case Right(b: ActorSelection) => b.ask(message)(timeout) + } val result = Await.result(future, timeout) if (result == null) { throw new SparkException("BlockManagerMaster returned null") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index f8cf14b503..154a3980e9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -23,10 +23,10 @@ import scala.collection.mutable import scala.collection.JavaConversions._ import akka.actor.{Actor, ActorRef, Cancellable} -import akka.dispatch.Future import akka.pattern.ask -import akka.util.Duration -import akka.util.duration._ + +import scala.concurrent.duration._ +import scala.concurrent.Future import org.apache.spark.{Logging, SparkException} import org.apache.spark.storage.BlockManagerMessages._ @@ -65,6 +65,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { override def preStart() { if (!BlockManager.getDisableHeartBeatsForTesting) { + import context.dispatcher timeoutCheckingTask = context.system.scheduler.schedule( 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) } diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index 860e680576..a8db37ded1 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -93,7 +93,7 @@ private[spark] object ThreadingTest { val actorSystem = ActorSystem("test") val serializer = new KryoSerializer val blockManagerMaster = new BlockManagerMaster( - actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))) + Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))) val blockManager = new BlockManager( "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024) val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala index e7eab374ad..c1ee2f3d00 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import akka.util.Duration +import scala.concurrent.duration._ import java.text.SimpleDateFormat diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala index 1d633d374a..a5446b3fc3 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.storage -import akka.util.Duration +import scala.concurrent.duration._ import javax.servlet.http.HttpServletRequest diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index d4c5065c3f..74133cef6c 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -17,11 +17,8 @@ package org.apache.spark.util -import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem} import com.typesafe.config.ConfigFactory -import akka.util.duration._ -import akka.remote.RemoteActorRefProvider - /** * Various utility classes for working with Akka. @@ -34,39 +31,57 @@ private[spark] object AkkaUtils { * * Note: the `name` parameter is important, as even if a client sends a message to right * host + port, if the system name is incorrect, Akka will drop the message. + * + * If indestructible is set to true, the Actor System will continue running in the event + * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. */ - def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { - val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt + def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false) + : (ActorSystem, Int) = { + + val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt - val akkaTimeout = System.getProperty("spark.akka.timeout", "60").toInt + + val akkaTimeout = System.getProperty("spark.akka.timeout", "100").toInt + val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt - val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off" - // 10 seconds is the default akka timeout, but in a cluster, we need higher by default. - val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt - - val akkaConf = ConfigFactory.parseString(""" - akka.daemonic = on - akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] - akka.stdout-loglevel = "ERROR" - akka.actor.provider = "akka.remote.RemoteActorRefProvider" - akka.remote.transport = "akka.remote.netty.NettyRemoteTransport" - akka.remote.netty.hostname = "%s" - akka.remote.netty.port = %d - akka.remote.netty.connection-timeout = %ds - akka.remote.netty.message-frame-size = %d MiB - akka.remote.netty.execution-pool-size = %d - akka.actor.default-dispatcher.throughput = %d - akka.remote.log-remote-lifecycle-events = %s - akka.remote.netty.write-timeout = %ds - """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize, - lifecycleEvents, akkaWriteTimeout)) + val lifecycleEvents = + if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off" + + val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "600").toInt + val akkaFailureDetector = + System.getProperty("spark.akka.failure-detector.threshold", "300.0").toDouble + val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "1000").toInt - val actorSystem = ActorSystem(name, akkaConf) + val akkaConf = ConfigFactory.parseString( + s""" + |akka.daemonic = on + |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] + |akka.stdout-loglevel = "ERROR" + |akka.jvm-exit-on-fatal-error = off + |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s + |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s + |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector + |akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + |akka.remote.netty.tcp.hostname = "$host" + |akka.remote.netty.tcp.port = $port + |akka.remote.netty.tcp.tcp-nodelay = on + |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s + |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB + |akka.remote.netty.tcp.execution-pool-size = $akkaThreads + |akka.actor.default-dispatcher.throughput = $akkaBatchSize + |akka.remote.log-remote-lifecycle-events = $lifecycleEvents + """.stripMargin) + + val actorSystem = if (indestructible) { + IndestructibleActorSystem(name, akkaConf) + } else { + ActorSystem(name, akkaConf) + } - // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a - // hack because Akka doesn't let you figure out the port through the public API yet. val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider - val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get - return (actorSystem, boundPort) + val boundPort = provider.getDefaultAddress.port.get + (actorSystem, boundPort) } + } diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala index 0b51c23f7b..a38329df03 100644 --- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala +++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala @@ -34,6 +34,8 @@ class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A]) override def iterator: Iterator[A] = underlying.iterator.asScala + override def size: Int = underlying.size + override def ++=(xs: TraversableOnce[A]): this.type = { xs.foreach { this += _ } this diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala new file mode 100644 index 0000000000..bf71882ef7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Must be in akka.actor package as ActorSystemImpl is protected[akka]. +package akka.actor + +import scala.util.control.{ControlThrowable, NonFatal} + +import com.typesafe.config.Config + +/** + * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception. + * This is necessary as Spark Executors are allowed to recover from fatal exceptions + * (see [[org.apache.spark.executor.Executor]]). + */ +object IndestructibleActorSystem { + def apply(name: String, config: Config): ActorSystem = + apply(name, config, ActorSystem.findClassLoader()) + + def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = + new IndestructibleActorSystemImpl(name, config, classLoader).start() +} + +private[akka] class IndestructibleActorSystemImpl( + override val name: String, + applicationConfig: Config, + classLoader: ClassLoader) + extends ActorSystemImpl(name, applicationConfig, classLoader) { + + protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = { + val fallbackHandler = super.uncaughtExceptionHandler + + new Thread.UncaughtExceptionHandler() { + def uncaughtException(thread: Thread, cause: Throwable): Unit = { + if (isFatalError(cause) && !settings.JvmExitOnFatalError) { + log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " + + "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name) + //shutdown() //TODO make it configurable + } else { + fallbackHandler.uncaughtException(thread, cause) + } + } + } + } + + def isFatalError(e: Throwable): Boolean = { + e match { + case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable => + false + case _ => + true + } + } +} diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index 67a7f87a5c..7b41ef89f1 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -55,8 +55,7 @@ class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, clea } } -object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler", "ResultTask", - "ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") { +object MetadataCleanerType extends Enumeration { val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala index 277de2f8a6..dbff571de9 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala @@ -85,7 +85,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging { } override def filter(p: ((A, B)) => Boolean): Map[A, B] = { - JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p) + JavaConversions.mapAsScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p) } override def empty: Map[A, B] = new TimeStampedHashMap[A, B]() diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a79e64e810..3f7858d2de 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -22,10 +22,11 @@ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address} import java.util.{Locale, Random, UUID} import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor} +import scala.collection.JavaConversions._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConversions._ import scala.io.Source +import scala.reflect.ClassTag import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -319,7 +320,7 @@ private[spark] object Utils extends Logging { * result in a new collection. Unlike scala.util.Random.shuffle, this method * uses a local random number generator, avoiding inter-thread contention. */ - def randomize[T: ClassManifest](seq: TraversableOnce[T]): Seq[T] = { + def randomize[T: ClassTag](seq: TraversableOnce[T]): Seq[T] = { randomizeInPlace(seq.toArray) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala index 80545c9688..c26f23d500 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala @@ -17,6 +17,7 @@ package org.apache.spark.util.collection +import scala.reflect.ClassTag /** * A fast hash map implementation for nullable keys. This hash map supports insertions and updates, @@ -26,7 +27,7 @@ package org.apache.spark.util.collection * Under the hood, it uses our OpenHashSet implementation. */ private[spark] -class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest]( +class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag]( initialCapacity: Int) extends Iterable[(K, V)] with Serializable { diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index 40986e3731..87e009a4de 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -17,6 +17,7 @@ package org.apache.spark.util.collection +import scala.reflect._ /** * A simple, fast hash set optimized for non-null insertion-only use case, where keys are never @@ -36,7 +37,7 @@ package org.apache.spark.util.collection * to explore all spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing). */ private[spark] -class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( +class OpenHashSet[@specialized(Long, Int) T: ClassTag]( initialCapacity: Int, loadFactor: Double) extends Serializable { @@ -62,14 +63,14 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( // throws: // scala.tools.nsc.symtab.Types$TypeError: type mismatch; // found : scala.reflect.AnyValManifest[Long] - // required: scala.reflect.ClassManifest[Int] + // required: scala.reflect.ClassTag[Int] // at scala.tools.nsc.typechecker.Contexts$Context.error(Contexts.scala:298) // at scala.tools.nsc.typechecker.Infer$Inferencer.error(Infer.scala:207) // ... - val mt = classManifest[T] - if (mt == ClassManifest.Long) { + val mt = classTag[T] + if (mt == ClassTag.Long) { (new LongHasher).asInstanceOf[Hasher[T]] - } else if (mt == ClassManifest.Int) { + } else if (mt == ClassTag.Int) { (new IntHasher).asInstanceOf[Hasher[T]] } else { new Hasher[T] diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala index d76143e45a..2e1ef06cbc 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala @@ -17,6 +17,7 @@ package org.apache.spark.util.collection +import scala.reflect._ /** * A fast hash map implementation for primitive, non-null keys. This hash map supports @@ -26,15 +27,15 @@ package org.apache.spark.util.collection * Under the hood, it uses our OpenHashSet implementation. */ private[spark] -class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, - @specialized(Long, Int, Double) V: ClassManifest]( +class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag, + @specialized(Long, Int, Double) V: ClassTag]( initialCapacity: Int) extends Iterable[(K, V)] with Serializable { def this() = this(64) - require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int]) + require(classTag[K] == classTag[Long] || classTag[K] == classTag[Int]) // Init in constructor (instead of in declaration) to work around a Scala compiler specialization // bug that would generate two arrays (one for Object and one for specialized T). diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala index 20554f0aab..b84eb65c62 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala @@ -17,11 +17,13 @@ package org.apache.spark.util.collection +import scala.reflect.ClassTag + /** * An append-only, non-threadsafe, array-backed vector that is optimized for primitive types. */ private[spark] -class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) { +class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: Int = 64) { private var _numElements = 0 private var _array: Array[V] = _ diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 4434f3b87c..c443c5266e 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -27,6 +27,21 @@ import org.apache.spark.SparkContext._ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext { + + implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] { + def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = { + t1 ++= t2 + t1 + } + def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = { + t1 += t2 + t1 + } + def zero(t: mutable.Set[A]) : mutable.Set[A] = { + new mutable.HashSet[A]() + } + } + test ("basic accumulation"){ sc = new SparkContext("local", "test") val acc : Accumulator[Int] = sc.accumulator(0) @@ -51,7 +66,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte } test ("add value to collection accumulators") { - import SetAccum._ val maxI = 1000 for (nThreads <- List(1, 10)) { //test single & multi-threaded sc = new SparkContext("local[" + nThreads + "]", "test") @@ -68,22 +82,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte } } - implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] { - def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = { - t1 ++= t2 - t1 - } - def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = { - t1 += t2 - t1 - } - def zero(t: mutable.Set[Any]) : mutable.Set[Any] = { - new mutable.HashSet[Any]() - } - } - test ("value not readable in tasks") { - import SetAccum._ val maxI = 1000 for (nThreads <- List(1, 10)) { //test single & multi-threaded sc = new SparkContext("local[" + nThreads + "]", "test") @@ -125,7 +124,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte } test ("localValue readable in tasks") { - import SetAccum._ val maxI = 1000 for (nThreads <- List(1, 10)) { //test single & multi-threaded sc = new SparkContext("local[" + nThreads + "]", "test") diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index d2226aa5a5..f25d921d3f 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark +import scala.reflect.ClassTag import org.scalatest.FunSuite import java.io.File import org.apache.spark.rdd._ @@ -205,7 +206,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { * not, but this is not done by default as usually the partitions do not refer to any RDD and * therefore never store the lineage. */ - def testCheckpointing[U: ClassManifest]( + def testCheckpointing[U: ClassTag]( op: (RDD[Int]) => RDD[U], testRDDSize: Boolean = true, testRDDPartitionSize: Boolean = false @@ -274,7 +275,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { * RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed, * this RDD will remember the partitions and therefore potentially the whole lineage. */ - def testParentCheckpointing[U: ClassManifest]( + def testParentCheckpointing[U: ClassTag]( op: (RDD[Int]) => RDD[U], testRDDSize: Boolean, testRDDPartitionSize: Boolean diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 480bac84f3..d9cb7fead5 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -122,7 +122,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter sc.parallelize(1 to 10, 10).foreach(x => println(x / 0)) } assert(thrown.getClass === classOf[SparkException]) - assert(thrown.getMessage.contains("more than 4 times")) + assert(thrown.getMessage.contains("failed 4 times")) } test("caching") { @@ -303,12 +303,13 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter Thread.sleep(200) } } catch { - case _ => { Thread.sleep(10) } + case _: Throwable => { Thread.sleep(10) } // Do nothing. We might see exceptions because block manager // is racing this thread to remove entries from the driver. } } } + } object DistributedSuite { diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala index 01a72d8401..6d1695eae7 100644 --- a/core/src/test/scala/org/apache/spark/DriverSuite.scala +++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala @@ -34,7 +34,7 @@ class DriverSuite extends FunSuite with Timeouts { // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing" val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]")) forAll(masters) { (master: String) => - failAfter(30 seconds) { + failAfter(60 seconds) { Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master), new File(System.getenv("SPARK_HOME"))) } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index b7eb268bd5..271dc905bc 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -49,14 +49,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("master start and stop") { val actorSystem = ActorSystem("test") val tracker = new MapOutputTrackerMaster() - tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) + tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))) tracker.stop() } test("master register and fetch") { val actorSystem = ActorSystem("test") val tracker = new MapOutputTrackerMaster() - tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) + tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))) tracker.registerShuffle(10, 2) val compressedSize1000 = MapOutputTracker.compressSize(1000L) val compressedSize10000 = MapOutputTracker.compressSize(10000L) @@ -75,7 +75,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("master register and unregister and fetch") { val actorSystem = ActorSystem("test") val tracker = new MapOutputTrackerMaster() - tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) + tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))) tracker.registerShuffle(10, 2) val compressedSize1000 = MapOutputTracker.compressSize(1000L) val compressedSize10000 = MapOutputTracker.compressSize(10000L) @@ -101,13 +101,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { System.setProperty("spark.hostPort", hostname + ":" + boundPort) val masterTracker = new MapOutputTrackerMaster() - masterTracker.trackerActor = actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker") + masterTracker.trackerActor = Left(actorSystem.actorOf( + Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")) val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0) val slaveTracker = new MapOutputTracker() - slaveTracker.trackerActor = slaveSystem.actorFor( - "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker") + slaveTracker.trackerActor = Right(slaveSystem.actorSelection( + "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker")) masterTracker.registerShuffle(10, 1) masterTracker.incrementEpoch() diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala index 46a2da1724..768ca3850e 100644 --- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala +++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala @@ -37,7 +37,7 @@ class UnpersistSuite extends FunSuite with LocalSparkContext { Thread.sleep(200) } } catch { - case _ => { Thread.sleep(10) } + case _: Throwable => { Thread.sleep(10) } // Do nothing. We might see exceptions because block manager // is racing this thread to remove entries from the driver. } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 354ab8ae5d..d8dcd6d14c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -244,8 +244,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { // test that you get over 90% locality in each group val minLocality = coalesced2.partitions .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction) - .foldLeft(1.)((perc, loc) => math.min(perc,loc)) - assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%") + .foldLeft(1.0)((perc, loc) => math.min(perc,loc)) + assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.0).toInt + "%") // test that the groups are load balanced with 100 +/- 20 elements in each val maxImbalance = coalesced2.partitions @@ -257,9 +257,9 @@ class RDDSuite extends FunSuite with SharedSparkContext { val coalesced3 = data3.coalesce(numMachines*2) val minLocality2 = coalesced3.partitions .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction) - .foldLeft(1.)((perc, loc) => math.min(perc,loc)) + .foldLeft(1.0)((perc, loc) => math.min(perc,loc)) assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " + - (minLocality2*100.).toInt + "%") + (minLocality2*100.0).toInt + "%") } test("zipped RDDs") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 1fd76420ea..2e41438a52 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -145,7 +145,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc // Make a task whose result is larger than the akka frame size System.setProperty("spark.akka.frameSize", "1") val akkaFrameSize = - sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x,y) => x) assert(result === 1.to(akkaFrameSize).toArray) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala index b97f2b19b5..29c4cc5d9c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala @@ -283,7 +283,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted // after the last failure. - (0 until manager.MAX_TASK_FAILURES).foreach { index => + (1 to manager.MAX_TASK_FAILURES).foreach { index => val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY) assert(offerResult != None, "Expect resource offer on iteration %s to return a task".format(index)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala index ee150a3107..27c2d53361 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala @@ -82,7 +82,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA test("handling results larger than Akka frame size") { val akkaFrameSize = - sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) assert(result === 1.to(akkaFrameSize).toArray) @@ -103,7 +103,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA } scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler) val akkaFrameSize = - sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) assert(result === 1.to(akkaFrameSize).toArray) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala index cb76275e39..b647e8a672 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala @@ -39,7 +39,7 @@ class BlockIdSuite extends FunSuite { fail() } catch { case e: IllegalStateException => // OK - case _ => fail() + case _: Throwable => fail() } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 484a654108..5b4d63b954 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -56,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT System.setProperty("spark.hostPort", "localhost:" + boundPort) master = new BlockManagerMaster( - actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))) + Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))) // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case oldArch = System.setProperty("os.arch", "amd64") diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 8f0ec6683b..3764f4d1a0 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -34,7 +34,6 @@ class UISuite extends FunSuite { } val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq()) val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq()) - // Allow some wiggle room in case ports on the machine are under contention assert(boundPort1 > startPort && boundPort1 < startPort + 10) assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10) diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala index 4e40dcbdee..5aff26f9fc 100644 --- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala @@ -63,54 +63,53 @@ class SizeEstimatorSuite } test("simple classes") { - assert(SizeEstimator.estimate(new DummyClass1) === 16) - assert(SizeEstimator.estimate(new DummyClass2) === 16) - assert(SizeEstimator.estimate(new DummyClass3) === 24) - assert(SizeEstimator.estimate(new DummyClass4(null)) === 24) - assert(SizeEstimator.estimate(new DummyClass4(new DummyClass3)) === 48) + expectResult(16)(SizeEstimator.estimate(new DummyClass1)) + expectResult(16)(SizeEstimator.estimate(new DummyClass2)) + expectResult(24)(SizeEstimator.estimate(new DummyClass3)) + expectResult(24)(SizeEstimator.estimate(new DummyClass4(null))) + expectResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3))) } // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors // (Sun vs IBM). Use a DummyString class to make tests deterministic. test("strings") { - assert(SizeEstimator.estimate(DummyString("")) === 40) - assert(SizeEstimator.estimate(DummyString("a")) === 48) - assert(SizeEstimator.estimate(DummyString("ab")) === 48) - assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56) + expectResult(40)(SizeEstimator.estimate(DummyString(""))) + expectResult(48)(SizeEstimator.estimate(DummyString("a"))) + expectResult(48)(SizeEstimator.estimate(DummyString("ab"))) + expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh"))) } test("primitive arrays") { - assert(SizeEstimator.estimate(new Array[Byte](10)) === 32) - assert(SizeEstimator.estimate(new Array[Char](10)) === 40) - assert(SizeEstimator.estimate(new Array[Short](10)) === 40) - assert(SizeEstimator.estimate(new Array[Int](10)) === 56) - assert(SizeEstimator.estimate(new Array[Long](10)) === 96) - assert(SizeEstimator.estimate(new Array[Float](10)) === 56) - assert(SizeEstimator.estimate(new Array[Double](10)) === 96) - assert(SizeEstimator.estimate(new Array[Int](1000)) === 4016) - assert(SizeEstimator.estimate(new Array[Long](1000)) === 8016) + expectResult(32)(SizeEstimator.estimate(new Array[Byte](10))) + expectResult(40)(SizeEstimator.estimate(new Array[Char](10))) + expectResult(40)(SizeEstimator.estimate(new Array[Short](10))) + expectResult(56)(SizeEstimator.estimate(new Array[Int](10))) + expectResult(96)(SizeEstimator.estimate(new Array[Long](10))) + expectResult(56)(SizeEstimator.estimate(new Array[Float](10))) + expectResult(96)(SizeEstimator.estimate(new Array[Double](10))) + expectResult(4016)(SizeEstimator.estimate(new Array[Int](1000))) + expectResult(8016)(SizeEstimator.estimate(new Array[Long](1000))) } test("object arrays") { // Arrays containing nulls should just have one pointer per element - assert(SizeEstimator.estimate(new Array[String](10)) === 56) - assert(SizeEstimator.estimate(new Array[AnyRef](10)) === 56) - + expectResult(56)(SizeEstimator.estimate(new Array[String](10))) + expectResult(56)(SizeEstimator.estimate(new Array[AnyRef](10))) // For object arrays with non-null elements, each object should take one pointer plus // however many bytes that class takes. (Note that Array.fill calls the code in its // second parameter separately for each object, so we get distinct objects.) - assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)) === 216) - assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)) === 216) - assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)) === 296) - assert(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)) === 56) + expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1))) + expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2))) + expectResult(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3))) + expectResult(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2))) // Past size 100, our samples 100 elements, but we should still get the right size. - assert(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)) === 28016) + expectResult(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3))) // If an array contains the *same* element many times, we should only count it once. val d1 = new DummyClass1 - assert(SizeEstimator.estimate(Array.fill(10)(d1)) === 72) // 10 pointers plus 8-byte object - assert(SizeEstimator.estimate(Array.fill(100)(d1)) === 432) // 100 pointers plus 8-byte object + expectResult(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object + expectResult(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object // Same thing with huge array containing the same element many times. Note that this won't // return exactly 4032 because it can't tell that *all* the elements will equal the first @@ -128,11 +127,10 @@ class SizeEstimatorSuite val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - assert(SizeEstimator.estimate(DummyString("")) === 40) - assert(SizeEstimator.estimate(DummyString("a")) === 48) - assert(SizeEstimator.estimate(DummyString("ab")) === 48) - assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56) - + expectResult(40)(SizeEstimator.estimate(DummyString(""))) + expectResult(48)(SizeEstimator.estimate(DummyString("a"))) + expectResult(48)(SizeEstimator.estimate(DummyString("ab"))) + expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh"))) resetOrClear("os.arch", arch) } @@ -145,10 +143,10 @@ class SizeEstimatorSuite val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - assert(SizeEstimator.estimate(DummyString("")) === 56) - assert(SizeEstimator.estimate(DummyString("a")) === 64) - assert(SizeEstimator.estimate(DummyString("ab")) === 64) - assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 72) + expectResult(56)(SizeEstimator.estimate(DummyString(""))) + expectResult(64)(SizeEstimator.estimate(DummyString("a"))) + expectResult(64)(SizeEstimator.estimate(DummyString("ab"))) + expectResult(72)(SizeEstimator.estimate(DummyString("abcdefgh"))) resetOrClear("os.arch", arch) resetOrClear("spark.test.useCompressedOops", oops) |