author     Patrick Wendell <pwendell@gmail.com>    2013-12-14 00:22:45 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2013-12-14 00:22:45 -0800
commit     97ac06018206b593600594605be241d0cd706e08 (patch)
tree       a956d15687b75e4613bf6b2a6e3c20b335061cf7 /core
parent     d2efe13574090e93c600adeacc7f6356bc196e6c (diff)
parent     7ac944fc27805e2a76285ce3a31f3b2ecf4a7b78 (diff)
Merge pull request #259 from pwendell/scala-2.10
Migration to Scala 2.10

== Below description was written by Prashant Sharma ==

This PR migrates Spark to Scala 2.10. Summary of changes, apart from the Scala 2.10 migration itself:

1. Migrated Akka to 2.2.3. (Has no implications for users.) Remote death watch is not used because it has a bug where it keeps sending messages to a dead node indefinitely. Instead, an "indestructible" actor system is used, which tolerates errors only on executors.

2. New configuration settings introduced. (Might be useful for users.)

   System.getProperty("spark.akka.heartbeat.pauses", "600")
   System.getProperty("spark.akka.failure-detector.threshold", "300.0")
   System.getProperty("spark.akka.heartbeat.interval", "1000")

   The defaults are deliberately large so that the failure detector that comes with Akka is effectively disabled. Spark already has its own failure-detector-like mechanism in place, so Akka's is just overhead on top of that and leads to a lot of false positives. With these properties it is still possible to enable it. A good use case for enabling it is when someone wants Spark to be sensitive (in a controllable manner, of course) to GC pauses or network lags and to quickly evict executors that experience them. More information is included in configuration.md.

Once SPARK-544 is merged, I would like to deprecate at least these Akka properties, and maybe others too.

This PR is a duplicate of #221 (where all the discussion happened); that one pointed at master, while this one points at the scala-2.10 branch.
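For example, a user who wants to re-enable Akka's failure detector could lower these properties before creating the SparkContext (a minimal sketch; the values are illustrative, not recommended settings):

    // Sketch only: illustrative values. In this version of Spark these are
    // read as plain JVM system properties, so they must be set before the
    // SparkContext (and hence the actor system) is created.
    System.setProperty("spark.akka.heartbeat.pauses", "60")
    System.setProperty("spark.akka.failure-detector.threshold", "12.0")
    System.setProperty("spark.akka.heartbeat.interval", "5")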
Diffstat (limited to 'core')
-rw-r--r--  core/pom.xml | 29
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileClient.java | 2
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileServer.java | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 77
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskState.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 63
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala | 60
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/Function.java | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/Function2.java | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/Function3.java | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/Client.scala | 48
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 69
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 48
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ConnectionManager.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 33
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala | 64
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 79
-rw-r--r--  core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala | 68
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 32
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/DriverSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/UnpersistSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala | 72
106 files changed, 718 insertions(+), 615 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 38f4be1280..3fe48fd2af 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.9.3</artifactId>
+ <artifactId>spark-core_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Core</name>
<url>http://spark.incubator.apache.org/</url>
@@ -86,7 +86,7 @@
</dependency>
<dependency>
<groupId>com.twitter</groupId>
- <artifactId>chill_2.9.3</artifactId>
+ <artifactId>chill_2.10</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
@@ -96,19 +96,15 @@
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
- <artifactId>akka-actor</artifactId>
+ <artifactId>akka-actor_2.10</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
- <artifactId>akka-remote</artifactId>
+ <artifactId>akka-remote_2.10</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
- <artifactId>akka-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scalap</artifactId>
+ <artifactId>akka-slf4j_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
@@ -116,7 +112,7 @@
</dependency>
<dependency>
<groupId>net.liftweb</groupId>
- <artifactId>lift-json_2.9.2</artifactId>
+ <artifactId>lift-json_2.10</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
@@ -164,13 +160,18 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.9.3</artifactId>
+ <artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_2.9.3</artifactId>
+ <artifactId>scalacheck_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -190,8 +191,8 @@
</dependency>
</dependencies>
<build>
- <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <outputDirectory>target/scala-2.10/classes</outputDirectory>
+ <testOutputDirectory>target/scala-2.10/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
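The _2.10 suffixes in the pom changes above follow the Scala cross-versioning convention: binary compatibility breaks between Scala minor releases (2.9.x vs 2.10.x), so each Scala library is published once per binary version, with that version encoded in the artifactId. In sbt the same dependency would be declared with %%, which appends the suffix automatically (a sketch assuming sbt 0.13-era syntax; not part of this patch):

    // build.sbt sketch: "%%" resolves to chill_2.10 when scalaVersion is 2.10.x
    scalaVersion := "2.10.3"
    libraryDependencies += "com.twitter" %% "chill" % "0.3.1"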
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index 20a7a3aa8c..edd0fc56f8 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -19,8 +19,6 @@ package org.apache.spark.network.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index 666432474d..a99af348ce 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -20,7 +20,6 @@ package org.apache.spark.network.netty;
import java.net.InetSocketAddress;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index b4d0b7017c..10fae5af9f 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,12 +21,11 @@ import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.HashSet
+import scala.concurrent.Await
+import scala.concurrent.duration._
import akka.actor._
-import akka.dispatch._
import akka.pattern.ask
-import akka.util.Duration
-
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
@@ -55,9 +54,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
private[spark] class MapOutputTracker extends Logging {
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
+
// Set to the MapOutputTrackerActor living on the driver
- var trackerActor: ActorRef = _
+ var trackerActor: Either[ActorRef, ActorSelection] = _
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
@@ -73,8 +72,18 @@ private[spark] class MapOutputTracker extends Logging {
// throw a SparkException if this fails.
private def askTracker(message: Any): Any = {
try {
- val future = trackerActor.ask(message)(timeout)
- return Await.result(future, timeout)
+ /*
+ The difference between ActorRef and ActorSelection is well explained here:
+ http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor
+ In spark a map output tracker can be either started on Driver where it is created which
+ is an ActorRef or it can be on executor from where it is looked up which is an
+ actorSelection.
+ */
+ val future = trackerActor match {
+ case Left(a: ActorRef) => a.ask(message)(timeout)
+ case Right(b: ActorSelection) => b.ask(message)(timeout)
+ }
+ Await.result(future, timeout)
} catch {
case e: Exception =>
throw new SparkException("Error communicating with MapOutputTracker", e)
@@ -117,7 +126,7 @@ private[spark] class MapOutputTracker extends Logging {
fetching += shuffleId
}
}
-
+
if (fetchedStatuses == null) {
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
@@ -144,7 +153,7 @@ private[spark] class MapOutputTracker extends Logging {
else{
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing all output locations for shuffle " + shuffleId))
- }
+ }
} else {
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
@@ -312,7 +321,7 @@ private[spark] object MapOutputTracker {
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
assert (statuses != null)
statuses.map {
- status =>
+ status =>
if (status == null) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 0e2c987a59..bcec41c439 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -17,8 +17,10 @@
package org.apache.spark
-import org.apache.spark.util.Utils
+import scala.reflect.ClassTag
+
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
@@ -72,7 +74,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
-
+
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
@@ -85,7 +87,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
* Determines the ranges by sampling the RDD passed in.
*/
-class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
+class RangePartitioner[K <% Ordered[K]: ClassTag, V](
partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
private val ascending: Boolean = true)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 66006bf212..a0f794edfd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -26,6 +26,7 @@ import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
+import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -267,19 +268,19 @@ class SparkContext(
// Methods for creating RDDs
/** Distribute a local Scala collection to form an RDD. */
- def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
+ def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
/** Distribute a local Scala collection to form an RDD. */
- def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
+ def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
parallelize(seq, numSlices)
}
/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
- def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
+ def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
}
@@ -332,7 +333,7 @@ class SparkContext(
}
/**
- * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
+ * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
* can just write, for example,
* {{{
@@ -340,17 +341,17 @@ class SparkContext(
* }}}
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
: RDD[(K, V)] = {
hadoopFile(path,
- fm.erasure.asInstanceOf[Class[F]],
- km.erasure.asInstanceOf[Class[K]],
- vm.erasure.asInstanceOf[Class[V]],
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
minSplits)
}
/**
- * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
+ * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
* can just write, for example,
* {{{
@@ -358,17 +359,17 @@ class SparkContext(
* }}}
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] =
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
hadoopFile[K, V, F](path, defaultMinSplits)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = {
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
path,
- fm.erasure.asInstanceOf[Class[F]],
- km.erasure.asInstanceOf[Class[K]],
- vm.erasure.asInstanceOf[Class[V]])
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]])
}
/**
@@ -426,11 +427,11 @@ class SparkContext(
* IntWritable). The most natural thing would've been to have implicit objects for the
* converters, but then we couldn't have an object for every subclass of Writable (you can't
* have a parameterized singleton object). We use functions instead to create a new converter
- * for the appropriate type. In addition, we pass the converter a ClassManifest of its type to
+ * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
- (implicit km: ClassManifest[K], vm: ClassManifest[V],
+ (implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
val kc = kcf()
@@ -449,7 +450,7 @@ class SparkContext(
* slow if you use the default serializer (Java serialization), though the nice thing about it is
* that there's very little effort required to save arbitrary objects.
*/
- def objectFile[T: ClassManifest](
+ def objectFile[T: ClassTag](
path: String,
minSplits: Int = defaultMinSplits
): RDD[T] = {
@@ -458,17 +459,17 @@ class SparkContext(
}
- protected[spark] def checkpointFile[T: ClassManifest](
+ protected[spark] def checkpointFile[T: ClassTag](
path: String
): RDD[T] = {
new CheckpointRDD[T](this, path)
}
/** Build the union of a list of RDDs. */
- def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
+ def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
/** Build the union of a list of RDDs passed as variable-length arguments. */
- def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
+ def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
new UnionRDD(this, Seq(first) ++ rest)
// Methods for creating shared variables
@@ -711,7 +712,7 @@ class SparkContext(
* flag specifies whether the scheduler can run the computation on the driver rather than
* shipping it out to the cluster, for short actions like first().
*/
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
@@ -732,7 +733,7 @@ class SparkContext(
* allowLocal flag specifies whether the scheduler can run the computation on the driver rather
* than shipping it out to the cluster, for short actions like first().
*/
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
@@ -747,7 +748,7 @@ class SparkContext(
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
*/
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int],
@@ -759,21 +760,21 @@ class SparkContext(
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
- def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
+ def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
}
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
- def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
+ def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
}
/**
* Run a job on all partitions in an RDD and pass the results to a handler function.
*/
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
processPartition: (TaskContext, Iterator[T]) => U,
resultHandler: (Int, U) => Unit)
@@ -784,7 +785,7 @@ class SparkContext(
/**
* Run a job on all partitions in an RDD and pass the results to a handler function.
*/
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
processPartition: Iterator[T] => U,
resultHandler: (Int, U) => Unit)
@@ -930,16 +931,16 @@ object SparkContext {
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
- implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
+ implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
- implicit def rddToAsyncRDDActions[T: ClassManifest](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+ implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
- implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
+ implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
- implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
+ implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions[K, V, (K, V)](rdd)
@@ -964,16 +965,16 @@ object SparkContext {
implicit def stringToText(s: String) = new Text(s)
- private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
+ private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]): ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
- new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
+ new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]],
arr.map(x => anyToWritable(x)).toArray)
}
// Helper objects for converting common types to Writable
- private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = {
- val wClass = classManifest[W].erasure.asInstanceOf[Class[W]]
+ private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = {
+ val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}
@@ -992,7 +993,7 @@ object SparkContext {
implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
implicit def writableWritableConverter[T <: Writable]() =
- new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
+ new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
@@ -1147,12 +1148,12 @@ object SparkContext {
/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
- * The getter for the writable class takes a ClassManifest[T] in case this is a generic object
+ * The getter for the writable class takes a ClassTag[T] in case this is a generic object
* that doesn't know the type of T when it is created. This sounds strange but is necessary to
* support converting subclasses of Writable to themselves (writableWritableConverter).
*/
private[spark] class WritableConverter[T](
- val writableClass: ClassManifest[T] => Class[_ <: Writable],
+ val writableClass: ClassTag[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable
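The change that repeats throughout this file is mechanical: ClassManifest becomes ClassTag, classManifest[T] becomes classTag[T], and .erasure becomes .runtimeClass. A minimal sketch of the 2.10 form (illustrative names, not from the patch):

    import scala.reflect.{ClassTag, classTag}

    // A context-bound ClassTag lets generic code build arrays and recover
    // the runtime class, just as ClassManifest did on Scala 2.9.
    def toArray[T: ClassTag](xs: Seq[T]): Array[T] = xs.toArray
    val cls: Class[_] = classTag[String].runtimeClass   // classOf[String]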
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ff2df8fb6a..826f5c2d8c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import collection.mutable
import serializer.Serializer
-import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
+import akka.actor._
import akka.remote.RemoteActorRefProvider
import org.apache.spark.broadcast.BroadcastManager
@@ -74,7 +74,8 @@ class SparkEnv (
actorSystem.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
- actorSystem.awaitTermination()
+ // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
+ //actorSystem.awaitTermination()
}
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
@@ -151,17 +152,17 @@ object SparkEnv extends Logging {
val closureSerializer = serializerManager.get(
System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
- def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
+ def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
if (isDriver) {
logInfo("Registering " + name)
- actorSystem.actorOf(Props(newActor), name = name)
+ Left(actorSystem.actorOf(Props(newActor), name = name))
} else {
val driverHost: String = System.getProperty("spark.driver.host", "localhost")
val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
- val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
+ val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
logInfo("Connecting to " + name + ": " + url)
- actorSystem.actorFor(url)
+ Right(actorSystem.actorSelection(url))
}
}
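Two Akka 2.2 changes are visible in registerOrLookup above: the remote protocol name gained a transport suffix (akka:// becomes akka.tcp://), and remote lookups use actorSelection, which returns an ActorSelection rather than an ActorRef. In isolation (a sketch with placeholder host, port, and actor-name values):

    import akka.actor.{ActorSystem, ActorSelection}

    val system = ActorSystem("lookup-demo")
    val (driverHost, driverPort, name) = ("localhost", 7077, "MapOutputTracker")
    // Akka 2.0/2.1: system.actorFor("akka://spark@host:port/user/name") : ActorRef
    // Akka 2.2:     actorSelection with the "akka.tcp" protocol : ActorSelection
    val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
    val selection: ActorSelection = system.actorSelection(url)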
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index 19ce8369d9..0bf1e4a5e2 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -19,8 +19,7 @@ package org.apache.spark
import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-private[spark] object TaskState
- extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED", "KILLED", "LOST") {
+private[spark] object TaskState extends Enumeration {
val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
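This change is forced by Scala 2.10, which removed the Enumeration constructor that took value names as strings; the names are now derived reflectively from the val identifiers. A minimal sketch:

    // Scala 2.9 allowed: object Color extends Enumeration("RED", "GREEN") { ... }
    // On Scala 2.10 the names come from the vals themselves.
    object Color extends Enumeration {
      val RED, GREEN = Value
    }
    println(Color.RED)   // prints "RED"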
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 9f02a9b7d3..da30cf619a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -17,12 +17,15 @@
package org.apache.spark.api.java
+import scala.reflect.ClassTag
+
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.util.StatCounter
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.storage.StorageLevel
+
import java.lang.Double
import org.apache.spark.Partitioner
@@ -30,7 +33,7 @@ import scala.collection.JavaConverters._
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
- override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]]
+ override val classTag: ClassTag[Double] = implicitly[ClassTag[Double]]
override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x))
@@ -44,7 +47,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaDoubleRDD = fromRDD(srdd.cache())
- /**
+ /**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@@ -108,7 +111,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
/**
* Return an RDD with the elements from `this` that are not in `other`.
- *
+ *
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 2142fd7327..1167c12022 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -22,6 +22,7 @@ import java.util.Comparator
import scala.Tuple2
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
@@ -43,13 +44,13 @@ import org.apache.spark.rdd.OrderedRDDFunctions
import org.apache.spark.storage.StorageLevel
-class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
- implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K],
+ implicit val vClassTag: ClassTag[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
- override val classManifest: ClassManifest[(K, V)] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ override val classTag: ClassTag[(K, V)] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
import JavaPairRDD._
@@ -58,7 +59,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())
- /**
+ /**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@@ -138,14 +139,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
override def first(): (K, V) = rdd.first()
// Pair RDD functions
-
+
/**
- * Generic function to combine the elements for each key using a custom set of aggregation
- * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
- * "combined type" C * Note that V and C can be different -- for example, one might group an
- * RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
+ * Generic function to combine the elements for each key using a custom set of aggregation
+ * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
+ * "combined type" C * Note that V and C can be different -- for example, one might group an
+ * RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
* functions:
- *
+ *
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
@@ -157,8 +158,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
- implicit val cm: ClassManifest[C] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
+ implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
fromRDD(rdd.combineByKey(
createCombiner,
mergeValue,
@@ -195,14 +195,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/** Count the number of elements for each key, and return the result to the master as a Map. */
def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey())
- /**
+ /**
* (Experimental) Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
- /**
+ /**
* (Experimental) Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
@@ -258,7 +258,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Return an RDD with the elements from `this` that are not in `other`.
- *
+ *
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
@@ -315,15 +315,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
- /**
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
- implicit val cm: ClassManifest[C] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
+ implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd)))
}
@@ -414,8 +413,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
fromRDD(rdd.mapValues(f))
}
@@ -426,8 +424,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
fromRDD(rdd.flatMapValues(fn))
}
@@ -603,22 +600,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
}
object JavaPairRDD {
- def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassManifest[K],
- vcm: ClassManifest[T]): RDD[(K, JList[T])] =
+ def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassTag[K],
+ vcm: ClassTag[T]): RDD[(K, JList[T])] =
rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList _)
- def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassManifest[K],
- vcm: ClassManifest[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd).mapValues((x: (Seq[V],
- Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
+ def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassTag[K],
+ vcm: ClassTag[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd)
+ .mapValues((x: (Seq[V], Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1],
- Seq[W2]))])(implicit kcm: ClassManifest[K]) : RDD[(K, (JList[V], JList[W1],
+ Seq[W2]))])(implicit kcm: ClassTag[K]) : RDD[(K, (JList[V], JList[W1],
JList[W2]))] = rddToPairRDDFunctions(rdd).mapValues(
(x: (Seq[V], Seq[W1], Seq[W2])) => (seqAsJavaList(x._1),
seqAsJavaList(x._2),
seqAsJavaList(x._3)))
- def fromRDD[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
+ def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd)
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
@@ -626,10 +623,8 @@ object JavaPairRDD {
/** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
- implicit val cmk: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val cmv: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmk: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val cmv: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
new JavaPairRDD[K, V](rdd.rdd)
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 3b359a8fd6..c47657f512 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -17,12 +17,14 @@
package org.apache.spark.api.java
+import scala.reflect.ClassTag
+
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
-class JavaRDD[T](val rdd: RDD[T])(implicit val classManifest: ClassManifest[T]) extends
+class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
JavaRDDLike[T, JavaRDD[T]] {
override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
@@ -127,8 +129,7 @@ JavaRDDLike[T, JavaRDD[T]] {
object JavaRDD {
- implicit def fromRDD[T: ClassManifest](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
+ implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}
-
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 7a3568c5ef..9e912d3adb 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -20,6 +20,7 @@ package org.apache.spark.api.java
import java.util.{List => JList, Comparator}
import scala.Tuple2
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
@@ -35,7 +36,7 @@ import org.apache.spark.storage.StorageLevel
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This
- implicit val classManifest: ClassManifest[T]
+ implicit val classTag: ClassTag[T]
def rdd: RDD[T]
@@ -71,7 +72,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- def mapPartitionsWithIndex[R: ClassManifest](
+ def mapPartitionsWithIndex[R: ClassTag](
f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
@@ -87,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -118,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -158,18 +159,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* elements (a, b) where a is in `this` and b is in `other`.
*/
def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] =
- JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest,
- other.classManifest)
+ JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classTag))(classTag, other.classTag)
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
- implicit val kcm: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val vcm: ClassManifest[JList[T]] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
+ implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val vcm: ClassTag[JList[T]] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]]
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm)
}
@@ -178,10 +177,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
- implicit val kcm: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val vcm: ClassManifest[JList[T]] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
+ implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val vcm: ClassTag[JList[T]] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]]
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm)
}
@@ -209,7 +207,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* a map on the other).
*/
def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
- JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest)
+ JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, other.classTag)
}
/**
@@ -224,7 +222,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
JavaRDD.fromRDD(
- rdd.zipPartitions(other.rdd)(fn)(other.classManifest, f.elementType()))(f.elementType())
+ rdd.zipPartitions(other.rdd)(fn)(other.classTag, f.elementType()))(f.elementType())
}
// Actions (launch a job to return a value to the user program)
@@ -356,7 +354,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
- implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
JavaPairRDD.fromRDD(rdd.keyBy(f))
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 8869e072bf..acf328aa6a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,6 +21,7 @@ import java.util.{Map => JMap}
import scala.collection.JavaConversions
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.InputFormat
@@ -82,8 +83,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
}
@@ -94,10 +94,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Distribute a local Scala collection to form an RDD. */
def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
: JavaPairRDD[K, V] = {
- implicit val kcm: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val vcm: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val vcm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
}
@@ -132,16 +130,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
}
/**Get an RDD for a Hadoop SequenceFile. */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
}
@@ -153,8 +151,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
sc.objectFile(path, minSplits)(cm)
}
@@ -166,8 +163,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T](path: String): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
sc.objectFile(path)(cm)
}
@@ -183,8 +179,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
}
@@ -199,8 +195,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
keyClass: Class[K],
valueClass: Class[V]
): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
}
@@ -212,8 +208,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
}
@@ -224,8 +220,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
keyClass: Class[K],
valueClass: Class[V]
): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path,
inputFormatClass, keyClass, valueClass))
}
@@ -240,8 +236,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
kClass: Class[K],
vClass: Class[V],
conf: Configuration): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(kClass)
- implicit val vcm = ClassManifest.fromClass(vClass)
+ implicit val kcm: ClassTag[K] = ClassTag(kClass)
+ implicit val vcm: ClassTag[V] = ClassTag(vClass)
new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
}
@@ -254,15 +250,15 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(kClass)
- implicit val vcm = ClassManifest.fromClass(vClass)
+ implicit val kcm: ClassTag[K] = ClassTag(kClass)
+ implicit val vcm: ClassTag[V] = ClassTag(vClass)
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
}
/** Build the union of two or more RDDs. */
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
- implicit val cm: ClassManifest[T] = first.classManifest
+ implicit val cm: ClassTag[T] = first.classTag
sc.union(rdds)(cm)
}
@@ -270,9 +266,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
: JavaPairRDD[K, V] = {
val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
- implicit val cm: ClassManifest[(K, V)] = first.classManifest
- implicit val kcm: ClassManifest[K] = first.kManifest
- implicit val vcm: ClassManifest[V] = first.vManifest
+ implicit val cm: ClassTag[(K, V)] = first.classTag
+ implicit val kcm: ClassTag[K] = first.kClassTag
+ implicit val vcm: ClassTag[V] = first.vClassTag
new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
}
@@ -405,8 +401,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
}
protected def checkpointFile[T](path: String): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
new JavaRDD(sc.checkpointFile(path))
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
index c9cbce5624..2090efd3b9 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
@@ -17,7 +17,6 @@
package org.apache.spark.api.java;
-import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
index 2dfda8b09a..bdb01f7670 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
@@ -17,9 +17,11 @@
package org.apache.spark.api.java.function
+import scala.reflect.ClassTag
+
/**
* A function that returns zero or more output records from each input record.
*/
abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
- def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]]
+ def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
index 528e1c0a7c..aae1349c5e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
@@ -17,9 +17,11 @@
package org.apache.spark.api.java.function
+import scala.reflect.ClassTag
+
/**
* A function that takes two inputs and returns zero or more output records.
*/
abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
- def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]]
+ def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
}
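Both FlatMapFunction variants keep a default element type of Any for the Java API. Under erasure every ClassTag[_] has the same runtime representation, so the unchecked cast is benign; the same one-liner stated on its own (a sketch, not patch code):

  import scala.reflect.ClassTag

  // Default tag used when the element type is unknown at the Java boundary.
  def anyElementType[R]: ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]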
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
index ce368ee01b..537439ef53 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
@@ -17,8 +17,8 @@
package org.apache.spark.api.java.function;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import java.io.Serializable;
@@ -29,8 +29,8 @@ import java.io.Serializable;
* when mapping RDDs of other types.
*/
public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<R> returnType() {
+ return ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
index 44ad559d48..a2d1214fb4 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
@@ -17,8 +17,8 @@
package org.apache.spark.api.java.function;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import java.io.Serializable;
@@ -28,8 +28,8 @@ import java.io.Serializable;
public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
implements Serializable {
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<R> returnType() {
+ return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
index ac6178924a..fb1deceab5 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
@@ -17,8 +17,8 @@
package org.apache.spark.api.java.function;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction2;
import java.io.Serializable;
@@ -29,8 +29,8 @@ import java.io.Serializable;
public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R>
implements Serializable {
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<R> returnType() {
+ return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 6d76a8f970..ca485b3cc2 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -18,8 +18,8 @@
package org.apache.spark.api.java.function;
import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import java.io.Serializable;
@@ -33,11 +33,11 @@ public abstract class PairFlatMapFunction<T, K, V>
extends WrappedFunction1<T, Iterable<Tuple2<K, V>>>
implements Serializable {
- public ClassManifest<K> keyType() {
- return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<K> keyType() {
+ return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
}
- public ClassManifest<V> valueType() {
- return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<V> valueType() {
+ return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
index ede7ceefb5..cbe2306026 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
@@ -18,8 +18,8 @@
package org.apache.spark.api.java.function;
import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import java.io.Serializable;
@@ -31,11 +31,11 @@ import java.io.Serializable;
public abstract class PairFunction<T, K, V> extends WrappedFunction1<T, Tuple2<K, V>>
implements Serializable {
- public ClassManifest<K> keyType() {
- return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<K> keyType() {
+ return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
}
- public ClassManifest<V> valueType() {
- return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<V> valueType() {
+ return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
}
}
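On the Java side the same migration goes through the compiler-generated module field: ClassTag$.MODULE$.apply(Object.class) is the Java spelling of Scala's ClassTag(classOf[AnyRef]). Written in Scala for comparison (names here are illustrative):

  import scala.reflect.ClassTag

  // What the Java defaults keyType()/valueType()/returnType() evaluate to:
  def defaultTag[R]: ClassTag[R] =
    ClassTag(classOf[AnyRef]).asInstanceOf[ClassTag[R]]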
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 132e4fb0d2..a659cc06c2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -22,6 +22,7 @@ import java.net._
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
@@ -29,8 +30,7 @@ import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
-
-private[spark] class PythonRDD[T: ClassManifest](
+private[spark] class PythonRDD[T: ClassTag](
parent: RDD[T],
command: Array[Byte],
envVars: JMap[String, String],
@@ -148,7 +148,7 @@ private[spark] class PythonRDD[T: ClassManifest](
case eof: EOFException => {
throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
}
- case e => throw e
+ case e: Throwable => throw e
}
}
@@ -200,7 +200,7 @@ private[spark] object PythonRDD {
}
} catch {
case eof: EOFException => {}
- case e => throw e
+ case e: Throwable => throw e
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
@@ -236,7 +236,7 @@ private[spark] object PythonRDD {
}
def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
- implicit val cm : ClassManifest[T] = rdd.elementClassManifest
+ implicit val cm : ClassTag[T] = rdd.elementClassTag
rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 67d45723ba..f291266fcf 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -64,7 +64,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
startDaemon()
new Socket(daemonHost, daemonPort)
}
- case e => throw e
+ case e: Throwable => throw e
}
}
}
@@ -198,7 +198,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
}.start()
} catch {
- case e => {
+ case e: Throwable => {
stopDaemon()
throw e
}
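The repeated `case e => throw e` rewrites are a 2.10 requirement rather than a behaviour change: Scala 2.10 warns on an unascribed catch-all pattern in a catch block ("This catches all Throwables..."), since it can also intercept control-flow throwables, so the type is now pinned explicitly. A small sketch of the accepted form:

  import java.io.EOFException

  def drain(read: () => Int): Unit =
    try {
      while (read() != -1) {}
    } catch {
      case eof: EOFException => // expected end of stream; stop quietly
      case e: Throwable      => throw e // explicit type, as 2.10 expects
    }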
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index fcfea96ad6..37dfa7fec0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -17,8 +17,7 @@
package org.apache.spark.deploy
-private[spark] object ExecutorState
- extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
+private[spark] object ExecutorState extends Enumeration {
val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
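Scala 2.10 removed the Enumeration constructor that took value names as strings; names are now derived reflectively from the val identifiers, which is why the string list disappears here and in the other state objects below. A sketch with a hypothetical enumeration:

  object DemoState extends Enumeration {
    val ALIVE, DEAD = Value
  }

  // DemoState.ALIVE.toString == "ALIVE": the name comes from the val itself,
  // so the old Enumeration("ALIVE", "DEAD") name list is redundant.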
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index a724900943..59d12a3e6f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -34,11 +34,11 @@ import scala.collection.mutable.ArrayBuffer
*/
private[spark]
class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
-
+
private val localHostname = Utils.localHostName()
private val masterActorSystems = ArrayBuffer[ActorSystem]()
private val workerActorSystems = ArrayBuffer[ActorSystem]()
-
+
def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
@@ -61,10 +61,13 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
def stop() {
logInfo("Shutting down local Spark cluster.")
// Stop the workers before the master so they don't get upset that it disconnected
+ // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
+ // This is unfortunate, but for now we just comment it out.
workerActorSystems.foreach(_.shutdown())
- workerActorSystems.foreach(_.awaitTermination())
-
+ //workerActorSystems.foreach(_.awaitTermination())
masterActorSystems.foreach(_.shutdown())
- masterActorSystems.foreach(_.awaitTermination())
+ //masterActorSystems.foreach(_.awaitTermination())
+ masterActorSystems.clear()
+ workerActorSystems.clear()
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 77422f61ec..4d95efa73a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -19,17 +19,15 @@ package org.apache.spark.deploy.client
import java.util.concurrent.TimeoutException
+import scala.concurrent.duration._
+import scala.concurrent.Await
+
import akka.actor._
-import akka.actor.Terminated
+import akka.pattern.AskTimeoutException
import akka.pattern.ask
-import akka.util.Duration
-import akka.util.duration._
-import akka.remote.RemoteClientDisconnected
-import akka.remote.RemoteClientLifeCycleEvent
-import akka.remote.RemoteClientShutdown
-import akka.dispatch.Await
-
-import org.apache.spark.Logging
+import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
+
+import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -51,18 +49,19 @@ private[spark] class Client(
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
+ var masterAddress: Address = null
var actor: ActorRef = null
var appId: String = null
var registered = false
var activeMasterUrl: String = null
class ClientActor extends Actor with Logging {
- var master: ActorRef = null
- var masterAddress: Address = null
+ var master: ActorSelection = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
var alreadyDead = false // To avoid calling listener.dead() multiple times
override def preStart() {
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
try {
registerWithMaster()
} catch {
@@ -76,7 +75,7 @@ private[spark] class Client(
def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
- val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+ val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterApplication(appDescription)
}
}
@@ -84,6 +83,7 @@ private[spark] class Client(
def registerWithMaster() {
tryRegisterAllMasters()
+ import context.dispatcher
var retries = 0
lazy val retryTimer: Cancellable =
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
@@ -102,10 +102,13 @@ private[spark] class Client(
def changeMaster(url: String) {
activeMasterUrl = url
- master = context.actorFor(Master.toAkkaUrl(url))
- masterAddress = master.path.address
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+ masterAddress = activeMasterUrl match {
+ case Master.sparkUrlRegex(host, port) =>
+ Address("akka.tcp", Master.systemName, host, port.toInt)
+ case x =>
+ throw new SparkException("Invalid spark URL: " + x)
+ }
}
override def receive = {
@@ -135,21 +138,12 @@ private[spark] class Client(
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
- context.unwatch(master)
changeMaster(masterUrl)
alreadyDisconnected = false
sender ! MasterChangeAcknowledged(appId)
- case Terminated(actor_) if actor_ == master =>
- logWarning("Connection to master failed; waiting for master to reconnect...")
- markDisconnected()
-
- case RemoteClientDisconnected(transport, address) if address == masterAddress =>
- logWarning("Connection to master failed; waiting for master to reconnect...")
- markDisconnected()
-
- case RemoteClientShutdown(transport, address) if address == masterAddress =>
- logWarning("Connection to master failed; waiting for master to reconnect...")
+ case DisassociatedEvent(_, address, _) if address == masterAddress =>
+ logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
case StopClient =>
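Two Akka 2.2 changes drive this rewrite of Client: context.actorFor is gone in favour of context.actorSelection, and the RemoteClient* lifecycle events are replaced by RemotingLifecycleEvent, with DisassociatedEvent signalling a dropped link. Because remote death watch is deliberately avoided, disconnects are detected by subscribing to the event stream and comparing the event's address against the expected master address. A condensed sketch of the wiring (assuming Akka 2.2; names are illustrative):

  import akka.actor.{Actor, ActorSelection, Address}
  import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

  class MasterWatcher(masterUrl: String, masterAddress: Address) extends Actor {
    var master: ActorSelection = _

    override def preStart(): Unit = {
      // Subscribe to remoting lifecycle events instead of context.watch(master),
      // which is unreliable for remote actors in this Akka version.
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      master = context.actorSelection(masterUrl)
    }

    def receive = {
      case DisassociatedEvent(_, address, _) if address == masterAddress =>
        // The link to the master dropped; wait for it to come back.
        println(s"Connection to $address failed")
    }
  }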
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
index fedf879eff..67e6c5d66a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -17,8 +17,7 @@
package org.apache.spark.deploy.master
-private[spark] object ApplicationState
- extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED", "UNKNOWN") {
+private[spark] object ApplicationState extends Enumeration {
type ApplicationState = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index c0849ef324..043945a211 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -65,7 +65,7 @@ private[spark] class FileSystemPersistenceEngine(
(apps, workers)
}
- private def serializeIntoFile(file: File, value: Serializable) {
+ private def serializeIntoFile(file: File, value: AnyRef) {
val created = file.createNewFile()
if (!created) { throw new IllegalStateException("Could not create file: " + file) }
@@ -77,13 +77,13 @@ private[spark] class FileSystemPersistenceEngine(
out.close()
}
- def deserializeFromFile[T <: Serializable](file: File)(implicit m: Manifest[T]): T = {
+ def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file))
dis.readFully(fileData)
dis.close()
- val clazz = m.erasure.asInstanceOf[Class[T]]
+ val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
serializer.fromBinary(fileData).asInstanceOf[T]
}
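Manifest#erasure is deprecated in 2.10 and runtimeClass is its replacement; the `T <: Serializable` bound is also loosened, since Akka's serialization extension only needs the runtime Class. The core idiom:

  // 2.9:  m.erasure         2.10: m.runtimeClass
  def runtimeClassOf[T](implicit m: Manifest[T]): Class[T] =
    m.runtimeClass.asInstanceOf[Class[T]]

  runtimeClassOf[String] // classOf[String]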
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index cd916672ac..c627dd3806 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,19 +17,20 @@
package org.apache.spark.deploy.master
-import java.util.Date
import java.text.SimpleDateFormat
+import java.util.concurrent.TimeUnit
+import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
-import akka.actor.Terminated
-import akka.dispatch.Await
import akka.pattern.ask
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.remote._
import akka.serialization.SerializationExtension
-import akka.util.duration._
-import akka.util.{Duration, Timeout}
+import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -37,9 +38,11 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{Utils, AkkaUtils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
+ import context.dispatcher
+
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
@@ -93,7 +96,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@@ -113,13 +116,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
new BlackHolePersistenceEngine()
}
- leaderElectionAgent = context.actorOf(Props(
- RECOVERY_MODE match {
+ leaderElectionAgent = RECOVERY_MODE match {
case "ZOOKEEPER" =>
- new ZooKeeperLeaderElectionAgent(self, masterUrl)
+ context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl))
case _ =>
- new MonarchyLeaderAgent(self)
- }))
+ context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
+ }
}
override def preRestart(reason: Throwable, message: Option[Any]) {
@@ -142,9 +144,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
RecoveryState.ALIVE
else
RecoveryState.RECOVERING
-
logInfo("I have been elected leader! New state: " + state)
-
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedWorkers)
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
@@ -156,7 +156,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
System.exit(0)
}
- case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
+ case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
@@ -164,9 +164,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+ val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+ sender, workerWebUiPort, publicAddress)
registerWorker(worker)
- context.watch(sender) // This doesn't work with remote actors but helps for testing
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
@@ -181,7 +181,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val app = createApplication(description, sender)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
- context.watch(sender) // This doesn't work with remote actors but helps for testing
persistenceEngine.addApplication(app)
sender ! RegisteredApplication(app.id, masterUrl)
schedule()
@@ -257,23 +256,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (canCompleteRecovery) { completeRecovery() }
}
- case Terminated(actor) => {
- // The disconnected actor could've been either a worker or an app; remove whichever of
- // those we have an entry for in the corresponding actor hashmap
- actorToWorker.get(actor).foreach(removeWorker)
- actorToApp.get(actor).foreach(finishApplication)
- if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
- }
-
- case RemoteClientDisconnected(transport, address) => {
- // The disconnected client could've been either a worker or an app; remove whichever it was
- addressToWorker.get(address).foreach(removeWorker)
- addressToApp.get(address).foreach(finishApplication)
- if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
- }
-
- case RemoteClientShutdown(transport, address) => {
+ case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
+ logInfo(s"$address got disassociated, removing it.")
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
@@ -530,9 +515,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
private[spark] object Master {
- private val systemName = "sparkMaster"
+ val systemName = "sparkMaster"
private val actorName = "Master"
- private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
+ val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
@@ -540,11 +525,11 @@ private[spark] object Master {
actorSystem.awaitTermination()
}
- /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+ /** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
def toAkkaUrl(sparkUrl: String): String = {
sparkUrl match {
case sparkUrlRegex(host, port) =>
- "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+ "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
case _ =>
throw new SparkException("Invalid master URL: " + sparkUrl)
}
@@ -552,9 +537,9 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
- val timeoutDuration = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
+ val timeoutDuration: FiniteDuration = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
implicit val timeout = Timeout(timeoutDuration)
val respFuture = actor ? RequestWebUIPort // ask pattern
val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
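Master.scala collects several Akka 2.2 idioms: actors are created with Props(classOf[...], args...) so constructor arguments travel explicitly instead of being captured by a closure, the scheduler now needs an ExecutionContext (hence `import context.dispatcher`), and remote actor paths carry the akka.tcp:// protocol prefix. A minimal sketch of the Props change, using a hypothetical actor:

  import akka.actor.{Actor, ActorSystem, Props}

  class Greeter(name: String, port: Int) extends Actor {
    def receive = { case msg => println(s"$name:$port got $msg") }
  }

  val system = ActorSystem("demo")
  // 2.2 style: no closure over the constructor arguments.
  val greeter = system.actorOf(Props(classOf[Greeter], "master", 7077), name = "greeter")
  // Remote path format after the migration: "akka.tcp://demo@host:7077/user/greeter"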
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
index b91be821f0..256a5a7c28 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
@@ -17,9 +17,7 @@
package org.apache.spark.deploy.master
-private[spark] object RecoveryState
- extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
-
+private[spark] object RecoveryState extends Enumeration {
type MasterState = Value
val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
index c8d34f25e2..0b36ef6005 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
@@ -17,9 +17,7 @@
package org.apache.spark.deploy.master
-private[spark] object WorkerState
- extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED", "UNKNOWN") {
-
+private[spark] object WorkerState extends Enumeration {
type WorkerState = Value
val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index a0233a7271..825344b3bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -70,15 +70,15 @@ class ZooKeeperPersistenceEngine(serialization: Serialization)
(apps, workers)
}
- private def serializeIntoFile(path: String, value: Serializable) {
+ private def serializeIntoFile(path: String, value: AnyRef) {
val serializer = serialization.findSerializerFor(value)
val serialized = serializer.toBinary(value)
zk.create(path, serialized, CreateMode.PERSISTENT)
}
- def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = {
+ def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = {
val fileData = zk.getData("/spark/master_status/" + filename)
- val clazz = m.erasure.asInstanceOf[Class[T]]
+ val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
serializer.fromBinary(fileData).asInstanceOf[T]
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index f4e574d15d..3b983c19eb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -19,9 +19,10 @@ package org.apache.spark.deploy.master.ui
import scala.xml.Node
-import akka.dispatch.Await
import akka.pattern.ask
-import akka.util.duration._
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index d7a57229b0..65e7a14e7a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import akka.dispatch.Await
+import scala.concurrent.Await
import akka.pattern.ask
-import akka.util.duration._
+import scala.concurrent.duration._
import net.liftweb.json.JsonAST.JValue
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index f4df729e87..a211ce2b42 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.master.ui
-import akka.util.Duration
+import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 216d9d44ac..87531b6719 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,23 +17,31 @@
package org.apache.spark.deploy.worker
+import java.io.File
import java.text.SimpleDateFormat
import java.util.Date
-import java.io.File
import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
import akka.actor._
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import akka.util.duration._
+import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
+import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
+import org.apache.spark.deploy.DeployMessages.KillExecutor
+import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.deploy.DeployMessages.Heartbeat
+import org.apache.spark.deploy.DeployMessages.RegisteredWorker
+import org.apache.spark.deploy.DeployMessages.LaunchExecutor
+import org.apache.spark.deploy.DeployMessages.RegisterWorker
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -47,6 +55,7 @@ private[spark] class Worker(
masterUrls: Array[String],
workDirPath: String = null)
extends Actor with Logging {
+ import context.dispatcher
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
@@ -63,7 +72,8 @@ private[spark] class Worker(
var masterIndex = 0
val masterLock: Object = new Object()
- var master: ActorRef = null
+ var master: ActorSelection = null
+ var masterAddress: Address = null
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
@volatile var registered = false
@@ -114,7 +124,7 @@ private[spark] class Worker(
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
-
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
registerWithMaster()
@@ -126,9 +136,13 @@ private[spark] class Worker(
masterLock.synchronized {
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
- master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+ masterAddress = activeMasterUrl match {
+ case Master.sparkUrlRegex(_host, _port) =>
+ Address("akka.tcp", Master.systemName, _host, _port.toInt)
+ case x =>
+ throw new SparkException("Invalid spark URL: " + x)
+ }
connected = true
}
}
@@ -136,7 +150,7 @@ private[spark] class Worker(
def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
- val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+ val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get,
publicAddress)
}
@@ -175,7 +189,6 @@ private[spark] class Worker(
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
- context.unwatch(master)
changeMaster(masterUrl, masterWebUiUrl)
val execs = executors.values.
@@ -234,13 +247,8 @@ private[spark] class Worker(
}
}
- case Terminated(actor_) if actor_ == master =>
- masterDisconnected()
-
- case RemoteClientDisconnected(transport, address) if address == master.path.address =>
- masterDisconnected()
-
- case RemoteClientShutdown(transport, address) if address == master.path.address =>
+ case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
+ logInfo(s"$x Disassociated !")
masterDisconnected()
case RequestWorkerState => {
@@ -280,8 +288,8 @@ private[spark] object Worker {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
- masterUrls, workDir)), name = "Worker")
+ actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
+ masterUrls, workDir), name = "Worker")
(actorSystem, boundPort)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index d2d3617498..1a768d501f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -21,9 +21,10 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import akka.dispatch.Await
+import scala.concurrent.duration._
+import scala.concurrent.Await
+
import akka.pattern.ask
-import akka.util.duration._
import net.liftweb.json.JsonAST.JValue
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 800f1cafcc..6c18a3c245 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -17,20 +17,19 @@
package org.apache.spark.deploy.worker.ui
-import akka.util.{Duration, Timeout}
+import java.io.File
-import java.io.{FileInputStream, File}
+import scala.concurrent.duration._
+import akka.util.Timeout
import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.server.{Handler, Server}
-
+import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.{Logging}
-import org.apache.spark.ui.JettyUtils
+import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
+import org.eclipse.jetty.server.{Handler, Server}
/**
* Web UI server for the standalone worker.
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8332631838..debbdd4c44 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -19,15 +19,14 @@ package org.apache.spark.executor
import java.nio.ByteBuffer
-import akka.actor.{ActorRef, Actor, Props, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.actor._
+import akka.remote._
import org.apache.spark.Logging
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
-
private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
@@ -40,14 +39,13 @@ private[spark] class CoarseGrainedExecutorBackend(
Utils.checkHostPort(hostPort, "Expected hostport")
var executor: Executor = null
- var driver: ActorRef = null
+ var driver: ActorSelection = null
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
- driver = context.actorFor(driverUrl)
+ driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
override def receive = {
@@ -77,8 +75,8 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.killTask(taskId)
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
- logError("Driver terminated or disconnected! Shutting down.")
+ case x: DisassociatedEvent =>
+ logError(s"Driver $x disassociated! Shutting down.")
System.exit(1)
case StopExecutor =>
@@ -99,12 +97,13 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+ indestructible = true)
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
- val actor = actorSystem.actorOf(
- Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+ actorSystem.actorOf(
+ Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
actorSystem.awaitTermination()
}
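`indestructible = true` asks Spark's AkkaUtils for an actor system that survives actor failures, so a crashing task cannot take the executor's ActorSystem down with it. Plain Akka approximates the idea with a resuming supervisor; the sketch below only illustrates that behaviour and is not the patch's implementation:

  import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}

  class ResilientParent(childProps: Props) extends Actor {
    // Keep the child running through non-fatal failures instead of
    // restarting or escalating, roughly the tolerance the executor wants.
    override val supervisorStrategy =
      OneForOneStrategy() { case _: Exception => SupervisorStrategy.Resume }
    private val child = context.actorOf(childProps, "worker")
    def receive = { case msg => child forward msg }
  }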
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5c9bb9db1c..0b0a60ee60 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -121,7 +121,7 @@ private[spark] class Executor(
// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
private val akkaFrameSize = {
- env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
+ env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
}
// Start worker thread pool
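Akka 2.2 reorganised the remote transport configuration under akka.remote.netty.tcp and renamed the frame-size key to maximum-frame-size, hence the one-line change above. Reading it looks roughly like this, assuming akka-remote's reference configuration is on the classpath:

  import akka.actor.ActorSystem

  val system = ActorSystem("probe")
  val maxFrame: Long =
    system.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")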
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 9c2fee4023..703bc6a9ca 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -31,11 +31,11 @@ import scala.collection.mutable.SynchronizedMap
import scala.collection.mutable.SynchronizedQueue
import scala.collection.mutable.ArrayBuffer
-import akka.dispatch.{Await, Promise, ExecutionContext, Future}
-import akka.util.Duration
-import akka.util.duration._
-import org.apache.spark.util.Utils
+import scala.concurrent.{Await, Promise, ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
+import org.apache.spark.util.Utils
private[spark] class ConnectionManager(port: Int) extends Logging {
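With 2.10 the futures API that used to live in akka.dispatch is part of the standard library, so Await, Promise, Future and the duration DSL now come from scala.concurrent; this import swap repeats across the networking and UI files below. The 2.10 shape:

  import scala.concurrent.{Await, Future}
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global

  val f: Future[Int] = Future { 21 * 2 }
  Await.result(f, 10.seconds) // 42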
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
index 8d9ad9604d..4f5742d29b 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
@@ -25,8 +25,8 @@ import scala.io.Source
import java.nio.ByteBuffer
import java.net.InetAddress
-import akka.dispatch.Await
-import akka.util.duration._
+import scala.concurrent.Await
+import scala.concurrent.duration._
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index faaf837be0..d1c74a5063 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
+import scala.reflect.ClassTag
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
@@ -28,7 +29,7 @@ import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
* A set of asynchronous RDD actions available through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
-class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with Logging {
+class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
/**
* Returns a future for counting the number of elements in the RDD.
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 44ea573a7c..424354ae16 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
+
import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
import org.apache.spark.storage.{BlockId, BlockManager}
@@ -25,7 +27,7 @@ private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends P
}
private[spark]
-class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {
@transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 0de22f0e06..87b950ba43 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.rdd
import java.io.{ObjectOutputStream, IOException}
+import scala.reflect.ClassTag
import org.apache.spark._
@@ -43,7 +44,7 @@ class CartesianPartition(
}
private[spark]
-class CartesianRDD[T: ClassManifest, U:ClassManifest](
+class CartesianRDD[T: ClassTag, U: ClassTag](
sc: SparkContext,
var rdd1 : RDD[T],
var rdd2 : RDD[U])
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index d3033ea4a6..a712ef1c27 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -17,15 +17,13 @@
package org.apache.spark.rdd
+import java.io.IOException
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.{NullWritable, BytesWritable}
-import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.fs.Path
-import java.io.{File, IOException, EOFException}
-import java.text.NumberFormat
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -33,7 +31,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
* This RDD represents a RDD checkpoint file (similar to HadoopRDD).
*/
private[spark]
-class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
+class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index c5de6362a9..98da35763b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectOutputStream, IOException}
import scala.collection.mutable
import scala.Some
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
/**
* Class that captures a coalesced RDD by essentially keeping track of parent partitions
@@ -68,7 +69,7 @@ case class CoalescedRDDPartition(
* @param maxPartitions number of desired partitions in the coalesced RDD
* @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
*/
-class CoalescedRDD[T: ClassManifest](
+class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 02d75eccc5..688c310ee9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -90,12 +90,13 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
// Compute the minimum and the maxium
val (max: Double, min: Double) = self.mapPartitions { items =>
- Iterator(items.foldRight(-1/0.0, Double.NaN)((e: Double, x: Pair[Double, Double]) =>
+ Iterator(items.foldRight(Double.NegativeInfinity,
+ Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
(x._1.max(e), x._2.min(e))))
}.reduce { (maxmin1, maxmin2) =>
(maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
}
- if (max.isNaN() || max.isInfinity || min.isInfinity ) {
+ if (min.isNaN || max.isNaN || max.isInfinity || min.isInfinity ) {
throw new UnsupportedOperationException(
"Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
}
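The histogram change is a genuine bug fix rather than migration fallout: math.min propagates NaN, so seeding the fold's minimum with Double.NaN poisoned it for every non-empty partition, and the old guard never checked min for NaN. Seeding with (-inf, +inf), the identity for (max, min), fixes both. In miniature:

  val xs = List(1.0, 2.0)

  // NaN seed: min stays NaN forever, since (Double.NaN min x) == Double.NaN.
  val bad = xs.foldRight((Double.NegativeInfinity, Double.NaN)) {
    (e, acc) => (acc._1 max e, acc._2 min e)
  }
  // bad._2.isNaN == true

  // (-inf, +inf) is the identity element for (max, min):
  val good = xs.foldRight((Double.NegativeInfinity, Double.PositiveInfinity)) {
    (e, acc) => (acc._1 max e, acc._2 min e)
  }
  // good == (2.0, 1.0)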
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
index c8900d1a93..a84e5f9fd8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -17,13 +17,14 @@
package org.apache.spark.rdd
-import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
+import scala.reflect.ClassTag
+import org.apache.spark.{Partition, SparkContext, TaskContext}
/**
* An RDD that is empty, i.e. has no element in it.
*/
-class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
override def getPartitions: Array[Partition] = Array.empty
diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
index 5312dc0b59..e74c83b90b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
@@ -18,8 +18,9 @@
package org.apache.spark.rdd
import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
+import scala.reflect.ClassTag
-private[spark] class FilteredRDD[T: ClassManifest](
+private[spark] class FilteredRDD[T: ClassTag](
prev: RDD[T],
f: T => Boolean)
extends RDD[T](prev) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
index cbdf6d84c0..4d1878fc14 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
@@ -18,10 +18,11 @@
package org.apache.spark.rdd
import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
private[spark]
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+class FlatMappedRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: T => TraversableOnce[U])
extends RDD[U](prev) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
index 829545d7b0..1a694475f6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
@@ -18,8 +18,9 @@
package org.apache.spark.rdd
import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
-private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
+private[spark] class GlommedRDD[T: ClassTag](prev: RDD[T])
extends RDD[Array[T]](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index aca0146884..8df8718f3b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -19,6 +19,8 @@ package org.apache.spark.rdd
import java.sql.{Connection, ResultSet}
+import scala.reflect.ClassTag
+
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.util.NextIterator
@@ -45,7 +47,7 @@ private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) e
* This should only call getInt, getString, etc; the RDD takes care of calling next.
* The default maps a ResultSet to an array of Object.
*/
-class JdbcRDD[T: ClassManifest](
+class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index ae70d55951..db15baf503 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -18,9 +18,9 @@
package org.apache.spark.rdd
import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
-
-private[spark] class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
+private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
index e8be1c4816..8d7c288593 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
@@ -17,10 +17,12 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
+
import org.apache.spark.{Partition, TaskContext}
private[spark]
-class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
+class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
extends RDD[U](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 697be8b997..d5691f2267 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -17,7 +17,9 @@
package org.apache.spark.rdd
-import org.apache.spark.{RangePartitioner, Logging}
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, RangePartitioner}
/**
* Extra functions available on RDDs of (key, value) pairs where the key is sortable through
@@ -25,9 +27,9 @@ import org.apache.spark.{RangePartitioner, Logging}
* use these functions. They will work with any key type that has a `scala.math.Ordered`
* implementation.
*/
-class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest,
- V: ClassManifest,
- P <: Product2[K, V] : ClassManifest](
+class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
+ V: ClassTag,
+ P <: Product2[K, V] : ClassTag](
self: RDD[P])
extends Logging with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 93b78e1232..48168e152e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -25,6 +25,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
+import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.mapred._
import org.apache.hadoop.io.compress.CompressionCodec
@@ -50,7 +51,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
+class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
extends Logging
with SparkHadoopMapReduceUtil
with Serializable {
@@ -415,7 +416,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
- val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+ val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag)
prfs.mapValues { case Seq(vs, ws) =>
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
}
@@ -431,7 +432,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
- val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+ val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag)
prfs.mapValues { case Seq(vs, w1s, w2s) =>
(vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
}
@@ -488,15 +489,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
- def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] =
+ def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] =
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
- def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
+ def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
subtractByKey(other, new HashPartitioner(numPartitions))
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
- def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
+ def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
new SubtractedRDD[K, V, W](self, other, p)
/**
@@ -525,8 +526,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*/
- def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
- saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
+ saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -535,16 +536,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* supplied codec.
*/
def saveAsHadoopFile[F <: OutputFormat[K, V]](
- path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
- saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
+ path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
+ saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec)
}
/**
* Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
*/
- def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
- saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
+ saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -698,11 +699,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
*/
def values: RDD[V] = self.map(_._2)
- private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
+ private[spark] def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
- private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
+ private[spark] def getValueClass() = implicitly[ClassTag[V]].runtimeClass
}
-private[spark] object Manifests {
- val seqSeqManifest = classManifest[Seq[Seq[_]]]
+private[spark] object ClassTags {
+ val seqSeqClassTag = classTag[Seq[Seq[_]]]
}
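
The ClassManifest-to-ClassTag change above is the template for most of this diff: the context bound is renamed and `erasure` becomes `runtimeClass`. A minimal standalone sketch of the 2.10 idiom (the object and method names below are illustrative, not Spark code):

    import scala.reflect.{ClassTag, classTag}

    object ClassTagDemo {
      // A context bound [T: ClassTag] supplies an implicit ClassTag[T],
      // which carries the erased runtime class that ClassManifest's
      // `erasure` used to expose.
      def runtimeClassOf[T: ClassTag]: Class[_] = classTag[T].runtimeClass

      // The tag is also what makes `new Array[T]` legal under erasure.
      def newArrayOf[T: ClassTag](n: Int): Array[T] = new Array[T](n)

      def main(args: Array[String]) {
        println(runtimeClassOf[String])     // class java.lang.String
        println(newArrayOf[Int](3).toList)  // List(0, 0, 0)
      }
    }
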
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index cd96250389..09d0a8189d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -20,13 +20,15 @@ package org.apache.spark.rdd
import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
+import scala.reflect.ClassTag
+
import org.apache.spark._
import java.io._
import scala.Serializable
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils
-private[spark] class ParallelCollectionPartition[T: ClassManifest](
+private[spark] class ParallelCollectionPartition[T: ClassTag](
var rddId: Long,
var slice: Int,
var values: Seq[T])
@@ -78,7 +80,7 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
}
}
-private[spark] class ParallelCollectionRDD[T: ClassManifest](
+private[spark] class ParallelCollectionRDD[T: ClassTag](
@transient sc: SparkContext,
@transient data: Seq[T],
numSlices: Int,
@@ -109,7 +111,7 @@ private object ParallelCollectionRDD {
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
* it efficient to run Spark over RDDs representing large sets of numbers.
*/
- def slice[T: ClassManifest](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
+ def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of slices required")
}
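
For intuition, the Range special-casing mentioned in the comment above can be sketched as follows: each slice is computed as a sub-range, so no slice materializes its numbers up front (simplified, illustrative code, not the actual implementation):

    object SliceDemo {
      def sliceRange(r: Range, numSlices: Int): Seq[Seq[Int]] =
        (0 until numSlices).map { i =>
          val start = ((i * r.length.toLong) / numSlices).toInt
          val end   = (((i + 1) * r.length.toLong) / numSlices).toInt
          r.slice(start, end)  // still a Range at runtime, not an array of numbers
        }

      def main(args: Array[String]) {
        sliceRange(1 to 10, 3).foreach(println)  // three sub-ranges covering 1..10
      }
    }
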
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index 574dd4233f..ea8885b36e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
+
import org.apache.spark.{NarrowDependency, SparkEnv, Partition, TaskContext}
@@ -49,7 +51,7 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
* and the execution DAG has a filter on the key, we can avoid launching tasks
* on partitions that don't have the range covering the key.
*/
-class PartitionPruningRDD[T: ClassManifest](
+class PartitionPruningRDD[T: ClassTag](
@transient prev: RDD[T],
@transient partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
@@ -69,6 +71,6 @@ object PartitionPruningRDD {
* when its type T is not known at compile time.
*/
def create[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean) = {
- new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassManifest)
+ new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassTag)
}
}
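
`create()` shows the other half of the migration: when T is not known at compile time, the ClassTag is passed explicitly in a second argument list rather than summoned via a context bound, just as `rdd.elementClassTag` is forwarded above. The same pattern in isolation (illustrative names):

    import scala.reflect.ClassTag

    object ExplicitTagDemo {
      def makeArray[T: ClassTag](n: Int): Array[T] = new Array[T](n)

      // T is not statically known here, so thread an existing tag through
      // explicitly, the way create() forwards rdd.elementClassTag.
      def makeArrayWith[T](n: Int, tag: ClassTag[T]): Array[T] =
        makeArray[T](n)(tag)

      def main(args: Array[String]) {
        println(makeArrayWith(2, scala.reflect.classTag[String]).length)  // 2
      }
    }
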
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index d5304ab0ae..1dbbe39898 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -24,6 +24,7 @@ import scala.collection.Map
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
+import scala.reflect.ClassTag
import org.apache.spark.{SparkEnv, Partition, TaskContext}
import org.apache.spark.broadcast.Broadcast
@@ -33,7 +34,7 @@ import org.apache.spark.broadcast.Broadcast
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.
*/
-class PipedRDD[T: ClassManifest](
+class PipedRDD[T: ClassTag](
prev: RDD[T],
command: Seq[String],
envVars: Map[String, String],
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 893708f8f2..ea45566ad1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -23,6 +23,9 @@ import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.reflect.{classTag, ClassTag}
+
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
@@ -69,7 +72,7 @@ import org.apache.spark._
* [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
* on RDD internals.
*/
-abstract class RDD[T: ClassManifest](
+abstract class RDD[T: ClassTag](
@transient private var sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
@@ -243,13 +246,13 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
- def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+ def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
- def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
+ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
/**
@@ -374,25 +377,25 @@ abstract class RDD[T: ClassManifest](
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
* elements (a, b) where a is in `this` and b is in `other`.
*/
- def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
+ def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
+ def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] =
groupBy[K](f, defaultPartitioner(this))
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
+ def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
groupBy(f, new HashPartitioner(numPartitions))
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+ def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
@@ -439,7 +442,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[U: ClassManifest](
+ def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
@@ -449,7 +452,7 @@ abstract class RDD[T: ClassManifest](
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- def mapPartitionsWithIndex[U: ClassManifest](
+ def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
@@ -459,7 +462,7 @@ abstract class RDD[T: ClassManifest](
* Return a new RDD by applying a function to each partition of this RDD. This is a variant of
* mapPartitions that also passes the TaskContext into the closure.
*/
- def mapPartitionsWithContext[U: ClassManifest](
+ def mapPartitionsWithContext[U: ClassTag](
f: (TaskContext, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
@@ -471,7 +474,7 @@ abstract class RDD[T: ClassManifest](
* of the original partition.
*/
@deprecated("use mapPartitionsWithIndex", "0.7.0")
- def mapPartitionsWithSplit[U: ClassManifest](
+ def mapPartitionsWithSplit[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
mapPartitionsWithIndex(f, preservesPartitioning)
}
@@ -481,7 +484,7 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def mapWith[A: ClassManifest, U: ClassManifest]
+ def mapWith[A: ClassTag, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
(f: (T, A) => U): RDD[U] = {
mapPartitionsWithIndex((index, iter) => {
@@ -495,7 +498,7 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def flatMapWith[A: ClassManifest, U: ClassManifest]
+ def flatMapWith[A: ClassTag, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
(f: (T, A) => Seq[U]): RDD[U] = {
mapPartitionsWithIndex((index, iter) => {
@@ -509,7 +512,7 @@ abstract class RDD[T: ClassManifest](
* This additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def foreachWith[A: ClassManifest](constructA: Int => A)(f: (T, A) => Unit) {
+ def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) {
mapPartitionsWithIndex { (index, iter) =>
val a = constructA(index)
iter.map(t => {f(t, a); t})
@@ -521,7 +524,7 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def filterWith[A: ClassManifest](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
+ def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
mapPartitionsWithIndex((index, iter) => {
val a = constructA(index)
iter.filter(t => p(t, a))
@@ -534,7 +537,7 @@ abstract class RDD[T: ClassManifest](
* partitions* and the *same number of elements in each partition* (e.g. one was made through
* a map on the other).
*/
- def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
+ def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
/**
* Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
@@ -542,32 +545,27 @@ abstract class RDD[T: ClassManifest](
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
- def zipPartitions[B: ClassManifest, V: ClassManifest]
- (rdd2: RDD[B], preservesPartitioning: Boolean)
- (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
-
- def zipPartitions[B: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C])
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)
- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
@@ -605,7 +603,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD that contains all matching values by applying `f`.
*/
- def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = {
+ def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = {
filter(f.isDefinedAt).map(f)
}
@@ -695,7 +693,7 @@ abstract class RDD[T: ClassManifest](
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
*/
- def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
+ def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
@@ -744,7 +742,7 @@ abstract class RDD[T: ClassManifest](
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): Map[T, Long] = {
- if (elementClassManifest.erasure.isArray) {
+ if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
@@ -775,7 +773,7 @@ abstract class RDD[T: ClassManifest](
timeout: Long,
confidence: Double = 0.95
): PartialResult[Map[T, BoundedDouble]] = {
- if (elementClassManifest.erasure.isArray) {
+ if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
@@ -942,12 +940,12 @@ abstract class RDD[T: ClassManifest](
/** Record user function generating this RDD. */
@transient private[spark] val origin = Utils.formatSparkCallSite
- private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
+ private[spark] def elementClassTag: ClassTag[T] = classTag[T]
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
/** Returns the first parent RDD */
- protected[spark] def firstParent[U: ClassManifest] = {
+ protected[spark] def firstParent[U: ClassTag] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}
@@ -1009,7 +1007,7 @@ abstract class RDD[T: ClassManifest](
origin)
def toJavaRDD() : JavaRDD[T] = {
- new JavaRDD(this)(elementClassManifest)
+ new JavaRDD(this)(elementClassTag)
}
}
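
As the RDD.scala hunks show, every transformation that produces an RDD[U] now carries a `U: ClassTag` bound. The bound is what allows Spark to build an `Array[U]` under erasure when results are materialized; the following illustrative snippet mirrors that shape outside Spark:

    import scala.reflect.ClassTag

    object MapToArrayDemo {
      // Mirrors the shape of RDD.map: without the ClassTag bound on U,
      // `new Array[U]` would not compile under erasure.
      def mapToArray[T, U: ClassTag](xs: Seq[T])(f: T => U): Array[U] = {
        val out = new Array[U](xs.length)
        var i = 0
        xs.foreach { x => out(i) = f(x); i += 1 }
        out
      }

      def main(args: Array[String]) {
        println(mapToArray(Seq(1, 2, 3))(_ * 2).mkString(","))  // 2,4,6
      }
    }
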
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 6009a41570..3b56e45aa9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -17,6 +17,8 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
@@ -38,7 +40,7 @@ private[spark] object CheckpointState extends Enumeration {
* manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
* of the checkpointed RDD.
*/
-private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
+private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
extends Logging with Serializable {
import CheckpointState._
diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
index 2c5253ae30..d433670cc2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
@@ -17,6 +17,7 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
import java.util.Random
import cern.jet.random.Poisson
@@ -29,9 +30,9 @@ class SampledRDDPartition(val prev: Partition, val seed: Int) extends Partition
override val index: Int = prev.index
}
-class SampledRDD[T: ClassManifest](
+class SampledRDD[T: ClassTag](
prev: RDD[T],
- withReplacement: Boolean,
+ withReplacement: Boolean,
frac: Double,
seed: Int)
extends RDD[T](prev) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 5fe4676029..2d1bd5b481 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.spark.rdd
+import scala.reflect.{ ClassTag, classTag}
+
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.io.compress.CompressionCodec
@@ -32,15 +33,15 @@ import org.apache.spark.Logging
*
* Import `org.apache.spark.SparkContext._` at the top of their program to use these functions.
*/
-class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
+class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag](
self: RDD[(K, V)])
extends Logging
with Serializable {
- private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
+ private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = {
val c = {
- if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
- classManifest[T].erasure
+ if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) {
+ classTag[T].runtimeClass
} else {
// We get the type of the Writable class by looking at the apply method which converts
// from T to Writable. Since we have two apply methods we filter out the one which
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index a5d751a7bd..3682c84598 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -17,8 +17,10 @@
package org.apache.spark.rdd
-import org.apache.spark.{Dependency, Partitioner, SparkEnv, ShuffleDependency, Partition, TaskContext}
+import scala.reflect.ClassTag
+import org.apache.spark.{Dependency, Partition, Partitioner, ShuffleDependency,
+ SparkEnv, TaskContext}
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
override val index = idx
@@ -32,7 +34,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* @tparam K the key class.
* @tparam V the value class.
*/
-class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
+class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag](
@transient var prev: RDD[P],
part: Partitioner)
extends RDD[P](prev.context, Nil) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index 7af4d803e7..aab30b1bb4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -18,8 +18,11 @@
package org.apache.spark.rdd
import java.util.{HashMap => JHashMap}
+
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
import org.apache.spark.Partitioner
import org.apache.spark.Dependency
import org.apache.spark.TaskContext
@@ -45,7 +48,7 @@ import org.apache.spark.OneToOneDependency
* you can use `rdd1`'s partitioner/partition size and not worry about running
* out of memory because of the size of `rdd2`.
*/
-private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
+private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
@transient var rdd1: RDD[_ <: Product2[K, V]],
@transient var rdd2: RDD[_ <: Product2[K, W]],
part: Partitioner)
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index ae8a9f36a6..08a41ac558 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -18,10 +18,13 @@
package org.apache.spark.rdd
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
import org.apache.spark.{Dependency, RangeDependency, SparkContext, Partition, TaskContext}
+
import java.io.{ObjectOutputStream, IOException}
-private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int)
+private[spark] class UnionPartition[T: ClassTag](idx: Int, rdd: RDD[T], splitIndex: Int)
extends Partition {
var split: Partition = rdd.partitions(splitIndex)
@@ -40,7 +43,7 @@ private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], spl
}
}
-class UnionRDD[T: ClassManifest](
+class UnionRDD[T: ClassTag](
sc: SparkContext,
@transient var rdds: Seq[RDD[T]])
extends RDD[T](sc, Nil) { // Nil since we implement getDependencies
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index a97d2a01c8..83be3c6eb4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import org.apache.spark.{OneToOneDependency, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
+import scala.reflect.ClassTag
private[spark] class ZippedPartitionsPartition(
idx: Int,
@@ -38,7 +39,7 @@ private[spark] class ZippedPartitionsPartition(
}
}
-abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
+abstract class ZippedPartitionsBaseRDD[V: ClassTag](
sc: SparkContext,
var rdds: Seq[RDD[_]],
preservesPartitioning: Boolean = false)
@@ -71,7 +72,7 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
}
}
-class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
+class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B]) => Iterator[V],
var rdd1: RDD[A],
@@ -92,7 +93,7 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]
}
class ZippedPartitionsRDD3
- [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
+ [A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
var rdd1: RDD[A],
@@ -117,7 +118,7 @@ class ZippedPartitionsRDD3
}
class ZippedPartitionsRDD4
- [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
+ [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
var rdd1: RDD[A],
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
index 567b67dfee..fb5b070c18 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
@@ -18,10 +18,12 @@
package org.apache.spark.rdd
import org.apache.spark.{OneToOneDependency, SparkContext, Partition, TaskContext}
+
import java.io.{ObjectOutputStream, IOException}
+import scala.reflect.ClassTag
-private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
+private[spark] class ZippedPartition[T: ClassTag, U: ClassTag](
idx: Int,
@transient rdd1: RDD[T],
@transient rdd2: RDD[U]
@@ -42,7 +44,7 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
}
}
-class ZippedRDD[T: ClassManifest, U: ClassManifest](
+class ZippedRDD[T: ClassTag, U: ClassTag](
sc: SparkContext,
var rdd1: RDD[T],
var rdd2: RDD[U])
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f9cd021dd3..963d15b76d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,9 +21,11 @@ import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
-import akka.actor._
-import akka.util.duration._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+
+import akka.actor._
import org.apache.spark._
import org.apache.spark.rdd.RDD
@@ -104,7 +106,7 @@ class DAGScheduler(
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
- val RESUBMIT_TIMEOUT = 50L
+ val RESUBMIT_TIMEOUT = 50.milliseconds
// The time, in millis, to wake up between polls of the completion queue in order to potentially
// resubmit failed stages
@@ -177,13 +179,14 @@ class DAGScheduler(
var resubmissionTask: Cancellable = _
override def preStart() {
+ import context.dispatcher
/**
* A message is sent to the actor itself periodically to remind the actor to resubmit failed
* stages. In this way, stage resubmission can be done within the same thread context of
* other event processing logic to avoid unnecessary synchronization overhead.
*/
resubmissionTask = context.system.scheduler.schedule(
- RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
+ RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
}
/**
@@ -460,7 +463,7 @@ class DAGScheduler(
waiter
}
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
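
Two Akka 2.2 changes surface in the DAGScheduler hunks: durations move from `akka.util.duration._` to the standard library's `scala.concurrent.duration._`, and `scheduler.schedule` now takes an implicit ExecutionContext, hence the new `import context.dispatcher` inside the actor. An illustrative actor showing both (actor and message names are placeholders):

    import scala.concurrent.duration._
    import akka.actor._

    case object Tick

    class TickActor extends Actor {
      import context.dispatcher  // supplies the ExecutionContext that schedule() now requires

      val task: Cancellable =
        context.system.scheduler.schedule(50.milliseconds, 50.milliseconds, self, Tick)

      def receive = {
        case Tick => println("tick")
      }

      override def postStop() { task.cancel() }
    }

Note that RESUBMIT_TIMEOUT itself becomes a Duration (`50.milliseconds`), so the `.millis` conversions at the call site disappear.
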
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
index 0a786deb16..3832ee7ff6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
@@ -22,7 +22,7 @@ package org.apache.spark.scheduler
* to order tasks amongst a Schedulable's sub-queues
* "NONE" is used when the a Schedulable has no sub-queues.
*/
-object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
+object SchedulingMode extends Enumeration {
type SchedulingMode = Value
val FAIR,FIFO,NONE = Value
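
The `Enumeration(names: String*)` constructor removed here is deprecated in Scala 2.10 (and dropped outright in later releases); value names are now derived from the `val` identifiers themselves, so the explicit name lists simply go away. A minimal sketch:

    object WeekendDemo extends Enumeration {
      type Weekend = Value
      val SATURDAY, SUNDAY = Value  // names come from the identifiers themselves

      def main(args: Array[String]) {
        println(values.mkString(", "))  // SATURDAY, SUNDAY
      }
    }
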
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
index 47b0f387aa..35de13c385 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
@@ -18,9 +18,7 @@
package org.apache.spark.scheduler
-private[spark] object TaskLocality
- extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
-{
+private[spark] object TaskLocality extends Enumeration {
// process local is expected to be used ONLY within tasksetmanager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 4d82430b97..66ab8ea4cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -24,8 +24,7 @@ import java.util.{TimerTask, Timer}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
-
-import akka.util.duration._
+import scala.concurrent.duration._
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
@@ -122,7 +121,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
if (System.getProperty("spark.speculation", "false").toBoolean) {
logInfo("Starting speculative execution thread")
-
+ import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
checkSpeculatableTasks()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index d0ba5bf55d..f5e8766f6d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -20,13 +20,12 @@ package org.apache.spark.scheduler.cluster
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.Await
+import scala.concurrent.duration._
import akka.actor._
-import akka.dispatch.Await
import akka.pattern.ask
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
-import akka.util.Duration
-import akka.util.duration._
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.scheduler.TaskDescription
@@ -53,15 +52,15 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
- private val actorToExecutorId = new HashMap[ActorRef, String]
private val addressToExecutorId = new HashMap[Address, String]
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
// Periodically revive offers to allow delay scheduling to work
val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
+ import context.dispatcher
context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
@@ -73,12 +72,10 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
} else {
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor(sparkProperties)
- context.watch(sender)
executorActor(executorId) = sender
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
freeCores(executorId) = cores
executorAddress(executorId) = sender.path.address
- actorToExecutorId(sender) = executorId
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
makeOffers()
@@ -118,14 +115,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
removeExecutor(executorId, reason)
sender ! true
- case Terminated(actor) =>
- actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
+ case DisassociatedEvent(_, address, _) =>
+ addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
- case RemoteClientDisconnected(transport, address) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
-
- case RemoteClientShutdown(transport, address) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
}
// Make fake resource offers on all executors
@@ -153,7 +145,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
if (executorActor.contains(executorId)) {
logInfo("Executor " + executorId + " disconnected, so removing it")
val numCores = freeCores(executorId)
- actorToExecutorId -= executorActor(executorId)
addressToExecutorId -= executorAddress(executorId)
executorActor -= executorId
executorHost -= executorId
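
Since remote death watch is no longer used (per the commit message, Akka 2.2's version could retry sends to a dead node forever), executor loss is detected through remoting lifecycle events instead of `Terminated`, which is why `actorToExecutorId` and `context.watch(sender)` disappear above. An illustrative subscription, reduced to the relevant parts:

    import akka.actor._
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    class DriverActor extends Actor {
      override def preStart() {
        // Remote disconnects do not arrive via watch()/Terminated,
        // so subscribe to remoting lifecycle events on the event stream.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) =>
          println("remote Akka client disassociated: " + remoteAddress)
      }
    }
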
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index e000531a26..e8fecec4a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -36,7 +36,7 @@ private[spark] class SimrSchedulerBackend(
override def start() {
super.start()
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index cefa970bb9..7127a72d6d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -42,7 +42,7 @@ private[spark] class SparkDeploySchedulerBackend(
super.start()
// The endpoint for executors to talk to us
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
index 2064d97b49..e68c527713 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
@@ -71,7 +71,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
- case ex =>
+ case ex: Throwable =>
taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
}
}
@@ -95,7 +95,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche
val loader = Thread.currentThread.getContextClassLoader
logError(
"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
- case ex => {}
+ case ex: Throwable => {}
}
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
}
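
Scala 2.10 warns on untyped catch-alls, so `case ex =>` becomes `case ex: Throwable =>` in both handlers above. Where only recoverable failures should be trapped, `scala.util.control.NonFatal` is the usual idiom; a small sketch:

    import scala.util.control.NonFatal

    object CatchDemo {
      def main(args: Array[String]) {
        try {
          throw new RuntimeException("boom")
        } catch {
          // Matches recoverable errors only; OutOfMemoryError and friends propagate.
          case NonFatal(e) => println("recovered from: " + e.getMessage)
        }
      }
    }
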
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index cd521e0f2b..84fe3094cc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -120,7 +120,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
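
Akka 2.2 renames the remote protocol scheme from `akka://` to `akka.tcp://`, which is why the same one-line change repeats across the Simr, standalone-deploy, and coarse Mesos backends. For illustration (host, port, and actor name below are placeholder values):

    object DriverUrlDemo {
      def main(args: Array[String]) {
        // Yields e.g. akka.tcp://spark@driver-host:7077/user/CoarseGrainedScheduler
        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
          "driver-host", "7077", "CoarseGrainedScheduler")
        println(driverUrl)
      }
    }
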
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 702aca8323..19a025a329 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -24,9 +24,9 @@ import scala.collection.mutable.{HashMap, ArrayBuffer}
import scala.util.Random
import akka.actor.{ActorSystem, Cancellable, Props}
-import akka.dispatch.{Await, Future}
-import akka.util.Duration
-import akka.util.duration._
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
@@ -924,4 +924,3 @@ private[spark] object BlockManager extends Logging {
blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
}
}
-
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 94038649b3..e05b842476 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -17,16 +17,17 @@
package org.apache.spark.storage
-import akka.actor.ActorRef
-import akka.dispatch.{Await, Future}
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import akka.actor._
import akka.pattern.ask
-import akka.util.Duration
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-
-private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
+private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
@@ -156,7 +157,10 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
while (attempts < AKKA_RETRY_ATTEMPTS) {
attempts += 1
try {
- val future = driverActor.ask(message)(timeout)
+ val future = driverActor match {
+ case Left(a: ActorRef) => a.ask(message)(timeout)
+ case Right(b: ActorSelection) => b.ask(message)(timeout)
+ }
val result = Await.result(future, timeout)
if (result == null) {
throw new SparkException("BlockManagerMaster returned null")
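
BlockManagerMaster now holds either a local `ActorRef` or a remote `ActorSelection` (Akka 2.2 deprecates `actorFor`); both support the ask pattern, so the retry loop just branches on the `Either`. A condensed sketch of that dispatch, without the retry bookkeeping:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import akka.actor.{ActorRef, ActorSelection}
    import akka.pattern.ask

    object DriverAsk {
      def askDriver(driverActor: Either[ActorRef, ActorSelection], message: Any): Any = {
        val timeout = 10.seconds
        // Both sides of the Either support ask; only the static type differs.
        val future: Future[Any] = driverActor match {
          case Left(ref)        => ref.ask(message)(timeout)
          case Right(selection) => selection.ask(message)(timeout)
        }
        Await.result(future, timeout)
      }
    }
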
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index f8cf14b503..154a3980e9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -23,10 +23,10 @@ import scala.collection.mutable
import scala.collection.JavaConversions._
import akka.actor.{Actor, ActorRef, Cancellable}
-import akka.dispatch.Future
import akka.pattern.ask
-import akka.util.Duration
-import akka.util.duration._
+
+import scala.concurrent.duration._
+import scala.concurrent.Future
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
@@ -65,6 +65,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
override def preStart() {
if (!BlockManager.getDisableHeartBeatsForTesting) {
+ import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(
0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 860e680576..a8db37ded1 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -93,7 +93,7 @@ private[spark] object ThreadingTest {
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
val blockManagerMaster = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))
+ Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))))
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index e7eab374ad..c1ee2f3d00 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -17,7 +17,7 @@
package org.apache.spark.ui.jobs
-import akka.util.Duration
+import scala.concurrent.duration._
import java.text.SimpleDateFormat
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index 1d633d374a..a5446b3fc3 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -17,7 +17,7 @@
package org.apache.spark.ui.storage
-import akka.util.Duration
+import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index d4c5065c3f..74133cef6c 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,11 +17,8 @@
package org.apache.spark.util
-import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
-import akka.util.duration._
-import akka.remote.RemoteActorRefProvider
-
/**
* Various utility classes for working with Akka.
@@ -34,39 +31,57 @@ private[spark] object AkkaUtils {
*
* Note: the `name` parameter is important, as even if a client sends a message to the right
* host + port, if the system name is incorrect, Akka will drop the message.
+ *
+ * If indestructible is set to true, the Actor System will continue running in the event
+ * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
- def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
- val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
+ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false)
+ : (ActorSystem, Int) = {
+
+ val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
- val akkaTimeout = System.getProperty("spark.akka.timeout", "60").toInt
+
+ val akkaTimeout = System.getProperty("spark.akka.timeout", "100").toInt
+
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
- val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
- // 10 seconds is the default akka timeout, but in a cluster, we need higher by default.
- val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt
-
- val akkaConf = ConfigFactory.parseString("""
- akka.daemonic = on
- akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
- akka.stdout-loglevel = "ERROR"
- akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
- akka.remote.netty.hostname = "%s"
- akka.remote.netty.port = %d
- akka.remote.netty.connection-timeout = %ds
- akka.remote.netty.message-frame-size = %d MiB
- akka.remote.netty.execution-pool-size = %d
- akka.actor.default-dispatcher.throughput = %d
- akka.remote.log-remote-lifecycle-events = %s
- akka.remote.netty.write-timeout = %ds
- """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
- lifecycleEvents, akkaWriteTimeout))
+ val lifecycleEvents =
+ if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+
+ val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "600").toInt
+ val akkaFailureDetector =
+ System.getProperty("spark.akka.failure-detector.threshold", "300.0").toDouble
+ val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "1000").toInt
- val actorSystem = ActorSystem(name, akkaConf)
+ val akkaConf = ConfigFactory.parseString(
+ s"""
+ |akka.daemonic = on
+ |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
+ |akka.stdout-loglevel = "ERROR"
+ |akka.jvm-exit-on-fatal-error = off
+ |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
+ |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+ |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
+ |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
+ |akka.remote.netty.tcp.hostname = "$host"
+ |akka.remote.netty.tcp.port = $port
+ |akka.remote.netty.tcp.tcp-nodelay = on
+ |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
+ |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
+ |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
+ |akka.actor.default-dispatcher.throughput = $akkaBatchSize
+ |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
+ """.stripMargin)
+
+ val actorSystem = if (indestructible) {
+ IndestructibleActorSystem(name, akkaConf)
+ } else {
+ ActorSystem(name, akkaConf)
+ }
- // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
- // hack because Akka doesn't let you figure out the port through the public API yet.
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
- val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
- return (actorSystem, boundPort)
+ val boundPort = provider.getDefaultAddress.port.get
+ (actorSystem, boundPort)
}
+
}
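
The three new heartbeat properties read here default high enough to effectively disable Akka's transport failure detector, since Spark has its own failure-detection mechanism and the Akka one mostly adds overhead and false positives. Lowering them makes Spark sensitive to GC pauses and network lags so that affected executors are evicted quickly. Illustrative tuning, set before the actor system is created (the lowered values are examples, not recommendations):

    object FailureDetectorTuning {
      def main(args: Array[String]) {
        // Defaults (600 / 300.0 / 1000) keep the Akka transport failure
        // detector effectively disabled.
        System.setProperty("spark.akka.heartbeat.pauses", "60")
        System.setProperty("spark.akka.failure-detector.threshold", "12.0")
        System.setProperty("spark.akka.heartbeat.interval", "1000")
      }
    }
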
diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
new file mode 100644
index 0000000000..bf71882ef7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Must be in akka.actor package as ActorSystemImpl is protected[akka].
+package akka.actor
+
+import scala.util.control.{ControlThrowable, NonFatal}
+
+import com.typesafe.config.Config
+
+/**
+ * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
+ * This is necessary as Spark Executors are allowed to recover from fatal exceptions
+ * (see [[org.apache.spark.executor.Executor]]).
+ */
+object IndestructibleActorSystem {
+ def apply(name: String, config: Config): ActorSystem =
+ apply(name, config, ActorSystem.findClassLoader())
+
+ def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
+ new IndestructibleActorSystemImpl(name, config, classLoader).start()
+}
+
+private[akka] class IndestructibleActorSystemImpl(
+ override val name: String,
+ applicationConfig: Config,
+ classLoader: ClassLoader)
+ extends ActorSystemImpl(name, applicationConfig, classLoader) {
+
+ protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
+ val fallbackHandler = super.uncaughtExceptionHandler
+
+ new Thread.UncaughtExceptionHandler() {
+ def uncaughtException(thread: Thread, cause: Throwable): Unit = {
+ if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
+ log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
+ "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
+ //shutdown() //TODO make it configurable
+ } else {
+ fallbackHandler.uncaughtException(thread, cause)
+ }
+ }
+ }
+ }
+
+ def isFatalError(e: Throwable): Boolean = {
+ e match {
+ case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
+ false
+ case _ =>
+ true
+ }
+ }
+}
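
Callers opt into the indestructible system through the new flag on `createActorSystem`; per the commit message, only executors do. A sketch of such a call site (AkkaUtils is `private[spark]`, so this only compiles from Spark-internal code; the system name, host, and port are illustrative):

    import org.apache.spark.util.AkkaUtils

    object ExecutorBootSketch {
      def main(args: Array[String]) {
        // indestructible = true keeps the system alive through fatal errors,
        // which is what executors need; the driver keeps the default (false).
        val (actorSystem, boundPort) =
          AkkaUtils.createActorSystem("sparkExecutor", "localhost", 0, indestructible = true)
        println("bound to port " + boundPort)
      }
    }
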
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 67a7f87a5c..7b41ef89f1 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -55,8 +55,7 @@ class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, clea
}
}
-object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler", "ResultTask",
- "ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") {
+object MetadataCleanerType extends Enumeration {
val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index 277de2f8a6..dbff571de9 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -85,7 +85,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging {
}
override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
- JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
+ JavaConversions.mapAsScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
}
override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a79e64e810..3f7858d2de 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,10 +22,11 @@ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
import scala.io.Source
+import scala.reflect.ClassTag
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -319,7 +320,7 @@ private[spark] object Utils extends Logging {
* result in a new collection. Unlike scala.util.Random.shuffle, this method
* uses a local random number generator, avoiding inter-thread contention.
*/
- def randomize[T: ClassManifest](seq: TraversableOnce[T]): Seq[T] = {
+ def randomize[T: ClassTag](seq: TraversableOnce[T]): Seq[T] = {
randomizeInPlace(seq.toArray)
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 80545c9688..c26f23d500 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util.collection
+import scala.reflect.ClassTag
/**
* A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
@@ -26,7 +27,7 @@ package org.apache.spark.util.collection
* Under the hood, it uses our OpenHashSet implementation.
*/
private[spark]
-class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest](
+class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
initialCapacity: Int)
extends Iterable[(K, V)]
with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 40986e3731..87e009a4de 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util.collection
+import scala.reflect._
/**
* A simple, fast hash set optimized for non-null insertion-only use case, where keys are never
@@ -36,7 +37,7 @@ package org.apache.spark.util.collection
* to explore all spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing).
*/
private[spark]
-class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
+class OpenHashSet[@specialized(Long, Int) T: ClassTag](
initialCapacity: Int,
loadFactor: Double)
extends Serializable {
@@ -62,14 +63,14 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
// throws:
// scala.tools.nsc.symtab.Types$TypeError: type mismatch;
// found : scala.reflect.AnyValManifest[Long]
- // required: scala.reflect.ClassManifest[Int]
+ // required: scala.reflect.ClassTag[Int]
// at scala.tools.nsc.typechecker.Contexts$Context.error(Contexts.scala:298)
// at scala.tools.nsc.typechecker.Infer$Inferencer.error(Infer.scala:207)
// ...
- val mt = classManifest[T]
- if (mt == ClassManifest.Long) {
+ val mt = classTag[T]
+ if (mt == ClassTag.Long) {
(new LongHasher).asInstanceOf[Hasher[T]]
- } else if (mt == ClassManifest.Int) {
+ } else if (mt == ClassTag.Int) {
(new IntHasher).asInstanceOf[Hasher[T]]
} else {
new Hasher[T]
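
This hunk also shows the runtime side of the migration: comparisons against ClassManifest.Long become comparisons against ClassTag.Long. A sketch of dispatching on the tag the way the hasher selection above does (the string results are illustrative stand-ins for the Hasher instances):

    import scala.reflect._

    def describe[T: ClassTag]: String = {
      val mt = classTag[T]
      if (mt == ClassTag.Long) "specialized long path"
      else if (mt == ClassTag.Int) "specialized int path"
      else "generic path"
    }

    describe[Int]      // "specialized int path"
    describe[String]   // "generic path"
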
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index d76143e45a..2e1ef06cbc 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util.collection
+import scala.reflect._
/**
* A fast hash map implementation for primitive, non-null keys. This hash map supports
@@ -26,15 +27,15 @@ package org.apache.spark.util.collection
* Under the hood, it uses our OpenHashSet implementation.
*/
private[spark]
-class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
- @specialized(Long, Int, Double) V: ClassManifest](
+class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
+ @specialized(Long, Int, Double) V: ClassTag](
initialCapacity: Int)
extends Iterable[(K, V)]
with Serializable {
def this() = this(64)
- require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
+ require(classTag[K] == classTag[Long] || classTag[K] == classTag[Int])
// Init in constructor (instead of in declaration) to work around a Scala compiler specialization
// bug that would generate two arrays (one for Object and one for specialized T).
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index 20554f0aab..b84eb65c62 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -17,11 +17,13 @@
package org.apache.spark.util.collection
+import scala.reflect.ClassTag
+
/**
* An append-only, non-threadsafe, array-backed vector that is optimized for primitive types.
*/
private[spark]
-class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) {
+class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: Int = 64) {
private var _numElements = 0
private var _array: Array[V] = _
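
Combining @specialized with a ClassTag bound is what lets PrimitiveVector back Long, Int, and Double elements with primitive arrays rather than boxed ones. A minimal append-only sketch under the same 2.10 signature (not the real implementation):

    import scala.reflect.ClassTag

    class MiniVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: Int = 64) {
      private var numElements = 0
      private var array = new Array[V](initialSize)   // needs the ClassTag[V]

      def +=(v: V) {
        if (numElements == array.length) {            // grow by doubling
          val newArray = new Array[V](math.max(1, array.length * 2))
          Array.copy(array, 0, newArray, 0, numElements)
          array = newArray
        }
        array(numElements) = v
        numElements += 1
      }

      def apply(i: Int): V = array(i)
      def size: Int = numElements
    }
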
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 4434f3b87c..c443c5266e 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -27,6 +27,21 @@ import org.apache.spark.SparkContext._
class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
+
+ implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
+ def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
+ t1 ++= t2
+ t1
+ }
+ def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
+ t1 += t2
+ t1
+ }
+ def zero(t: mutable.Set[A]) : mutable.Set[A] = {
+ new mutable.HashSet[A]()
+ }
+ }
+
test ("basic accumulation"){
sc = new SparkContext("local", "test")
val acc : Accumulator[Int] = sc.accumulator(0)
@@ -51,7 +66,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
}
test ("add value to collection accumulators") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -68,22 +82,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
}
}
- implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
- def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
- t1 ++= t2
- t1
- }
- def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
- t1 += t2
- t1
- }
- def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
- new mutable.HashSet[Any]()
- }
- }
-
test ("value not readable in tasks") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -125,7 +124,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
}
test ("localValue readable in tasks") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
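
Replacing the implicit object SetAccum (typed at Set[Any]) with the polymorphic implicit def setAccum[A] above lets each test pick up an AccumulableParam for its element type without a local import. A sketch of how the accumulable is then used, assuming that implicit is in scope (master URL and values are illustrative):

    import scala.collection.mutable
    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "test")
    val acc = sc.accumulable(mutable.Set[Int]())      // AccumulableParam resolved implicitly
    sc.parallelize(1 to 10).foreach(x => acc += x)    // tasks add elements
    assert(acc.value == (1 to 10).toSet)              // driver reads the merged set
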
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index d2226aa5a5..f25d921d3f 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark
+import scala.reflect.ClassTag
import org.scalatest.FunSuite
import java.io.File
import org.apache.spark.rdd._
@@ -205,7 +206,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* not, but this is not done by default as usually the partitions do not refer to any RDD and
* therefore never store the lineage.
*/
- def testCheckpointing[U: ClassManifest](
+ def testCheckpointing[U: ClassTag](
op: (RDD[Int]) => RDD[U],
testRDDSize: Boolean = true,
testRDDPartitionSize: Boolean = false
@@ -274,7 +275,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed,
* this RDD will remember the partitions and therefore potentially the whole lineage.
*/
- def testParentCheckpointing[U: ClassManifest](
+ def testParentCheckpointing[U: ClassTag](
op: (RDD[Int]) => RDD[U],
testRDDSize: Boolean,
testRDDPartitionSize: Boolean
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 480bac84f3..d9cffb74de 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -303,12 +303,13 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
Thread.sleep(200)
}
} catch {
- case _ => { Thread.sleep(10) }
+ case _: Throwable => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
}
}
+
}
object DistributedSuite {
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 01a72d8401..6d1695eae7 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ class DriverSuite extends FunSuite with Timeouts {
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
- failAfter(30 seconds) {
+ failAfter(60 seconds) {
Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(System.getenv("SPARK_HOME")))
}
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index b7eb268bd5..271dc905bc 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -49,14 +49,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master start and stop") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster()
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+ tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.stop()
}
test("master register and fetch") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster()
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+ tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -75,7 +75,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master register and unregister and fetch") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster()
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+ tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -101,13 +101,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
val masterTracker = new MapOutputTrackerMaster()
- masterTracker.trackerActor = actorSystem.actorOf(
- Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+ masterTracker.trackerActor = Left(actorSystem.actorOf(
+ Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker"))
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
val slaveTracker = new MapOutputTracker()
- slaveTracker.trackerActor = slaveSystem.actorFor(
- "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
+ slaveTracker.trackerActor = Right(slaveSystem.actorSelection(
+ "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker"))
masterTracker.registerShuffle(10, 1)
masterTracker.incrementEpoch()
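
The Left/Right wrapping follows from Akka 2.2: actorFor is deprecated, a remote handle is now an ActorSelection rather than an ActorRef, and the remote protocol becomes akka.tcp://. The tracker field is presumably an Either[ActorRef, ActorSelection]; a hedged sketch of sending through such a handle (field and message names are illustrative):

    import akka.actor.{ActorRef, ActorSelection}

    // Masters keep a local ActorRef; executors keep a remote ActorSelection.
    var trackerActor: Either[ActorRef, ActorSelection] = _

    def communicate(message: Any) {
      trackerActor match {
        case Left(actor)      => actor ! message
        case Right(selection) => selection ! message
      }
    }
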
diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
index 46a2da1724..768ca3850e 100644
--- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
@@ -37,7 +37,7 @@ class UnpersistSuite extends FunSuite with LocalSparkContext {
Thread.sleep(200)
}
} catch {
- case _ => { Thread.sleep(10) }
+ case _: Throwable => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
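
The repeated change from case _ => to case _: Throwable => addresses a Scala 2.10 deprecation: a bare wildcard in a catch clause is flagged because it silently swallows everything, including control-flow throwables. The explicit type keeps the old behavior; a narrower alternative, sketched here, is scala.util.control.NonFatal, which lets fatal errors propagate:

    import scala.util.control.NonFatal

    try {
      Thread.sleep(200)
    } catch {
      case NonFatal(_) => Thread.sleep(10)   // ignore ordinary exceptions only
    }
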
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 354ab8ae5d..d8dcd6d14c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -244,8 +244,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// test that you get over 90% locality in each group
val minLocality = coalesced2.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
- .foldLeft(1.)((perc, loc) => math.min(perc,loc))
- assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
+ .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
+ assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.0).toInt + "%")
// test that the groups are load balanced with 100 +/- 20 elements in each
val maxImbalance = coalesced2.partitions
@@ -257,9 +257,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val coalesced3 = data3.coalesce(numMachines*2)
val minLocality2 = coalesced3.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
- .foldLeft(1.)((perc, loc) => math.min(perc,loc))
+ .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
- (minLocality2*100.).toInt + "%")
+ (minLocality2*100.0).toInt + "%")
}
test("zipped RDDs") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 1fd76420ea..2e41438a52 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -145,7 +145,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
// Make a task whose result is larger than the akka frame size
System.setProperty("spark.akka.frameSize", "1")
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x,y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
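
With the move to Akka 2.2's Netty TCP transport, the frame-size limit lives under a new configuration path. A sketch of reading it through the ActorSystem's Typesafe Config, as the tests above do (system name illustrative; assumes akka-remote's reference configuration is on the classpath):

    import akka.actor.ActorSystem

    val system = ActorSystem("test")
    // Akka 2.0/2.1 key: akka.remote.netty.message-frame-size
    val frameSize =
      system.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
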
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
index ee150a3107..27c2d53361 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
@@ -82,7 +82,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
test("handling results larger than Akka frame size") {
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
@@ -103,7 +103,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
}
scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index cb76275e39..b647e8a672 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -39,7 +39,7 @@ class BlockIdSuite extends FunSuite {
fail()
} catch {
case e: IllegalStateException => // OK
- case _ => fail()
+ case _: Throwable => fail()
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 484a654108..5b4d63b954 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -56,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
System.setProperty("spark.hostPort", "localhost:" + boundPort)
master = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))
+ Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))))
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 8f0ec6683b..3764f4d1a0 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -34,7 +34,6 @@ class UISuite extends FunSuite {
}
val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
-
// Allow some wiggle room in case ports on the machine are under contention
assert(boundPort1 > startPort && boundPort1 < startPort + 10)
assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 4e40dcbdee..5aff26f9fc 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -63,54 +63,53 @@ class SizeEstimatorSuite
}
test("simple classes") {
- assert(SizeEstimator.estimate(new DummyClass1) === 16)
- assert(SizeEstimator.estimate(new DummyClass2) === 16)
- assert(SizeEstimator.estimate(new DummyClass3) === 24)
- assert(SizeEstimator.estimate(new DummyClass4(null)) === 24)
- assert(SizeEstimator.estimate(new DummyClass4(new DummyClass3)) === 48)
+ expectResult(16)(SizeEstimator.estimate(new DummyClass1))
+ expectResult(16)(SizeEstimator.estimate(new DummyClass2))
+ expectResult(24)(SizeEstimator.estimate(new DummyClass3))
+ expectResult(24)(SizeEstimator.estimate(new DummyClass4(null)))
+ expectResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
}
// NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
// (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("strings") {
- assert(SizeEstimator.estimate(DummyString("")) === 40)
- assert(SizeEstimator.estimate(DummyString("a")) === 48)
- assert(SizeEstimator.estimate(DummyString("ab")) === 48)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
+ expectResult(40)(SizeEstimator.estimate(DummyString("")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("a")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("ab")))
+ expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
}
test("primitive arrays") {
- assert(SizeEstimator.estimate(new Array[Byte](10)) === 32)
- assert(SizeEstimator.estimate(new Array[Char](10)) === 40)
- assert(SizeEstimator.estimate(new Array[Short](10)) === 40)
- assert(SizeEstimator.estimate(new Array[Int](10)) === 56)
- assert(SizeEstimator.estimate(new Array[Long](10)) === 96)
- assert(SizeEstimator.estimate(new Array[Float](10)) === 56)
- assert(SizeEstimator.estimate(new Array[Double](10)) === 96)
- assert(SizeEstimator.estimate(new Array[Int](1000)) === 4016)
- assert(SizeEstimator.estimate(new Array[Long](1000)) === 8016)
+ expectResult(32)(SizeEstimator.estimate(new Array[Byte](10)))
+ expectResult(40)(SizeEstimator.estimate(new Array[Char](10)))
+ expectResult(40)(SizeEstimator.estimate(new Array[Short](10)))
+ expectResult(56)(SizeEstimator.estimate(new Array[Int](10)))
+ expectResult(96)(SizeEstimator.estimate(new Array[Long](10)))
+ expectResult(56)(SizeEstimator.estimate(new Array[Float](10)))
+ expectResult(96)(SizeEstimator.estimate(new Array[Double](10)))
+ expectResult(4016)(SizeEstimator.estimate(new Array[Int](1000)))
+ expectResult(8016)(SizeEstimator.estimate(new Array[Long](1000)))
}
test("object arrays") {
// Arrays containing nulls should just have one pointer per element
- assert(SizeEstimator.estimate(new Array[String](10)) === 56)
- assert(SizeEstimator.estimate(new Array[AnyRef](10)) === 56)
-
+ expectResult(56)(SizeEstimator.estimate(new Array[String](10)))
+ expectResult(56)(SizeEstimator.estimate(new Array[AnyRef](10)))
// For object arrays with non-null elements, each object should take one pointer plus
// however many bytes that class takes. (Note that Array.fill calls the code in its
// second parameter separately for each object, so we get distinct objects.)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)) === 216)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)) === 216)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)) === 296)
- assert(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)) === 56)
+ expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
+ expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
+ expectResult(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
+ expectResult(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
// Past size 100, we sample 100 elements, but we should still get the right size.
- assert(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)) === 28016)
+ expectResult(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
// If an array contains the *same* element many times, we should only count it once.
val d1 = new DummyClass1
- assert(SizeEstimator.estimate(Array.fill(10)(d1)) === 72) // 10 pointers plus 8-byte object
- assert(SizeEstimator.estimate(Array.fill(100)(d1)) === 432) // 100 pointers plus 8-byte object
+ expectResult(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
+ expectResult(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
// Same thing with a huge array containing the same element many times. Note that this won't
// return exactly 4032 because it can't tell that *all* the elements will equal the first
@@ -128,11 +127,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- assert(SizeEstimator.estimate(DummyString("")) === 40)
- assert(SizeEstimator.estimate(DummyString("a")) === 48)
- assert(SizeEstimator.estimate(DummyString("ab")) === 48)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
-
+ expectResult(40)(SizeEstimator.estimate(DummyString("")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("a")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("ab")))
+ expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
}
@@ -145,10 +143,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- assert(SizeEstimator.estimate(DummyString("")) === 56)
- assert(SizeEstimator.estimate(DummyString("a")) === 64)
- assert(SizeEstimator.estimate(DummyString("ab")) === 64)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 72)
+ expectResult(56)(SizeEstimator.estimate(DummyString("")))
+ expectResult(64)(SizeEstimator.estimate(DummyString("a")))
+ expectResult(64)(SizeEstimator.estimate(DummyString("ab")))
+ expectResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
resetOrClear("spark.test.useCompressedOops", oops)