path: root/core
author     Tathagata Das <tathagata.das1565@gmail.com>  2013-12-19 11:20:48 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2013-12-19 11:20:48 -0800
commit     984c5824876e0daceb8a74af57593926faf727ce (patch)
tree       48be0648167683de8dd064b17513c81956bbffb5 /core
parent     5e9ce83d682d6198cda4631faf11cb53fcccf07f (diff)
parent     ec71b445ad0440e84c4b4909e4faf75aba0f13d7 (diff)
download   spark-984c5824876e0daceb8a74af57593926faf727ce.tar.gz
           spark-984c5824876e0daceb8a74af57593926faf727ce.tar.bz2
           spark-984c5824876e0daceb8a74af57593926faf727ce.zip
Merge branch 'scheduler-update' into filestream-fix
Conflicts:
    core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
    streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
    streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
    streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
    streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
Diffstat (limited to 'core')
-rw-r--r--  core/pom.xml | 35
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileClient.java | 2
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileServer.java | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 77
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskState.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 77
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala | 60
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/Function.java | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/Function2.java | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/Function3.java | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/Client.scala | 48
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 69
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 48
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ConnectionManager.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 33
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala | 64
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 79
-rw-r--r--  core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala | 68
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 32
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/DriverSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/UnpersistSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala | 72
111 files changed, 740 insertions, 632 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 38f4be1280..043f6cf68d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.9.3</artifactId>
+ <artifactId>spark-core_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Core</name>
<url>http://spark.incubator.apache.org/</url>
@@ -81,12 +81,8 @@
<artifactId>asm</artifactId>
</dependency>
<dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
- <dependency>
<groupId>com.twitter</groupId>
- <artifactId>chill_2.9.3</artifactId>
+ <artifactId>chill_${scala.binary.version}</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
@@ -96,19 +92,11 @@
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
- <artifactId>akka-actor</artifactId>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-remote</artifactId>
+ <artifactId>akka-remote_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
- <artifactId>akka-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scalap</artifactId>
+ <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
@@ -116,7 +104,7 @@
</dependency>
<dependency>
<groupId>net.liftweb</groupId>
- <artifactId>lift-json_2.9.2</artifactId>
+ <artifactId>lift-json_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
@@ -164,13 +152,18 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.9.3</artifactId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_2.9.3</artifactId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -190,8 +183,8 @@
</dependency>
</dependencies>
<build>
- <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index 20a7a3aa8c..edd0fc56f8 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -19,8 +19,6 @@ package org.apache.spark.network.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index 666432474d..a99af348ce 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -20,7 +20,6 @@ package org.apache.spark.network.netty;
import java.net.InetSocketAddress;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index b4d0b7017c..10fae5af9f 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,12 +21,11 @@ import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.HashSet
+import scala.concurrent.Await
+import scala.concurrent.duration._
import akka.actor._
-import akka.dispatch._
import akka.pattern.ask
-import akka.util.Duration
-
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
@@ -55,9 +54,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
private[spark] class MapOutputTracker extends Logging {
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
+
// Set to the MapOutputTrackerActor living on the driver
- var trackerActor: ActorRef = _
+ var trackerActor: Either[ActorRef, ActorSelection] = _
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
@@ -73,8 +72,18 @@ private[spark] class MapOutputTracker extends Logging {
// throw a SparkException if this fails.
private def askTracker(message: Any): Any = {
try {
- val future = trackerActor.ask(message)(timeout)
- return Await.result(future, timeout)
+ /*
+ The difference between ActorRef and ActorSelection is well explained here:
+ http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor
+ In spark a map output tracker can be either started on Driver where it is created which
+ is an ActorRef or it can be on executor from where it is looked up which is an
+ actorSelection.
+ */
+ val future = trackerActor match {
+ case Left(a: ActorRef) => a.ask(message)(timeout)
+ case Right(b: ActorSelection) => b.ask(message)(timeout)
+ }
+ Await.result(future, timeout)
} catch {
case e: Exception =>
throw new SparkException("Error communicating with MapOutputTracker", e)
@@ -117,7 +126,7 @@ private[spark] class MapOutputTracker extends Logging {
fetching += shuffleId
}
}
-
+
if (fetchedStatuses == null) {
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
@@ -144,7 +153,7 @@ private[spark] class MapOutputTracker extends Logging {
else{
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing all output locations for shuffle " + shuffleId))
- }
+ }
} else {
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
@@ -312,7 +321,7 @@ private[spark] object MapOutputTracker {
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
assert (statuses != null)
statuses.map {
- status =>
+ status =>
if (status == null) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 0e2c987a59..bcec41c439 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -17,8 +17,10 @@
package org.apache.spark
-import org.apache.spark.util.Utils
+import scala.reflect.ClassTag
+
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
@@ -72,7 +74,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
-
+
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
@@ -85,7 +87,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
* Determines the ranges by sampling the RDD passed in.
*/
-class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
+class RangePartitioner[K <% Ordered[K]: ClassTag, V](
partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
private val ascending: Boolean = true)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1811bfa1e5..c30f896cf1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -26,6 +26,7 @@ import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
+import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -267,19 +268,19 @@ class SparkContext(
// Methods for creating RDDs
/** Distribute a local Scala collection to form an RDD. */
- def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
+ def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
/** Distribute a local Scala collection to form an RDD. */
- def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
+ def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
parallelize(seq, numSlices)
}
/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
- def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
+ def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
}
@@ -332,7 +333,7 @@ class SparkContext(
}
/**
- * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
+ * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
* can just write, for example,
* {{{
@@ -340,17 +341,17 @@ class SparkContext(
* }}}
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
: RDD[(K, V)] = {
hadoopFile(path,
- fm.erasure.asInstanceOf[Class[F]],
- km.erasure.asInstanceOf[Class[K]],
- vm.erasure.asInstanceOf[Class[V]],
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
minSplits)
}
/**
- * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
+ * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
* can just write, for example,
* {{{
@@ -358,17 +359,17 @@ class SparkContext(
* }}}
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] =
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
hadoopFile[K, V, F](path, defaultMinSplits)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = {
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
path,
- fm.erasure.asInstanceOf[Class[F]],
- km.erasure.asInstanceOf[Class[K]],
- vm.erasure.asInstanceOf[Class[V]])
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]])
}
/**
@@ -426,11 +427,11 @@ class SparkContext(
* IntWritable). The most natural thing would've been to have implicit objects for the
* converters, but then we couldn't have an object for every subclass of Writable (you can't
* have a parameterized singleton object). We use functions instead to create a new converter
- * for the appropriate type. In addition, we pass the converter a ClassManifest of its type to
+ * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
- (implicit km: ClassManifest[K], vm: ClassManifest[V],
+ (implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
val kc = kcf()
@@ -449,7 +450,7 @@ class SparkContext(
* slow if you use the default serializer (Java serialization), though the nice thing about it is
* that there's very little effort required to save arbitrary objects.
*/
- def objectFile[T: ClassManifest](
+ def objectFile[T: ClassTag](
path: String,
minSplits: Int = defaultMinSplits
): RDD[T] = {
@@ -458,17 +459,17 @@ class SparkContext(
}
- protected[spark] def checkpointFile[T: ClassManifest](
+ protected[spark] def checkpointFile[T: ClassTag](
path: String
): RDD[T] = {
new CheckpointRDD[T](this, path)
}
/** Build the union of a list of RDDs. */
- def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
+ def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
/** Build the union of a list of RDDs passed as variable-length arguments. */
- def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
+ def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
new UnionRDD(this, Seq(first) ++ rest)
// Methods for creating shared variables
@@ -711,7 +712,7 @@ class SparkContext(
* flag specifies whether the scheduler can run the computation on the driver rather than
* shipping it out to the cluster, for short actions like first().
*/
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
@@ -732,7 +733,7 @@ class SparkContext(
* allowLocal flag specifies whether the scheduler can run the computation on the driver rather
* than shipping it out to the cluster, for short actions like first().
*/
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
@@ -747,7 +748,7 @@ class SparkContext(
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
*/
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int],
@@ -759,21 +760,21 @@ class SparkContext(
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
- def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
+ def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
}
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
- def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
+ def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
}
/**
* Run a job on all partitions in an RDD and pass the results to a handler function.
*/
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
processPartition: (TaskContext, Iterator[T]) => U,
resultHandler: (Int, U) => Unit)
@@ -784,7 +785,7 @@ class SparkContext(
/**
* Run a job on all partitions in an RDD and pass the results to a handler function.
*/
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
processPartition: Iterator[T] => U,
resultHandler: (Int, U) => Unit)
@@ -923,16 +924,16 @@ object SparkContext {
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
- implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
+ implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
- implicit def rddToAsyncRDDActions[T: ClassManifest](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+ implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
- implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
+ implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
- implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
+ implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions[K, V, (K, V)](rdd)
@@ -957,16 +958,16 @@ object SparkContext {
implicit def stringToText(s: String) = new Text(s)
- private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
+ private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]): ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
- new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
+ new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]],
arr.map(x => anyToWritable(x)).toArray)
}
// Helper objects for converting common types to Writable
- private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = {
- val wClass = classManifest[W].erasure.asInstanceOf[Class[W]]
+ private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = {
+ val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}
@@ -985,7 +986,7 @@ object SparkContext {
implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
implicit def writableWritableConverter[T <: Writable]() =
- new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
+ new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
@@ -1140,12 +1141,12 @@ object SparkContext {
/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
- * The getter for the writable class takes a ClassManifest[T] in case this is a generic object
+ * The getter for the writable class takes a ClassTag[T] in case this is a generic object
* that doesn't know the type of T when it is created. This sounds strange but is necessary to
* support converting subclasses of Writable to themselves (writableWritableConverter).
*/
private[spark] class WritableConverter[T](
- val writableClass: ClassManifest[T] => Class[_ <: Writable],
+ val writableClass: ClassTag[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable
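Most of the SparkContext changes above are the mechanical Scala 2.10 migration from ClassManifest to ClassTag, with runtimeClass taking the place of erasure. A small illustrative sketch of the new idiom, outside the patch (ClassTagSketch and its methods are hypothetical names):

import scala.reflect.{ClassTag, classTag}

object ClassTagSketch {
  // A [T: ClassTag] context bound supplies the tag needed to build arrays generically.
  def newArrayOf[T: ClassTag](n: Int): Array[T] = new Array[T](n)

  // runtimeClass replaces the Scala 2.9-era classManifest[T].erasure.
  def runtimeClassOf[T: ClassTag]: Class[_] = classTag[T].runtimeClass

  def main(args: Array[String]): Unit = {
    assert(runtimeClassOf[String] == classOf[String])
    assert(newArrayOf[Int](3).length == 3)
  }
}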
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ff2df8fb6a..826f5c2d8c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import collection.mutable
import serializer.Serializer
-import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
+import akka.actor._
import akka.remote.RemoteActorRefProvider
import org.apache.spark.broadcast.BroadcastManager
@@ -74,7 +74,8 @@ class SparkEnv (
actorSystem.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
- actorSystem.awaitTermination()
+ // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
+ //actorSystem.awaitTermination()
}
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
@@ -151,17 +152,17 @@ object SparkEnv extends Logging {
val closureSerializer = serializerManager.get(
System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
- def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
+ def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
if (isDriver) {
logInfo("Registering " + name)
- actorSystem.actorOf(Props(newActor), name = name)
+ Left(actorSystem.actorOf(Props(newActor), name = name))
} else {
val driverHost: String = System.getProperty("spark.driver.host", "localhost")
val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
- val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
+ val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
logInfo("Connecting to " + name + ": " + url)
- actorSystem.actorFor(url)
+ Right(actorSystem.actorSelection(url))
}
}
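The registerOrLookup change above returns Left(ActorRef) on the driver and Right(ActorSelection) on executors, with remote lookups now going through an akka.tcp:// path instead of actorFor. A hedged sketch of the executor-side lookup follows; DriverLookupSketch and lookUpDriverActor are hypothetical names used only for illustration.

import akka.actor.{ActorSelection, ActorSystem}

object DriverLookupSketch {
  def lookUpDriverActor(system: ActorSystem, name: String): ActorSelection = {
    val driverHost = System.getProperty("spark.driver.host", "localhost")
    val driverPort = System.getProperty("spark.driver.port", "7077").toInt
    // Akka 2.2 remote paths use the akka.tcp scheme; actorSelection replaces actorFor.
    val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
    system.actorSelection(url)
  }
}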
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index 19ce8369d9..0bf1e4a5e2 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -19,8 +19,7 @@ package org.apache.spark
import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-private[spark] object TaskState
- extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED", "KILLED", "LOST") {
+private[spark] object TaskState extends Enumeration {
val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 9f02a9b7d3..da30cf619a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -17,12 +17,15 @@
package org.apache.spark.api.java
+import scala.reflect.ClassTag
+
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.util.StatCounter
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.storage.StorageLevel
+
import java.lang.Double
import org.apache.spark.Partitioner
@@ -30,7 +33,7 @@ import scala.collection.JavaConverters._
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
- override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]]
+ override val classTag: ClassTag[Double] = implicitly[ClassTag[Double]]
override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x))
@@ -44,7 +47,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaDoubleRDD = fromRDD(srdd.cache())
- /**
+ /**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@@ -108,7 +111,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
/**
* Return an RDD with the elements from `this` that are not in `other`.
- *
+ *
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 2142fd7327..363667fa86 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -22,6 +22,7 @@ import java.util.Comparator
import scala.Tuple2
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
@@ -43,13 +44,13 @@ import org.apache.spark.rdd.OrderedRDDFunctions
import org.apache.spark.storage.StorageLevel
-class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
- implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K],
+ implicit val vClassTag: ClassTag[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
- override val classManifest: ClassManifest[(K, V)] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ override val classTag: ClassTag[(K, V)] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
import JavaPairRDD._
@@ -58,7 +59,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())
- /**
+ /**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@@ -138,14 +139,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
override def first(): (K, V) = rdd.first()
// Pair RDD functions
-
+
/**
- * Generic function to combine the elements for each key using a custom set of aggregation
- * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
- * "combined type" C * Note that V and C can be different -- for example, one might group an
- * RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
+ * Generic function to combine the elements for each key using a custom set of aggregation
+ * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
+ * "combined type" C * Note that V and C can be different -- for example, one might group an
+ * RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
* functions:
- *
+ *
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
@@ -157,8 +158,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
- implicit val cm: ClassManifest[C] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
+ implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
fromRDD(rdd.combineByKey(
createCombiner,
mergeValue,
@@ -195,14 +195,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/** Count the number of elements for each key, and return the result to the master as a Map. */
def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey())
- /**
+ /**
* (Experimental) Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
- /**
+ /**
* (Experimental) Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
@@ -258,7 +258,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Return an RDD with the elements from `this` that are not in `other`.
- *
+ *
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
@@ -315,15 +315,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
- /**
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
- implicit val cm: ClassManifest[C] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
+ implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd)))
}
@@ -414,8 +413,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
fromRDD(rdd.mapValues(f))
}
@@ -426,8 +424,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
fromRDD(rdd.flatMapValues(fn))
}
@@ -592,6 +589,20 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
}
/**
+ * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+ * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+ * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+ * order of the keys).
+ */
+ def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
+ class KeyOrdering(val a: K) extends Ordered[K] {
+ override def compare(b: K) = comp.compare(a, b)
+ }
+ implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+ fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
+ }
+
+ /**
* Return an RDD with the keys of each tuple.
*/
def keys(): JavaRDD[K] = JavaRDD.fromRDD[K](rdd.map(_._1))
@@ -603,22 +614,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
}
object JavaPairRDD {
- def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassManifest[K],
- vcm: ClassManifest[T]): RDD[(K, JList[T])] =
+ def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassTag[K],
+ vcm: ClassTag[T]): RDD[(K, JList[T])] =
rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList _)
- def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassManifest[K],
- vcm: ClassManifest[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd).mapValues((x: (Seq[V],
- Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
+ def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassTag[K],
+ vcm: ClassTag[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd)
+ .mapValues((x: (Seq[V], Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1],
- Seq[W2]))])(implicit kcm: ClassManifest[K]) : RDD[(K, (JList[V], JList[W1],
+ Seq[W2]))])(implicit kcm: ClassTag[K]) : RDD[(K, (JList[V], JList[W1],
JList[W2]))] = rddToPairRDDFunctions(rdd).mapValues(
(x: (Seq[V], Seq[W1], Seq[W2])) => (seqAsJavaList(x._1),
seqAsJavaList(x._2),
seqAsJavaList(x._3)))
- def fromRDD[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
+ def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd)
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
@@ -626,10 +637,8 @@ object JavaPairRDD {
/** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
- implicit val cmk: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val cmv: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmk: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val cmv: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
new JavaPairRDD[K, V](rdd.rdd)
}
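The new sortByKey overload added above lets Java callers supply a java.util.Comparator by wrapping each key in an Ordered view. A standalone sketch of that adaptation, not taken from the patch (ComparatorOrderingSketch and toOrderedView are hypothetical names):

import java.util.Comparator

object ComparatorOrderingSketch {
  // Adapt a java.util.Comparator into the K => Ordered[K] view that sortByKey relies on.
  def toOrderedView[K](comp: Comparator[K]): K => Ordered[K] = (a: K) =>
    new Ordered[K] {
      override def compare(b: K): Int = comp.compare(a, b)
    }

  def main(args: Array[String]): Unit = {
    // Example: a case-insensitive ordering over String keys.
    val view = toOrderedView(java.lang.String.CASE_INSENSITIVE_ORDER)
    assert(view("Spark").compareTo("spark") == 0)
  }
}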
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 3b359a8fd6..c47657f512 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -17,12 +17,14 @@
package org.apache.spark.api.java
+import scala.reflect.ClassTag
+
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
-class JavaRDD[T](val rdd: RDD[T])(implicit val classManifest: ClassManifest[T]) extends
+class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
JavaRDDLike[T, JavaRDD[T]] {
override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
@@ -127,8 +129,7 @@ JavaRDDLike[T, JavaRDD[T]] {
object JavaRDD {
- implicit def fromRDD[T: ClassManifest](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
+ implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}
-
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 7a3568c5ef..9e912d3adb 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -20,6 +20,7 @@ package org.apache.spark.api.java
import java.util.{List => JList, Comparator}
import scala.Tuple2
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
@@ -35,7 +36,7 @@ import org.apache.spark.storage.StorageLevel
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This
- implicit val classManifest: ClassManifest[T]
+ implicit val classTag: ClassTag[T]
def rdd: RDD[T]
@@ -71,7 +72,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- def mapPartitionsWithIndex[R: ClassManifest](
+ def mapPartitionsWithIndex[R: ClassTag](
f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
@@ -87,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -118,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -158,18 +159,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* elements (a, b) where a is in `this` and b is in `other`.
*/
def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] =
- JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest,
- other.classManifest)
+ JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classTag))(classTag, other.classTag)
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
- implicit val kcm: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val vcm: ClassManifest[JList[T]] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
+ implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val vcm: ClassTag[JList[T]] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]]
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm)
}
@@ -178,10 +177,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
- implicit val kcm: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val vcm: ClassManifest[JList[T]] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
+ implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val vcm: ClassTag[JList[T]] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]]
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm)
}
@@ -209,7 +207,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* a map on the other).
*/
def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
- JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest)
+ JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, other.classTag)
}
/**
@@ -224,7 +222,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
JavaRDD.fromRDD(
- rdd.zipPartitions(other.rdd)(fn)(other.classManifest, f.elementType()))(f.elementType())
+ rdd.zipPartitions(other.rdd)(fn)(other.classTag, f.elementType()))(f.elementType())
}
// Actions (launch a job to return a value to the user program)
@@ -356,7 +354,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
- implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
JavaPairRDD.fromRDD(rdd.keyBy(f))
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index c63db4970b..50f2021d01 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,6 +21,7 @@ import java.util.{Map => JMap}
import scala.collection.JavaConversions
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.InputFormat
@@ -82,8 +83,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
}
@@ -94,10 +94,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Distribute a local Scala collection to form an RDD. */
def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
: JavaPairRDD[K, V] = {
- implicit val kcm: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val vcm: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val vcm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
}
@@ -132,16 +130,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
}
/**Get an RDD for a Hadoop SequenceFile. */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
}
@@ -153,8 +151,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
sc.objectFile(path, minSplits)(cm)
}
@@ -166,8 +163,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T](path: String): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
sc.objectFile(path)(cm)
}
@@ -183,8 +179,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
}
@@ -199,8 +195,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
keyClass: Class[K],
valueClass: Class[V]
): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
}
@@ -212,8 +208,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
}
@@ -224,8 +220,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
keyClass: Class[K],
valueClass: Class[V]
): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path,
inputFormatClass, keyClass, valueClass))
}
@@ -240,8 +236,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
kClass: Class[K],
vClass: Class[V],
conf: Configuration): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(kClass)
- implicit val vcm = ClassManifest.fromClass(vClass)
+ implicit val kcm: ClassTag[K] = ClassTag(kClass)
+ implicit val vcm: ClassTag[V] = ClassTag(vClass)
new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
}
@@ -254,15 +250,15 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(kClass)
- implicit val vcm = ClassManifest.fromClass(vClass)
+ implicit val kcm: ClassTag[K] = ClassTag(kClass)
+ implicit val vcm: ClassTag[V] = ClassTag(vClass)
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
}
/** Build the union of two or more RDDs. */
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
- implicit val cm: ClassManifest[T] = first.classManifest
+ implicit val cm: ClassTag[T] = first.classTag
sc.union(rdds)(cm)
}
@@ -270,9 +266,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
: JavaPairRDD[K, V] = {
val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
- implicit val cm: ClassManifest[(K, V)] = first.classManifest
- implicit val kcm: ClassManifest[K] = first.kManifest
- implicit val vcm: ClassManifest[V] = first.vManifest
+ implicit val cm: ClassTag[(K, V)] = first.classTag
+ implicit val kcm: ClassTag[K] = first.kClassTag
+ implicit val vcm: ClassTag[V] = first.vClassTag
new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
}
@@ -392,8 +388,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
}
protected def checkpointFile[T](path: String): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
new JavaRDD(sc.checkpointFile(path))
}
}
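
A minimal sketch (not part of the patch) of the pattern the JavaSparkContext hunks above converge on: under Scala 2.10 a ClassTag can be summoned directly from a runtime Class and bound as an implicit, where the 2.9 code went through ClassManifest.fromClass. The helper name tagsFor is hypothetical.

    import scala.reflect.ClassTag

    // Illustrative helper mirroring the hadoopRDD/hadoopFile changes above.
    def tagsFor[K, V](keyClass: Class[K], valueClass: Class[V]): (ClassTag[K], ClassTag[V]) = {
      // ClassTag.apply wraps the runtime Class, replacing ClassManifest.fromClass.
      implicit val kcm: ClassTag[K] = ClassTag(keyClass)
      implicit val vcm: ClassTag[V] = ClassTag(valueClass)
      (kcm, vcm)
    }
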
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
index c9cbce5624..2090efd3b9 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
@@ -17,7 +17,6 @@
package org.apache.spark.api.java;
-import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
index 2dfda8b09a..bdb01f7670 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
@@ -17,9 +17,11 @@
package org.apache.spark.api.java.function
+import scala.reflect.ClassTag
+
/**
* A function that returns zero or more output records from each input record.
*/
abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
- def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]]
+ def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
index 528e1c0a7c..aae1349c5e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
@@ -17,9 +17,11 @@
package org.apache.spark.api.java.function
+import scala.reflect.ClassTag
+
/**
* A function that takes two inputs and returns zero or more output records.
*/
abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
- def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]]
+ def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
index ce368ee01b..537439ef53 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
@@ -17,8 +17,8 @@
package org.apache.spark.api.java.function;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import java.io.Serializable;
@@ -29,8 +29,8 @@ import java.io.Serializable;
* when mapping RDDs of other types.
*/
public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<R> returnType() {
+ return ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
index 44ad559d48..a2d1214fb4 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
@@ -17,8 +17,8 @@
package org.apache.spark.api.java.function;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import java.io.Serializable;
@@ -28,8 +28,8 @@ import java.io.Serializable;
public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
implements Serializable {
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<R> returnType() {
+ return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
index ac6178924a..fb1deceab5 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
@@ -17,8 +17,8 @@
package org.apache.spark.api.java.function;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction2;
import java.io.Serializable;
@@ -29,8 +29,8 @@ import java.io.Serializable;
public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R>
implements Serializable {
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<R> returnType() {
+ return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 6d76a8f970..ca485b3cc2 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -18,8 +18,8 @@
package org.apache.spark.api.java.function;
import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import java.io.Serializable;
@@ -33,11 +33,11 @@ public abstract class PairFlatMapFunction<T, K, V>
extends WrappedFunction1<T, Iterable<Tuple2<K, V>>>
implements Serializable {
- public ClassManifest<K> keyType() {
- return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<K> keyType() {
+ return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
}
- public ClassManifest<V> valueType() {
- return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<V> valueType() {
+ return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
index ede7ceefb5..cbe2306026 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
@@ -18,8 +18,8 @@
package org.apache.spark.api.java.function;
import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import java.io.Serializable;
@@ -31,11 +31,11 @@ import java.io.Serializable;
public abstract class PairFunction<T, K, V> extends WrappedFunction1<T, Tuple2<K, V>>
implements Serializable {
- public ClassManifest<K> keyType() {
- return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<K> keyType() {
+ return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
}
- public ClassManifest<V> valueType() {
- return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<V> valueType() {
+ return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 132e4fb0d2..a659cc06c2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -22,6 +22,7 @@ import java.net._
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
@@ -29,8 +30,7 @@ import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
-
-private[spark] class PythonRDD[T: ClassManifest](
+private[spark] class PythonRDD[T: ClassTag](
parent: RDD[T],
command: Array[Byte],
envVars: JMap[String, String],
@@ -148,7 +148,7 @@ private[spark] class PythonRDD[T: ClassManifest](
case eof: EOFException => {
throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
}
- case e => throw e
+ case e: Throwable => throw e
}
}
@@ -200,7 +200,7 @@ private[spark] object PythonRDD {
}
} catch {
case eof: EOFException => {}
- case e => throw e
+ case e: Throwable => throw e
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
@@ -236,7 +236,7 @@ private[spark] object PythonRDD {
}
def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
- implicit val cm : ClassManifest[T] = rdd.elementClassManifest
+ implicit val cm : ClassTag[T] = rdd.elementClassTag
rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
}
}
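
The catch-site changes above tighten `case e =>` to `case e: Throwable =>`, making the catch-all explicit for the Scala 2.10 compiler. A hedged sketch of the idiom, with scala.util.control.NonFatal shown as the usual alternative when fatal errors should be allowed to propagate; readOrDefault and its arguments are invented for illustration.

    import scala.util.control.NonFatal

    def readOrDefault(read: () => Int, default: Int): Int =
      try read() catch {
        case NonFatal(e)  => default      // ordinary exceptions handled here
        case e: Throwable => throw e      // fatal throwables rethrown, as in the hunks above
      }
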
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 67d45723ba..f291266fcf 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -64,7 +64,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
startDaemon()
new Socket(daemonHost, daemonPort)
}
- case e => throw e
+ case e: Throwable => throw e
}
}
}
@@ -198,7 +198,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
}.start()
} catch {
- case e => {
+ case e: Throwable => {
stopDaemon()
throw e
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index fcfea96ad6..37dfa7fec0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -17,8 +17,7 @@
package org.apache.spark.deploy
-private[spark] object ExecutorState
- extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
+private[spark] object ExecutorState extends Enumeration {
val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
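
Scala 2.10 dropped Enumeration's string-name constructors, which is why the value names above now come only from the val identifiers. An illustrative enum (ExecutorStateExample is hypothetical) showing that the reflective names still behave as before:

    object ExecutorStateExample extends Enumeration {
      type ExecutorStateExample = Value
      val LAUNCHING, RUNNING, FAILED = Value
    }

    // ExecutorStateExample.RUNNING.toString   == "RUNNING"
    // ExecutorStateExample.withName("FAILED") == ExecutorStateExample.FAILED
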
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index a724900943..59d12a3e6f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -34,11 +34,11 @@ import scala.collection.mutable.ArrayBuffer
*/
private[spark]
class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
-
+
private val localHostname = Utils.localHostName()
private val masterActorSystems = ArrayBuffer[ActorSystem]()
private val workerActorSystems = ArrayBuffer[ActorSystem]()
-
+
def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
@@ -61,10 +61,13 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
def stop() {
logInfo("Shutting down local Spark cluster.")
// Stop the workers before the master so they don't get upset that it disconnected
+ // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
+ // This is unfortunate, but for now we just comment it out.
workerActorSystems.foreach(_.shutdown())
- workerActorSystems.foreach(_.awaitTermination())
-
+ //workerActorSystems.foreach(_.awaitTermination())
masterActorSystems.foreach(_.shutdown())
- masterActorSystems.foreach(_.awaitTermination())
+ //masterActorSystems.foreach(_.awaitTermination())
+ masterActorSystems.clear()
+ workerActorSystems.clear()
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 77422f61ec..4d95efa73a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -19,17 +19,15 @@ package org.apache.spark.deploy.client
import java.util.concurrent.TimeoutException
+import scala.concurrent.duration._
+import scala.concurrent.Await
+
import akka.actor._
-import akka.actor.Terminated
+import akka.pattern.AskTimeoutException
import akka.pattern.ask
-import akka.util.Duration
-import akka.util.duration._
-import akka.remote.RemoteClientDisconnected
-import akka.remote.RemoteClientLifeCycleEvent
-import akka.remote.RemoteClientShutdown
-import akka.dispatch.Await
-
-import org.apache.spark.Logging
+import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
+
+import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -51,18 +49,19 @@ private[spark] class Client(
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
+ var masterAddress: Address = null
var actor: ActorRef = null
var appId: String = null
var registered = false
var activeMasterUrl: String = null
class ClientActor extends Actor with Logging {
- var master: ActorRef = null
- var masterAddress: Address = null
+ var master: ActorSelection = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
var alreadyDead = false // To avoid calling listener.dead() multiple times
override def preStart() {
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
try {
registerWithMaster()
} catch {
@@ -76,7 +75,7 @@ private[spark] class Client(
def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
- val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+ val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterApplication(appDescription)
}
}
@@ -84,6 +83,7 @@ private[spark] class Client(
def registerWithMaster() {
tryRegisterAllMasters()
+ import context.dispatcher
var retries = 0
lazy val retryTimer: Cancellable =
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
@@ -102,10 +102,13 @@ private[spark] class Client(
def changeMaster(url: String) {
activeMasterUrl = url
- master = context.actorFor(Master.toAkkaUrl(url))
- masterAddress = master.path.address
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+ masterAddress = activeMasterUrl match {
+ case Master.sparkUrlRegex(host, port) =>
+ Address("akka.tcp", Master.systemName, host, port.toInt)
+ case x =>
+ throw new SparkException("Invalid spark URL: " + x)
+ }
}
override def receive = {
@@ -135,21 +138,12 @@ private[spark] class Client(
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
- context.unwatch(master)
changeMaster(masterUrl)
alreadyDisconnected = false
sender ! MasterChangeAcknowledged(appId)
- case Terminated(actor_) if actor_ == master =>
- logWarning("Connection to master failed; waiting for master to reconnect...")
- markDisconnected()
-
- case RemoteClientDisconnected(transport, address) if address == masterAddress =>
- logWarning("Connection to master failed; waiting for master to reconnect...")
- markDisconnected()
-
- case RemoteClientShutdown(transport, address) if address == masterAddress =>
- logWarning("Connection to master failed; waiting for master to reconnect...")
+ case DisassociatedEvent(_, address, _) if address == masterAddress =>
+ logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
case StopClient =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
index fedf879eff..67e6c5d66a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -17,8 +17,7 @@
package org.apache.spark.deploy.master
-private[spark] object ApplicationState
- extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED", "UNKNOWN") {
+private[spark] object ApplicationState extends Enumeration {
type ApplicationState = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index c0849ef324..043945a211 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -65,7 +65,7 @@ private[spark] class FileSystemPersistenceEngine(
(apps, workers)
}
- private def serializeIntoFile(file: File, value: Serializable) {
+ private def serializeIntoFile(file: File, value: AnyRef) {
val created = file.createNewFile()
if (!created) { throw new IllegalStateException("Could not create file: " + file) }
@@ -77,13 +77,13 @@ private[spark] class FileSystemPersistenceEngine(
out.close()
}
- def deserializeFromFile[T <: Serializable](file: File)(implicit m: Manifest[T]): T = {
+ def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file))
dis.readFully(fileData)
dis.close()
- val clazz = m.erasure.asInstanceOf[Class[T]]
+ val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
serializer.fromBinary(fileData).asInstanceOf[T]
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index cd916672ac..c627dd3806 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,19 +17,20 @@
package org.apache.spark.deploy.master
-import java.util.Date
import java.text.SimpleDateFormat
+import java.util.concurrent.TimeUnit
+import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
-import akka.actor.Terminated
-import akka.dispatch.Await
import akka.pattern.ask
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.remote._
import akka.serialization.SerializationExtension
-import akka.util.duration._
-import akka.util.{Duration, Timeout}
+import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -37,9 +38,11 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{Utils, AkkaUtils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
+ import context.dispatcher
+
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
@@ -93,7 +96,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@@ -113,13 +116,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
new BlackHolePersistenceEngine()
}
- leaderElectionAgent = context.actorOf(Props(
- RECOVERY_MODE match {
+ leaderElectionAgent = RECOVERY_MODE match {
case "ZOOKEEPER" =>
- new ZooKeeperLeaderElectionAgent(self, masterUrl)
+ context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl))
case _ =>
- new MonarchyLeaderAgent(self)
- }))
+ context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
+ }
}
override def preRestart(reason: Throwable, message: Option[Any]) {
@@ -142,9 +144,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
RecoveryState.ALIVE
else
RecoveryState.RECOVERING
-
logInfo("I have been elected leader! New state: " + state)
-
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedWorkers)
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
@@ -156,7 +156,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
System.exit(0)
}
- case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
+ case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
@@ -164,9 +164,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+ val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+ sender, workerWebUiPort, publicAddress)
registerWorker(worker)
- context.watch(sender) // This doesn't work with remote actors but helps for testing
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
@@ -181,7 +181,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val app = createApplication(description, sender)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
- context.watch(sender) // This doesn't work with remote actors but helps for testing
persistenceEngine.addApplication(app)
sender ! RegisteredApplication(app.id, masterUrl)
schedule()
@@ -257,23 +256,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (canCompleteRecovery) { completeRecovery() }
}
- case Terminated(actor) => {
- // The disconnected actor could've been either a worker or an app; remove whichever of
- // those we have an entry for in the corresponding actor hashmap
- actorToWorker.get(actor).foreach(removeWorker)
- actorToApp.get(actor).foreach(finishApplication)
- if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
- }
-
- case RemoteClientDisconnected(transport, address) => {
- // The disconnected client could've been either a worker or an app; remove whichever it was
- addressToWorker.get(address).foreach(removeWorker)
- addressToApp.get(address).foreach(finishApplication)
- if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
- }
-
- case RemoteClientShutdown(transport, address) => {
+ case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
+ logInfo(s"$address got disassociated, removing it.")
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
@@ -530,9 +515,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
private[spark] object Master {
- private val systemName = "sparkMaster"
+ val systemName = "sparkMaster"
private val actorName = "Master"
- private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
+ val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
@@ -540,11 +525,11 @@ private[spark] object Master {
actorSystem.awaitTermination()
}
- /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+ /** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
def toAkkaUrl(sparkUrl: String): String = {
sparkUrl match {
case sparkUrlRegex(host, port) =>
- "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+ "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
case _ =>
throw new SparkException("Invalid master URL: " + sparkUrl)
}
@@ -552,9 +537,9 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
- val timeoutDuration = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
+ val timeoutDuration: FiniteDuration = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
implicit val timeout = Timeout(timeoutDuration)
val respFuture = actor ? RequestWebUIPort // ask pattern
val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
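
A sketch of the URL plumbing these Master hunks set up: sparkUrlRegex and systemName are now exposed so callers (Client, Worker) can turn the same spark://host:port string into both an akka.tcp actor path and an akka.actor.Address for comparison against DisassociatedEvent addresses. The literal values below are copied from the hunk; the standalone defs are illustrative only.

    import akka.actor.Address

    val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
    val systemName = "sparkMaster"
    val actorName  = "Master"

    def toAkkaUrl(sparkUrl: String): String = sparkUrl match {
      case sparkUrlRegex(host, port) =>
        "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
      case x => throw new IllegalArgumentException("Invalid master URL: " + x)
    }

    def toAddress(sparkUrl: String): Address = sparkUrl match {
      case sparkUrlRegex(host, port) =>
        Address("akka.tcp", systemName, host, port.toInt)
      case x => throw new IllegalArgumentException("Invalid master URL: " + x)
    }
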
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
index b91be821f0..256a5a7c28 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
@@ -17,9 +17,7 @@
package org.apache.spark.deploy.master
-private[spark] object RecoveryState
- extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
-
+private[spark] object RecoveryState extends Enumeration {
type MasterState = Value
val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
index c8d34f25e2..0b36ef6005 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
@@ -17,9 +17,7 @@
package org.apache.spark.deploy.master
-private[spark] object WorkerState
- extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED", "UNKNOWN") {
-
+private[spark] object WorkerState extends Enumeration {
type WorkerState = Value
val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index a0233a7271..825344b3bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -70,15 +70,15 @@ class ZooKeeperPersistenceEngine(serialization: Serialization)
(apps, workers)
}
- private def serializeIntoFile(path: String, value: Serializable) {
+ private def serializeIntoFile(path: String, value: AnyRef) {
val serializer = serialization.findSerializerFor(value)
val serialized = serializer.toBinary(value)
zk.create(path, serialized, CreateMode.PERSISTENT)
}
- def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = {
+ def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = {
val fileData = zk.getData("/spark/master_status/" + filename)
- val clazz = m.erasure.asInstanceOf[Class[T]]
+ val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
serializer.fromBinary(fileData).asInstanceOf[T]
}
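
Both persistence engines drop the `T <: Serializable` bound and switch from Manifest#erasure to runtimeClass, its Scala 2.10 replacement. A small sketch of that rename (the helper names are hypothetical):

    import scala.reflect.ClassTag

    def runtimeClassOf[T](implicit m: Manifest[T]): Class[_] = m.runtimeClass   // was m.erasure
    def runtimeClassOfTag[T](implicit t: ClassTag[T]): Class[_] = t.runtimeClass

    // runtimeClassOf[String]  == classOf[String]
    // runtimeClassOfTag[Int]  == java.lang.Integer.TYPE (the primitive int class)
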
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index f4e574d15d..3b983c19eb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -19,9 +19,10 @@ package org.apache.spark.deploy.master.ui
import scala.xml.Node
-import akka.dispatch.Await
import akka.pattern.ask
-import akka.util.duration._
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index d7a57229b0..65e7a14e7a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import akka.dispatch.Await
+import scala.concurrent.Await
import akka.pattern.ask
-import akka.util.duration._
+import scala.concurrent.duration._
import net.liftweb.json.JsonAST.JValue
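
This and the neighbouring UI pages swap akka.dispatch.Await and akka.util.duration for their scala.concurrent counterparts, where Futures and durations live from Scala 2.10 onwards. A minimal, self-contained sketch of the replacement API (the Future itself is made up):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    def answer(): Int = {
      val pending: Future[Int] = Future { 21 * 2 }
      Await.result(pending, 10.seconds)   // 10.seconds comes from scala.concurrent.duration._
    }
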
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index f4df729e87..a211ce2b42 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.master.ui
-import akka.util.Duration
+import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 216d9d44ac..87531b6719 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,23 +17,31 @@
package org.apache.spark.deploy.worker
+import java.io.File
import java.text.SimpleDateFormat
import java.util.Date
-import java.io.File
import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
import akka.actor._
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import akka.util.duration._
+import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
+import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
+import org.apache.spark.deploy.DeployMessages.KillExecutor
+import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.deploy.DeployMessages.Heartbeat
+import org.apache.spark.deploy.DeployMessages.RegisteredWorker
+import org.apache.spark.deploy.DeployMessages.LaunchExecutor
+import org.apache.spark.deploy.DeployMessages.RegisterWorker
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -47,6 +55,7 @@ private[spark] class Worker(
masterUrls: Array[String],
workDirPath: String = null)
extends Actor with Logging {
+ import context.dispatcher
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
@@ -63,7 +72,8 @@ private[spark] class Worker(
var masterIndex = 0
val masterLock: Object = new Object()
- var master: ActorRef = null
+ var master: ActorSelection = null
+ var masterAddress: Address = null
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
@volatile var registered = false
@@ -114,7 +124,7 @@ private[spark] class Worker(
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
-
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
registerWithMaster()
@@ -126,9 +136,13 @@ private[spark] class Worker(
masterLock.synchronized {
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
- master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+ masterAddress = activeMasterUrl match {
+ case Master.sparkUrlRegex(_host, _port) =>
+ Address("akka.tcp", Master.systemName, _host, _port.toInt)
+ case x =>
+ throw new SparkException("Invalid spark URL: " + x)
+ }
connected = true
}
}
@@ -136,7 +150,7 @@ private[spark] class Worker(
def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
- val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+ val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get,
publicAddress)
}
@@ -175,7 +189,6 @@ private[spark] class Worker(
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
- context.unwatch(master)
changeMaster(masterUrl, masterWebUiUrl)
val execs = executors.values.
@@ -234,13 +247,8 @@ private[spark] class Worker(
}
}
- case Terminated(actor_) if actor_ == master =>
- masterDisconnected()
-
- case RemoteClientDisconnected(transport, address) if address == master.path.address =>
- masterDisconnected()
-
- case RemoteClientShutdown(transport, address) if address == master.path.address =>
+ case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
+ logInfo(s"$x Disassociated !")
masterDisconnected()
case RequestWorkerState => {
@@ -280,8 +288,8 @@ private[spark] object Worker {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
- masterUrls, workDir)), name = "Worker")
+ actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
+ masterUrls, workDir), name = "Worker")
(actorSystem, boundPort)
}
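
The Worker changes above collect the Akka 2.2 idioms used throughout this patch: Props built from classOf plus constructor arguments, actorSelection in place of actorFor, and remoting lifecycle events delivered to subscribers as DisassociatedEvent. A hedged, self-contained sketch (WatchfulActor and its peer URL are invented):

    import akka.actor._
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    class WatchfulActor(peerUrl: String) extends Actor with ActorLogging {
      val peer: ActorSelection = context.actorSelection(peerUrl)

      override def preStart() {
        // DisassociatedEvent does not arrive via watch(); subscribe to the event stream instead.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
        peer ! "hello"
      }

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) =>
          log.warning(s"$remoteAddress got disassociated")
        case msg =>
          log.info(s"received $msg")
      }
    }

    // Props now takes the class and its constructor arguments rather than a by-name `new`:
    // system.actorOf(Props(classOf[WatchfulActor], "akka.tcp://peer@host:7077/user/Peer"), "watcher")
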
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index d2d3617498..1a768d501f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -21,9 +21,10 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import akka.dispatch.Await
+import scala.concurrent.duration._
+import scala.concurrent.Await
+
import akka.pattern.ask
-import akka.util.duration._
import net.liftweb.json.JsonAST.JValue
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 800f1cafcc..6c18a3c245 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -17,20 +17,19 @@
package org.apache.spark.deploy.worker.ui
-import akka.util.{Duration, Timeout}
+import java.io.File
-import java.io.{FileInputStream, File}
+import scala.concurrent.duration._
+import akka.util.Timeout
import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.server.{Handler, Server}
-
+import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.{Logging}
-import org.apache.spark.ui.JettyUtils
+import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
+import org.eclipse.jetty.server.{Handler, Server}
/**
* Web UI server for the standalone worker.
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8332631838..debbdd4c44 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -19,15 +19,14 @@ package org.apache.spark.executor
import java.nio.ByteBuffer
-import akka.actor.{ActorRef, Actor, Props, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.actor._
+import akka.remote._
import org.apache.spark.Logging
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
-
private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
@@ -40,14 +39,13 @@ private[spark] class CoarseGrainedExecutorBackend(
Utils.checkHostPort(hostPort, "Expected hostport")
var executor: Executor = null
- var driver: ActorRef = null
+ var driver: ActorSelection = null
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
- driver = context.actorFor(driverUrl)
+ driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
override def receive = {
@@ -77,8 +75,8 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.killTask(taskId)
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
- logError("Driver terminated or disconnected! Shutting down.")
+ case x: DisassociatedEvent =>
+ logError(s"Driver $x disassociated! Shutting down.")
System.exit(1)
case StopExecutor =>
@@ -99,12 +97,13 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+ indestructible = true)
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
- val actor = actorSystem.actorOf(
- Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+ actorSystem.actorOf(
+ Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
actorSystem.awaitTermination()
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5c9bb9db1c..0b0a60ee60 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -121,7 +121,7 @@ private[spark] class Executor(
// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
private val akkaFrameSize = {
- env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
+ env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
}
// Start worker thread pool
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 9c2fee4023..703bc6a9ca 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -31,11 +31,11 @@ import scala.collection.mutable.SynchronizedMap
import scala.collection.mutable.SynchronizedQueue
import scala.collection.mutable.ArrayBuffer
-import akka.dispatch.{Await, Promise, ExecutionContext, Future}
-import akka.util.Duration
-import akka.util.duration._
-import org.apache.spark.util.Utils
+import scala.concurrent.{Await, Promise, ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
+import org.apache.spark.util.Utils
private[spark] class ConnectionManager(port: Int) extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
index 8d9ad9604d..4f5742d29b 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
@@ -25,8 +25,8 @@ import scala.io.Source
import java.nio.ByteBuffer
import java.net.InetAddress
-import akka.dispatch.Await
-import akka.util.duration._
+import scala.concurrent.Await
+import scala.concurrent.duration._
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index faaf837be0..d1c74a5063 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
+import scala.reflect.ClassTag
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
@@ -28,7 +29,7 @@ import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
* A set of asynchronous RDD actions available through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
-class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with Logging {
+class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
/**
* Returns a future for counting the number of elements in the RDD.
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 44ea573a7c..424354ae16 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
+
import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
import org.apache.spark.storage.{BlockId, BlockManager}
@@ -25,7 +27,7 @@ private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends P
}
private[spark]
-class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {
@transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 0de22f0e06..87b950ba43 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.rdd
import java.io.{ObjectOutputStream, IOException}
+import scala.reflect.ClassTag
import org.apache.spark._
@@ -43,7 +44,7 @@ class CartesianPartition(
}
private[spark]
-class CartesianRDD[T: ClassManifest, U:ClassManifest](
+class CartesianRDD[T: ClassTag, U: ClassTag](
sc: SparkContext,
var rdd1 : RDD[T],
var rdd2 : RDD[U])
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index ef4057e2a2..80385fce57 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -17,16 +17,15 @@
package org.apache.spark.rdd
+import java.io.IOException
+
+import scala.reflect.ClassTag
+import java.io.{IOException}
import org.apache.spark._
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.{NullWritable, BytesWritable}
-import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.fs.Path
-import java.io.{File, IOException, EOFException}
-import java.text.NumberFormat
-import org.apache.spark.broadcast.Broadcast
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -34,7 +33,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
* This RDD represents a RDD checkpoint file (similar to HadoopRDD).
*/
private[spark]
-class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
+class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {
val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index c5de6362a9..98da35763b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectOutputStream, IOException}
import scala.collection.mutable
import scala.Some
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
/**
* Class that captures a coalesced RDD by essentially keeping track of parent partitions
@@ -68,7 +69,7 @@ case class CoalescedRDDPartition(
* @param maxPartitions number of desired partitions in the coalesced RDD
* @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
*/
-class CoalescedRDD[T: ClassManifest](
+class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 02d75eccc5..688c310ee9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -90,12 +90,13 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
// Compute the minimum and the maximum
val (max: Double, min: Double) = self.mapPartitions { items =>
- Iterator(items.foldRight(-1/0.0, Double.NaN)((e: Double, x: Pair[Double, Double]) =>
+ Iterator(items.foldRight(Double.NegativeInfinity,
+ Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
(x._1.max(e), x._2.min(e))))
}.reduce { (maxmin1, maxmin2) =>
(maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
}
- if (max.isNaN() || max.isInfinity || min.isInfinity ) {
+ if (min.isNaN || max.isNaN || max.isInfinity || min.isInfinity ) {
throw new UnsupportedOperationException(
"Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
}
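
The histogram fix above replaces the (-1/0.0, Double.NaN) seed (a NaN seed propagates through min) with symmetric infinities and extends the guard to check min.isNaN as well. A sketch of the corrected bounds computation in isolation (bounds is a hypothetical helper):

    def bounds(items: Iterator[Double]): (Double, Double) =
      items.foldLeft((Double.NegativeInfinity, Double.PositiveInfinity)) {
        case ((max, min), e) => (max.max(e), min.min(e))
      }

    // bounds(Iterator(3.0, -1.0, 7.0)) == (7.0, -1.0)
    // bounds(Iterator.empty)           == (-Infinity, +Infinity), which the guard above rejects
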
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
index c8900d1a93..a84e5f9fd8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -17,13 +17,14 @@
package org.apache.spark.rdd
-import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
+import scala.reflect.ClassTag
+import org.apache.spark.{Partition, SparkContext, TaskContext}
/**
* An RDD that is empty, i.e. has no element in it.
*/
-class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
override def getPartitions: Array[Partition] = Array.empty
diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
index 5312dc0b59..e74c83b90b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
@@ -18,8 +18,9 @@
package org.apache.spark.rdd
import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
+import scala.reflect.ClassTag
-private[spark] class FilteredRDD[T: ClassManifest](
+private[spark] class FilteredRDD[T: ClassTag](
prev: RDD[T],
f: T => Boolean)
extends RDD[T](prev) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
index cbdf6d84c0..4d1878fc14 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
@@ -18,10 +18,11 @@
package org.apache.spark.rdd
import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
private[spark]
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+class FlatMappedRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: T => TraversableOnce[U])
extends RDD[U](prev) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
index 829545d7b0..1a694475f6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
@@ -18,8 +18,9 @@
package org.apache.spark.rdd
import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
-private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
+private[spark] class GlommedRDD[T: ClassTag](prev: RDD[T])
extends RDD[Array[T]](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index aca0146884..8df8718f3b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -19,6 +19,8 @@ package org.apache.spark.rdd
import java.sql.{Connection, ResultSet}
+import scala.reflect.ClassTag
+
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.util.NextIterator
@@ -45,7 +47,7 @@ private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) e
* This should only call getInt, getString, etc; the RDD takes care of calling next.
* The default maps a ResultSet to an array of Object.
*/
-class JdbcRDD[T: ClassManifest](
+class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index ae70d55951..db15baf503 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -18,9 +18,9 @@
package org.apache.spark.rdd
import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
-
-private[spark] class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
+private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
index e8be1c4816..8d7c288593 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
@@ -17,10 +17,12 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
+
import org.apache.spark.{Partition, TaskContext}
private[spark]
-class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
+class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
extends RDD[U](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 697be8b997..d5691f2267 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -17,7 +17,9 @@
package org.apache.spark.rdd
-import org.apache.spark.{RangePartitioner, Logging}
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, RangePartitioner}
/**
* Extra functions available on RDDs of (key, value) pairs where the key is sortable through
@@ -25,9 +27,9 @@ import org.apache.spark.{RangePartitioner, Logging}
* use these functions. They will work with any key type that has a `scala.math.Ordered`
* implementation.
*/
-class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest,
- V: ClassManifest,
- P <: Product2[K, V] : ClassManifest](
+class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
+ V: ClassTag,
+ P <: Product2[K, V] : ClassTag](
self: RDD[P])
extends Logging with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 93b78e1232..48168e152e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -25,6 +25,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
+import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.mapred._
import org.apache.hadoop.io.compress.CompressionCodec
@@ -50,7 +51,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
+class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
extends Logging
with SparkHadoopMapReduceUtil
with Serializable {
@@ -415,7 +416,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
- val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+ val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag)
prfs.mapValues { case Seq(vs, ws) =>
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
}
@@ -431,7 +432,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
- val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+ val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag)
prfs.mapValues { case Seq(vs, w1s, w2s) =>
(vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
}
@@ -488,15 +489,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
- def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] =
+ def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] =
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
- def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
+ def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
subtractByKey(other, new HashPartitioner(numPartitions))
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
- def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
+ def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
new SubtractedRDD[K, V, W](self, other, p)
/**
@@ -525,8 +526,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*/
- def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
- saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
+ saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -535,16 +536,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* supplied codec.
*/
def saveAsHadoopFile[F <: OutputFormat[K, V]](
- path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
- saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
+ path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
+ saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec)
}
/**
* Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
*/
- def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
- saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
+ saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -698,11 +699,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
*/
def values: RDD[V] = self.map(_._2)
- private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
+ private[spark] def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
- private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
+ private[spark] def getValueClass() = implicitly[ClassTag[V]].runtimeClass
}
-private[spark] object Manifests {
- val seqSeqManifest = classManifest[Seq[Seq[_]]]
+private[spark] object ClassTags {
+ val seqSeqClassTag = classTag[Seq[Seq[_]]]
}
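
Two recurring replacements appear in this file: ClassManifest.erasure becomes ClassTag.runtimeClass, and explicitly supplied manifests (classManifest[K], Manifests.seqSeqManifest) become explicitly supplied tags. A small stand-alone sketch of both, assuming nothing beyond scala.reflect (TagDemo, runtimeClassOf and nameOf are illustrative names):

import scala.reflect.{ClassTag, classTag}

object TagDemo extends App {
  // runtimeClass plays the role that erasure played on ClassManifest.
  def runtimeClassOf[T: ClassTag]: Class[_] = classTag[T].runtimeClass

  // A tag can also be passed explicitly, as the cogroup hunks do with classTag[K].
  def nameOf[T](implicit tag: ClassTag[T]): String = tag.runtimeClass.getName

  println(runtimeClassOf[String])          // class java.lang.String
  println(nameOf(classTag[Seq[Seq[_]]]))   // scala.collection.Seq
}
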
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index cd96250389..09d0a8189d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -20,13 +20,15 @@ package org.apache.spark.rdd
import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
+import scala.reflect.ClassTag
+
import org.apache.spark._
import java.io._
import scala.Serializable
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils
-private[spark] class ParallelCollectionPartition[T: ClassManifest](
+private[spark] class ParallelCollectionPartition[T: ClassTag](
var rddId: Long,
var slice: Int,
var values: Seq[T])
@@ -78,7 +80,7 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
}
}
-private[spark] class ParallelCollectionRDD[T: ClassManifest](
+private[spark] class ParallelCollectionRDD[T: ClassTag](
@transient sc: SparkContext,
@transient data: Seq[T],
numSlices: Int,
@@ -109,7 +111,7 @@ private object ParallelCollectionRDD {
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
* it efficient to run Spark over RDDs representing large sets of numbers.
*/
- def slice[T: ClassManifest](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
+ def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of slices required")
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index 574dd4233f..ea8885b36e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
+
import org.apache.spark.{NarrowDependency, SparkEnv, Partition, TaskContext}
@@ -49,7 +51,7 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
* and the execution DAG has a filter on the key, we can avoid launching tasks
* on partitions that don't have the range covering the key.
*/
-class PartitionPruningRDD[T: ClassManifest](
+class PartitionPruningRDD[T: ClassTag](
@transient prev: RDD[T],
@transient partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
@@ -69,6 +71,6 @@ object PartitionPruningRDD {
* when its type T is not known at compile time.
*/
def create[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean) = {
- new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassManifest)
+ new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassTag)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index d5304ab0ae..1dbbe39898 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -24,6 +24,7 @@ import scala.collection.Map
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
+import scala.reflect.ClassTag
import org.apache.spark.{SparkEnv, Partition, TaskContext}
import org.apache.spark.broadcast.Broadcast
@@ -33,7 +34,7 @@ import org.apache.spark.broadcast.Broadcast
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.
*/
-class PipedRDD[T: ClassManifest](
+class PipedRDD[T: ClassTag](
prev: RDD[T],
command: Seq[String],
envVars: Map[String, String],
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 893708f8f2..ea45566ad1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -23,6 +23,9 @@ import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.reflect.{classTag, ClassTag}
+
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
@@ -69,7 +72,7 @@ import org.apache.spark._
* [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
* on RDD internals.
*/
-abstract class RDD[T: ClassManifest](
+abstract class RDD[T: ClassTag](
@transient private var sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
@@ -243,13 +246,13 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
- def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+ def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
- def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
+ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
/**
@@ -374,25 +377,25 @@ abstract class RDD[T: ClassManifest](
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
* elements (a, b) where a is in `this` and b is in `other`.
*/
- def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
+ def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
+ def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] =
groupBy[K](f, defaultPartitioner(this))
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
+ def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
groupBy(f, new HashPartitioner(numPartitions))
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+ def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
@@ -439,7 +442,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[U: ClassManifest](
+ def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
@@ -449,7 +452,7 @@ abstract class RDD[T: ClassManifest](
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- def mapPartitionsWithIndex[U: ClassManifest](
+ def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
@@ -459,7 +462,7 @@ abstract class RDD[T: ClassManifest](
* Return a new RDD by applying a function to each partition of this RDD. This is a variant of
* mapPartitions that also passes the TaskContext into the closure.
*/
- def mapPartitionsWithContext[U: ClassManifest](
+ def mapPartitionsWithContext[U: ClassTag](
f: (TaskContext, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
@@ -471,7 +474,7 @@ abstract class RDD[T: ClassManifest](
* of the original partition.
*/
@deprecated("use mapPartitionsWithIndex", "0.7.0")
- def mapPartitionsWithSplit[U: ClassManifest](
+ def mapPartitionsWithSplit[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
mapPartitionsWithIndex(f, preservesPartitioning)
}
@@ -481,7 +484,7 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def mapWith[A: ClassManifest, U: ClassManifest]
+ def mapWith[A: ClassTag, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
(f: (T, A) => U): RDD[U] = {
mapPartitionsWithIndex((index, iter) => {
@@ -495,7 +498,7 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def flatMapWith[A: ClassManifest, U: ClassManifest]
+ def flatMapWith[A: ClassTag, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
(f: (T, A) => Seq[U]): RDD[U] = {
mapPartitionsWithIndex((index, iter) => {
@@ -509,7 +512,7 @@ abstract class RDD[T: ClassManifest](
* This additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def foreachWith[A: ClassManifest](constructA: Int => A)(f: (T, A) => Unit) {
+ def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) {
mapPartitionsWithIndex { (index, iter) =>
val a = constructA(index)
iter.map(t => {f(t, a); t})
@@ -521,7 +524,7 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def filterWith[A: ClassManifest](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
+ def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
mapPartitionsWithIndex((index, iter) => {
val a = constructA(index)
iter.filter(t => p(t, a))
@@ -534,7 +537,7 @@ abstract class RDD[T: ClassManifest](
* partitions* and the *same number of elements in each partition* (e.g. one was made through
* a map on the other).
*/
- def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
+ def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
/**
* Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
@@ -542,32 +545,27 @@ abstract class RDD[T: ClassManifest](
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
- def zipPartitions[B: ClassManifest, V: ClassManifest]
- (rdd2: RDD[B], preservesPartitioning: Boolean)
- (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
-
- def zipPartitions[B: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C])
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)
- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
@@ -605,7 +603,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD that contains all matching values by applying `f`.
*/
- def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = {
+ def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = {
filter(f.isDefinedAt).map(f)
}
@@ -695,7 +693,7 @@ abstract class RDD[T: ClassManifest](
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
*/
- def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
+ def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
@@ -744,7 +742,7 @@ abstract class RDD[T: ClassManifest](
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): Map[T, Long] = {
- if (elementClassManifest.erasure.isArray) {
+ if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
@@ -775,7 +773,7 @@ abstract class RDD[T: ClassManifest](
timeout: Long,
confidence: Double = 0.95
): PartialResult[Map[T, BoundedDouble]] = {
- if (elementClassManifest.erasure.isArray) {
+ if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
@@ -942,12 +940,12 @@ abstract class RDD[T: ClassManifest](
/** Record user function generating this RDD. */
@transient private[spark] val origin = Utils.formatSparkCallSite
- private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
+ private[spark] def elementClassTag: ClassTag[T] = classTag[T]
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
/** Returns the first parent RDD */
- protected[spark] def firstParent[U: ClassManifest] = {
+ protected[spark] def firstParent[U: ClassTag] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}
@@ -1009,7 +1007,7 @@ abstract class RDD[T: ClassManifest](
origin)
def toJavaRDD() : JavaRDD[T] = {
- new JavaRDD(this)(elementClassManifest)
+ new JavaRDD(this)(elementClassTag)
}
}
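
RDD.scala shows the pattern at scale: the class itself takes a ClassTag[T], every transformation that produces a new element type takes its own ClassTag[U], and elementClassTag exposes the captured tag for callers such as toJavaRDD(). A compact sketch of that shape (MiniRDD is an illustrative stand-in, not Spark's RDD):

import scala.reflect.{ClassTag, classTag}

abstract class MiniRDD[T: ClassTag] {
  def elems: Seq[T]
  // Producing a collection of a new element type needs a ClassTag[U] of its own.
  def map[U: ClassTag](f: T => U): MiniRDD[U] = MiniRDD(elems.map(f))
  // The captured tag can be handed to code that needs it, as toJavaRDD() does above.
  def elementClassTag: ClassTag[T] = classTag[T]
}

object MiniRDD {
  def apply[T: ClassTag](xs: Seq[T]): MiniRDD[T] = new MiniRDD[T] { def elems = xs }
}

object MiniRDDDemo extends App {
  println(MiniRDD(Seq(1, 2, 3)).map(_.toString).elementClassTag.runtimeClass)  // class java.lang.String
}
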
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 3160ab95c4..5a565d7e78 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -17,6 +17,8 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
@@ -38,7 +40,7 @@ private[spark] object CheckpointState extends Enumeration {
* manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
* of the checkpointed RDD.
*/
-private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
+private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
extends Logging with Serializable {
import CheckpointState._
diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
index 2c5253ae30..d433670cc2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
@@ -17,6 +17,7 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
import java.util.Random
import cern.jet.random.Poisson
@@ -29,9 +30,9 @@ class SampledRDDPartition(val prev: Partition, val seed: Int) extends Partition
override val index: Int = prev.index
}
-class SampledRDD[T: ClassManifest](
+class SampledRDD[T: ClassTag](
prev: RDD[T],
- withReplacement: Boolean,
+ withReplacement: Boolean,
frac: Double,
seed: Int)
extends RDD[T](prev) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 5fe4676029..2d1bd5b481 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.spark.rdd
+import scala.reflect.{ ClassTag, classTag}
+
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.io.compress.CompressionCodec
@@ -32,15 +33,15 @@ import org.apache.spark.Logging
*
* Import `org.apache.spark.SparkContext._` at the top of their program to use these functions.
*/
-class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
+class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag](
self: RDD[(K, V)])
extends Logging
with Serializable {
- private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
+ private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = {
val c = {
- if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
- classManifest[T].erasure
+ if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) {
+ classTag[T].runtimeClass
} else {
// We get the type of the Writable class by looking at the apply method which converts
// from T to Writable. Since we have two apply methods we filter out the one which
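
getWritableClass keeps the same logic but reads the runtime class from a ClassTag rather than from ClassManifest.erasure. A stand-alone sketch of that kind of runtime subtype check, with java.io.Serializable standing in for Writable so it runs with the JDK and scala.reflect alone (AssignabilitySketch is an illustrative name):

import scala.reflect.{ClassTag, classTag}

object AssignabilitySketch extends App {
  // Same shape as getWritableClass: inspect T's runtime class via its ClassTag.
  def isSerializableType[T: ClassTag]: Boolean =
    classOf[java.io.Serializable].isAssignableFrom(classTag[T].runtimeClass)

  println(isSerializableType[String])   // true
  println(isSerializableType[Thread])   // false
}
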
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index a5d751a7bd..3682c84598 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -17,8 +17,10 @@
package org.apache.spark.rdd
-import org.apache.spark.{Dependency, Partitioner, SparkEnv, ShuffleDependency, Partition, TaskContext}
+import scala.reflect.ClassTag
+import org.apache.spark.{Dependency, Partition, Partitioner, ShuffleDependency,
+ SparkEnv, TaskContext}
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
override val index = idx
@@ -32,7 +34,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* @tparam K the key class.
* @tparam V the value class.
*/
-class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
+class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag](
@transient var prev: RDD[P],
part: Partitioner)
extends RDD[P](prev.context, Nil) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index 7af4d803e7..aab30b1bb4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -18,8 +18,11 @@
package org.apache.spark.rdd
import java.util.{HashMap => JHashMap}
+
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
import org.apache.spark.Partitioner
import org.apache.spark.Dependency
import org.apache.spark.TaskContext
@@ -45,7 +48,7 @@ import org.apache.spark.OneToOneDependency
* you can use `rdd1`'s partitioner/partition size and not worry about running
* out of memory because of the size of `rdd2`.
*/
-private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
+private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
@transient var rdd1: RDD[_ <: Product2[K, V]],
@transient var rdd2: RDD[_ <: Product2[K, W]],
part: Partitioner)
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index ae8a9f36a6..08a41ac558 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -18,10 +18,13 @@
package org.apache.spark.rdd
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
import org.apache.spark.{Dependency, RangeDependency, SparkContext, Partition, TaskContext}
+
import java.io.{ObjectOutputStream, IOException}
-private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int)
+private[spark] class UnionPartition[T: ClassTag](idx: Int, rdd: RDD[T], splitIndex: Int)
extends Partition {
var split: Partition = rdd.partitions(splitIndex)
@@ -40,7 +43,7 @@ private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], spl
}
}
-class UnionRDD[T: ClassManifest](
+class UnionRDD[T: ClassTag](
sc: SparkContext,
@transient var rdds: Seq[RDD[T]])
extends RDD[T](sc, Nil) { // Nil since we implement getDependencies
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index a97d2a01c8..83be3c6eb4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import org.apache.spark.{OneToOneDependency, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
+import scala.reflect.ClassTag
private[spark] class ZippedPartitionsPartition(
idx: Int,
@@ -38,7 +39,7 @@ private[spark] class ZippedPartitionsPartition(
}
}
-abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
+abstract class ZippedPartitionsBaseRDD[V: ClassTag](
sc: SparkContext,
var rdds: Seq[RDD[_]],
preservesPartitioning: Boolean = false)
@@ -71,7 +72,7 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
}
}
-class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
+class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B]) => Iterator[V],
var rdd1: RDD[A],
@@ -92,7 +93,7 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]
}
class ZippedPartitionsRDD3
- [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
+ [A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
var rdd1: RDD[A],
@@ -117,7 +118,7 @@ class ZippedPartitionsRDD3
}
class ZippedPartitionsRDD4
- [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
+ [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
var rdd1: RDD[A],
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
index 567b67dfee..fb5b070c18 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
@@ -18,10 +18,12 @@
package org.apache.spark.rdd
import org.apache.spark.{OneToOneDependency, SparkContext, Partition, TaskContext}
+
import java.io.{ObjectOutputStream, IOException}
+import scala.reflect.ClassTag
-private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
+private[spark] class ZippedPartition[T: ClassTag, U: ClassTag](
idx: Int,
@transient rdd1: RDD[T],
@transient rdd2: RDD[U]
@@ -42,7 +44,7 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
}
}
-class ZippedRDD[T: ClassManifest, U: ClassManifest](
+class ZippedRDD[T: ClassTag, U: ClassTag](
sc: SparkContext,
var rdd1: RDD[T],
var rdd2: RDD[U])
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f9cd021dd3..963d15b76d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,9 +21,11 @@ import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
-import akka.actor._
-import akka.util.duration._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+
+import akka.actor._
import org.apache.spark._
import org.apache.spark.rdd.RDD
@@ -104,7 +106,7 @@ class DAGScheduler(
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
- val RESUBMIT_TIMEOUT = 50L
+ val RESUBMIT_TIMEOUT = 50.milliseconds
// The time, in millis, to wake up between polls of the completion queue in order to potentially
// resubmit failed stages
@@ -177,13 +179,14 @@ class DAGScheduler(
var resubmissionTask: Cancellable = _
override def preStart() {
+ import context.dispatcher
/**
* A message is sent to the actor itself periodically to remind the actor to resubmit failed
* stages. In this way, stage resubmission can be done within the same thread context of
* other event processing logic to avoid unnecessary synchronization overhead.
*/
resubmissionTask = context.system.scheduler.schedule(
- RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
+ RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
}
/**
@@ -460,7 +463,7 @@ class DAGScheduler(
waiter
}
- def runJob[T, U: ClassManifest](
+ def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
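
Beyond the ClassTag change, this hunk moves from akka.util.duration to scala.concurrent.duration, makes RESUBMIT_TIMEOUT a FiniteDuration, and adds import context.dispatcher because the Akka scheduler now requires an ExecutionContext. A minimal sketch of that pattern, assuming akka-actor 2.2.x on the classpath (ResubmitActor and the message string are illustrative only):

import scala.concurrent.duration._

import akka.actor._

class ResubmitActor extends Actor {
  // Timeouts are FiniteDurations from scala.concurrent.duration, not bare Longs.
  val resubmitTimeout: FiniteDuration = 50.milliseconds

  override def preStart() {
    // schedule() needs an ExecutionContext in Akka 2.2+; the actor's dispatcher works.
    import context.dispatcher
    context.system.scheduler.schedule(resubmitTimeout, resubmitTimeout, self, "resubmit")
  }

  def receive = {
    case "resubmit" => println("re-checking failed stages")
  }
}
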
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
index 0a786deb16..3832ee7ff6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
@@ -22,7 +22,7 @@ package org.apache.spark.scheduler
* to order tasks amongst a Schedulable's sub-queues
* "NONE" is used when the a Schedulable has no sub-queues.
*/
-object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
+object SchedulingMode extends Enumeration {
type SchedulingMode = Value
val FAIR,FIFO,NONE = Value
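
The Enumeration(names: String*) constructor used here was dropped in Scala 2.10; value names are now derived from the val identifiers themselves, so the string arguments simply disappear. A self-contained sketch (SchedulingModeSketch and EnumDemo are illustrative names):

object SchedulingModeSketch extends Enumeration {
  type SchedulingModeSketch = Value
  val FAIR, FIFO, NONE = Value   // names come from the vals, no constructor arguments
}

object EnumDemo extends App {
  println(SchedulingModeSketch.FAIR)                                            // FAIR
  println(SchedulingModeSketch.withName("FIFO") == SchedulingModeSketch.FIFO)   // true
}
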
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3841b5616d..ee63b3c4a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -63,7 +63,7 @@ trait SparkListener {
* Called when a task begins remotely fetching its result (will not be called for tasks that do
* not need to fetch the result remotely).
*/
- def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
+ def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
/**
* Called when a task ends
@@ -131,8 +131,8 @@ object StatsReportListener extends Logging {
def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
val stats = d.statCounter
- logInfo(heading + stats)
val quantiles = d.getQuantiles(probabilities).map{formatNumber}
+ logInfo(heading + stats)
logInfo(percentilesHeader)
logInfo("\t" + quantiles.mkString("\t"))
}
@@ -173,8 +173,6 @@ object StatsReportListener extends Logging {
showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
}
-
-
val seconds = 1000L
val minutes = seconds * 60
val hours = minutes * 60
@@ -198,7 +196,6 @@ object StatsReportListener extends Logging {
}
-
case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index d5824e7954..85687ea330 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -91,4 +91,3 @@ private[spark] class SparkListenerBus() extends Logging {
return true
}
}
-
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
index 47b0f387aa..35de13c385 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
@@ -18,9 +18,7 @@
package org.apache.spark.scheduler
-private[spark] object TaskLocality
- extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
-{
+private[spark] object TaskLocality extends Enumeration {
// process local is expected to be used ONLY within tasksetmanager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 4d82430b97..66ab8ea4cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -24,8 +24,7 @@ import java.util.{TimerTask, Timer}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
-
-import akka.util.duration._
+import scala.concurrent.duration._
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
@@ -122,7 +121,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
if (System.getProperty("spark.speculation", "false").toBoolean) {
logInfo("Starting speculative execution thread")
-
+ import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
checkSpeculatableTasks()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 94961790df..bf494aa64d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -529,10 +529,10 @@ private[spark] class ClusterTaskSetManager(
addPendingTask(index)
if (state != TaskState.KILLED) {
numFailures(index) += 1
- if (numFailures(index) > MAX_TASK_FAILURES) {
- logError("Task %s:%d failed more than %d times; aborting job".format(
+ if (numFailures(index) >= MAX_TASK_FAILURES) {
+ logError("Task %s:%d failed %d times; aborting job".format(
taskSet.id, index, MAX_TASK_FAILURES))
- abort("Task %s:%d failed more than %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
+ abort("Task %s:%d failed %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
}
}
} else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index d0ba5bf55d..f5e8766f6d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -20,13 +20,12 @@ package org.apache.spark.scheduler.cluster
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.Await
+import scala.concurrent.duration._
import akka.actor._
-import akka.dispatch.Await
import akka.pattern.ask
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
-import akka.util.Duration
-import akka.util.duration._
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.scheduler.TaskDescription
@@ -53,15 +52,15 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
- private val actorToExecutorId = new HashMap[ActorRef, String]
private val addressToExecutorId = new HashMap[Address, String]
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
// Periodically revive offers to allow delay scheduling to work
val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
+ import context.dispatcher
context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
@@ -73,12 +72,10 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
} else {
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor(sparkProperties)
- context.watch(sender)
executorActor(executorId) = sender
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
freeCores(executorId) = cores
executorAddress(executorId) = sender.path.address
- actorToExecutorId(sender) = executorId
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
makeOffers()
@@ -118,14 +115,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
removeExecutor(executorId, reason)
sender ! true
- case Terminated(actor) =>
- actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
+ case DisassociatedEvent(_, address, _) =>
+ addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
- case RemoteClientDisconnected(transport, address) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
-
- case RemoteClientShutdown(transport, address) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
}
// Make fake resource offers on all executors
@@ -153,7 +145,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
if (executorActor.contains(executorId)) {
logInfo("Executor " + executorId + " disconnected, so removing it")
val numCores = freeCores(executorId)
- actorToExecutorId -= executorActor(executorId)
addressToExecutorId -= executorAddress(executorId)
executorActor -= executorId
executorHost -= executorId
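
With Akka 2.2 the RemoteClientDisconnected/RemoteClientShutdown events and the per-executor watch() bookkeeping are replaced by one subscription to RemotingLifecycleEvent and a match on DisassociatedEvent, keyed by the remote address. A minimal sketch of that subscription, assuming akka-actor and akka-remote 2.2.x (DisconnectWatcher is an illustrative name):

import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

class DisconnectWatcher extends Actor {
  override def preStart() {
    // One event-stream subscription replaces watch()-per-executor and the RemoteClient* cases.
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

  def receive = {
    case DisassociatedEvent(_, remoteAddress, _) =>
      // Whatever state is keyed by the remote address gets cleaned up here.
      println("remote Akka client disassociated: " + remoteAddress)
  }
}
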
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index e000531a26..e8fecec4a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -36,7 +36,7 @@ private[spark] class SimrSchedulerBackend(
override def start() {
super.start()
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index cefa970bb9..7127a72d6d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -42,7 +42,7 @@ private[spark] class SparkDeploySchedulerBackend(
super.start()
// The endpoint for executors to talk to us
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
index 2064d97b49..e68c527713 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
@@ -71,7 +71,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
- case ex =>
+ case ex: Throwable =>
taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
}
}
@@ -95,7 +95,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche
val loader = Thread.currentThread.getContextClassLoader
logError(
"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
- case ex => {}
+ case ex: Throwable => {}
}
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index cd521e0f2b..84fe3094cc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -120,7 +120,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 702aca8323..19a025a329 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -24,9 +24,9 @@ import scala.collection.mutable.{HashMap, ArrayBuffer}
import scala.util.Random
import akka.actor.{ActorSystem, Cancellable, Props}
-import akka.dispatch.{Await, Future}
-import akka.util.Duration
-import akka.util.duration._
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
@@ -924,4 +924,3 @@ private[spark] object BlockManager extends Logging {
blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
}
}
-
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 94038649b3..e05b842476 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -17,16 +17,17 @@
package org.apache.spark.storage
-import akka.actor.ActorRef
-import akka.dispatch.{Await, Future}
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import akka.actor._
import akka.pattern.ask
-import akka.util.Duration
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-
-private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
+private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
@@ -156,7 +157,10 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
while (attempts < AKKA_RETRY_ATTEMPTS) {
attempts += 1
try {
- val future = driverActor.ask(message)(timeout)
+ val future = driverActor match {
+ case Left(a: ActorRef) => a.ask(message)(timeout)
+ case Right(b: ActorSelection) => b.ask(message)(timeout)
+ }
val result = Await.result(future, timeout)
if (result == null) {
throw new SparkException("BlockManagerMaster returned null")
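
The driver reference is now Either[ActorRef, ActorSelection], so every ask has to dispatch on the side of the Either before awaiting the reply. A compact stand-alone sketch of that dispatch, assuming Akka 2.2.x (AskEitherSketch and askDriver are illustrative names, not Spark API):

import scala.concurrent.Await

import akka.actor.{ActorRef, ActorSelection}
import akka.pattern.ask
import akka.util.Timeout

object AskEitherSketch {
  // Ask whichever kind of reference is populated, then block for the reply.
  def askDriver(driver: Either[ActorRef, ActorSelection], message: Any)
               (implicit timeout: Timeout): Any = {
    val future = driver match {
      case Left(ref)        => ref.ask(message)
      case Right(selection) => selection.ask(message)
    }
    Await.result(future, timeout.duration)
  }
}
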
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index f8cf14b503..154a3980e9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -23,10 +23,10 @@ import scala.collection.mutable
import scala.collection.JavaConversions._
import akka.actor.{Actor, ActorRef, Cancellable}
-import akka.dispatch.Future
import akka.pattern.ask
-import akka.util.Duration
-import akka.util.duration._
+
+import scala.concurrent.duration._
+import scala.concurrent.Future
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
@@ -65,6 +65,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
override def preStart() {
if (!BlockManager.getDisableHeartBeatsForTesting) {
+ import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(
0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 860e680576..a8db37ded1 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -93,7 +93,7 @@ private[spark] object ThreadingTest {
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
val blockManagerMaster = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))
+ Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))))
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index e7eab374ad..c1ee2f3d00 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -17,7 +17,7 @@
package org.apache.spark.ui.jobs
-import akka.util.Duration
+import scala.concurrent.duration._
import java.text.SimpleDateFormat
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index 1d633d374a..a5446b3fc3 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -17,7 +17,7 @@
package org.apache.spark.ui.storage
-import akka.util.Duration
+import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index d4c5065c3f..74133cef6c 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,11 +17,8 @@
package org.apache.spark.util
-import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
-import akka.util.duration._
-import akka.remote.RemoteActorRefProvider
-
/**
* Various utility classes for working with Akka.
@@ -34,39 +31,57 @@ private[spark] object AkkaUtils {
*
* Note: the `name` parameter is important, as even if a client sends a message to the right
* host + port, if the system name is incorrect, Akka will drop the message.
+ *
+ * If indestructible is set to true, the Actor System will continue running in the event
+ * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
- def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
- val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
+ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false)
+ : (ActorSystem, Int) = {
+
+ val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
- val akkaTimeout = System.getProperty("spark.akka.timeout", "60").toInt
+
+ val akkaTimeout = System.getProperty("spark.akka.timeout", "100").toInt
+
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
- val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
- // 10 seconds is the default akka timeout, but in a cluster, we need higher by default.
- val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt
-
- val akkaConf = ConfigFactory.parseString("""
- akka.daemonic = on
- akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
- akka.stdout-loglevel = "ERROR"
- akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
- akka.remote.netty.hostname = "%s"
- akka.remote.netty.port = %d
- akka.remote.netty.connection-timeout = %ds
- akka.remote.netty.message-frame-size = %d MiB
- akka.remote.netty.execution-pool-size = %d
- akka.actor.default-dispatcher.throughput = %d
- akka.remote.log-remote-lifecycle-events = %s
- akka.remote.netty.write-timeout = %ds
- """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
- lifecycleEvents, akkaWriteTimeout))
+ val lifecycleEvents =
+ if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+
+ val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "600").toInt
+ val akkaFailureDetector =
+ System.getProperty("spark.akka.failure-detector.threshold", "300.0").toDouble
+ val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "1000").toInt
- val actorSystem = ActorSystem(name, akkaConf)
+ val akkaConf = ConfigFactory.parseString(
+ s"""
+ |akka.daemonic = on
+ |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
+ |akka.stdout-loglevel = "ERROR"
+ |akka.jvm-exit-on-fatal-error = off
+ |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
+ |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+ |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
+ |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
+ |akka.remote.netty.tcp.hostname = "$host"
+ |akka.remote.netty.tcp.port = $port
+ |akka.remote.netty.tcp.tcp-nodelay = on
+ |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
+ |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
+ |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
+ |akka.actor.default-dispatcher.throughput = $akkaBatchSize
+ |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
+ """.stripMargin)
+
+ val actorSystem = if (indestructible) {
+ IndestructibleActorSystem(name, akkaConf)
+ } else {
+ ActorSystem(name, akkaConf)
+ }
- // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
- // hack because Akka doesn't let you figure out the port through the public API yet.
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
- val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
- return (actorSystem, boundPort)
+ val boundPort = provider.getDefaultAddress.port.get
+ (actorSystem, boundPort)
}
+
}
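
The rewritten createActorSystem builds its HOCON from an interpolated, stripMargin'd string and reads the bound port back from the provider's default address. A tiny sketch of just the configuration-string side, using Typesafe Config directly (the keys shown are a subset of the ones above; the values are illustrative):

import com.typesafe.config.ConfigFactory

object AkkaConfSketch extends App {
  val host = "localhost"
  val port = 0   // 0 asks the transport for a free port; the caller reads it back later
  // Same interpolated, stripMargin'd HOCON style as createActorSystem above.
  val conf = ConfigFactory.parseString(
    s"""
      |akka.daemonic = on
      |akka.remote.netty.tcp.hostname = "$host"
      |akka.remote.netty.tcp.port = $port
      """.stripMargin)
  println(conf.getString("akka.remote.netty.tcp.hostname"))   // localhost
  println(conf.getInt("akka.remote.netty.tcp.port"))          // 0
}
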
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
index 0b51c23f7b..a38329df03 100644
--- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -34,6 +34,8 @@ class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
override def iterator: Iterator[A] = underlying.iterator.asScala
+ override def size: Int = underlying.size
+
override def ++=(xs: TraversableOnce[A]): this.type = {
xs.foreach { this += _ }
this
diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
new file mode 100644
index 0000000000..bf71882ef7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Must be in akka.actor package as ActorSystemImpl is protected[akka].
+package akka.actor
+
+import scala.util.control.{ControlThrowable, NonFatal}
+
+import com.typesafe.config.Config
+
+/**
+ * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
+ * This is necessary as Spark Executors are allowed to recover from fatal exceptions
+ * (see [[org.apache.spark.executor.Executor]]).
+ */
+object IndestructibleActorSystem {
+ def apply(name: String, config: Config): ActorSystem =
+ apply(name, config, ActorSystem.findClassLoader())
+
+ def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
+ new IndestructibleActorSystemImpl(name, config, classLoader).start()
+}
+
+private[akka] class IndestructibleActorSystemImpl(
+ override val name: String,
+ applicationConfig: Config,
+ classLoader: ClassLoader)
+ extends ActorSystemImpl(name, applicationConfig, classLoader) {
+
+ protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
+ val fallbackHandler = super.uncaughtExceptionHandler
+
+ new Thread.UncaughtExceptionHandler() {
+ def uncaughtException(thread: Thread, cause: Throwable): Unit = {
+ if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
+ log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
+ "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
+ //shutdown() //TODO make it configurable
+ } else {
+ fallbackHandler.uncaughtException(thread, cause)
+ }
+ }
+ }
+ }
+
+ def isFatalError(e: Throwable): Boolean = {
+ e match {
+ case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
+ false
+ case _ =>
+ true
+ }
+ }
+}
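
A short usage sketch for the new file: the companion object is the only public entry point, and the resulting system behaves like a normal ActorSystem apart from its uncaught-exception handler (the system name and config below are illustrative):

import akka.actor.{ActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory

object IndestructibleDemo extends App {
  // Construct through the companion object; apply() has already called start().
  val system: ActorSystem = IndestructibleActorSystem("sketch", ConfigFactory.load())
  // Actors are created as usual; a fatal error on an actor thread is logged but,
  // with akka.jvm-exit-on-fatal-error = off, does not bring the system down.
  system.shutdown()
}
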
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 67a7f87a5c..7b41ef89f1 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -55,8 +55,7 @@ class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, clea
}
}
-object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler", "ResultTask",
- "ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") {
+object MetadataCleanerType extends Enumeration {
val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
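The string-taking Enumeration constructors are deprecated in Scala 2.10, so the explicit name list goes away and value names are derived from the val identifiers instead. A small sketch of the resulting idiom (CleanerTypeSketch is a made-up stand-in for MetadataCleanerType):

// Names come from the identifiers (or can be given per value with Value("name")),
// so no separate string list is needed.
object CleanerTypeSketch extends Enumeration {
  val MapOutputTracker, SparkContext, HttpBroadcast = Value

  def main(args: Array[String]): Unit = {
    println(MapOutputTracker)   // prints "MapOutputTracker", taken from the identifier
    println(values)             // the full ValueSet, in declaration order
  }
}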
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index 277de2f8a6..dbff571de9 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -85,7 +85,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging {
}
override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
- JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
+ JavaConversions.mapAsScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
}
override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
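Scala 2.10's JavaConversions renames this converter to mapAsScalaConcurrentMap; the old asScalaConcurrentMap name is deprecated. A minimal check of the renamed call (names and values invented for illustration):

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions

// Wraps a Java ConcurrentHashMap in a Scala map view so collection methods
// like filter and map work on it directly, as TimeStampedHashMap.filter does.
object ConcurrentMapViewSketch {
  def main(args: Array[String]): Unit = {
    val internal = new ConcurrentHashMap[String, Int]()
    internal.put("a", 1)
    internal.put("b", 2)
    val view = JavaConversions.mapAsScalaConcurrentMap(internal)
    println(view.filter { case (_, v) => v > 1 })   // keeps only the entries with value > 1
  }
}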
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a79e64e810..3f7858d2de 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,10 +22,11 @@ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
import scala.io.Source
+import scala.reflect.ClassTag
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -319,7 +320,7 @@ private[spark] object Utils extends Logging {
* result in a new collection. Unlike scala.util.Random.shuffle, this method
* uses a local random number generator, avoiding inter-thread contention.
*/
- def randomize[T: ClassManifest](seq: TraversableOnce[T]): Seq[T] = {
+ def randomize[T: ClassTag](seq: TraversableOnce[T]): Seq[T] = {
randomizeInPlace(seq.toArray)
}
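ClassManifest is deprecated in Scala 2.10 in favour of ClassTag, and a ClassTag context bound is exactly what toArray needs to allocate an Array[T] at runtime, which is why randomize changes. A hedged sketch of the same pattern outside Spark (ShuffleSketch and shuffled are hypothetical names):

import scala.reflect.ClassTag
import scala.util.Random

object ShuffleSketch {
  // The ClassTag bound replaces the old ClassManifest bound; toArray requires it.
  def shuffled[T: ClassTag](seq: TraversableOnce[T]): Seq[T] = {
    val arr = seq.toArray
    val rand = new Random
    for (i <- (arr.length - 1) to 1 by -1) {   // Fisher-Yates shuffle in place
      val j = rand.nextInt(i + 1)
      val tmp = arr(j); arr(j) = arr(i); arr(i) = tmp
    }
    arr
  }

  def main(args: Array[String]): Unit = println(shuffled(1 to 10))
}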
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 80545c9688..c26f23d500 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util.collection
+import scala.reflect.ClassTag
/**
* A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
@@ -26,7 +27,7 @@ package org.apache.spark.util.collection
* Under the hood, it uses our OpenHashSet implementation.
*/
private[spark]
-class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest](
+class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
initialCapacity: Int)
extends Iterable[(K, V)]
with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 40986e3731..87e009a4de 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util.collection
+import scala.reflect._
/**
* A simple, fast hash set optimized for non-null insertion-only use case, where keys are never
@@ -36,7 +37,7 @@ package org.apache.spark.util.collection
* to explore all spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing).
*/
private[spark]
-class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
+class OpenHashSet[@specialized(Long, Int) T: ClassTag](
initialCapacity: Int,
loadFactor: Double)
extends Serializable {
@@ -62,14 +63,14 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
// throws:
// scala.tools.nsc.symtab.Types$TypeError: type mismatch;
// found : scala.reflect.AnyValManifest[Long]
- // required: scala.reflect.ClassManifest[Int]
+ // required: scala.reflect.ClassTag[Int]
// at scala.tools.nsc.typechecker.Contexts$Context.error(Contexts.scala:298)
// at scala.tools.nsc.typechecker.Infer$Inferencer.error(Infer.scala:207)
// ...
- val mt = classManifest[T]
- if (mt == ClassManifest.Long) {
+ val mt = classTag[T]
+ if (mt == ClassTag.Long) {
(new LongHasher).asInstanceOf[Hasher[T]]
- } else if (mt == ClassManifest.Int) {
+ } else if (mt == ClassTag.Int) {
(new IntHasher).asInstanceOf[Hasher[T]]
} else {
new Hasher[T]
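classTag[T] can be compared against the primitive tags exactly as classManifest[T] was, which is how the set keeps its specialized hashers after the migration. A simplified sketch of that selection (the Hasher classes here are stand-ins, not Spark's):

import scala.reflect._

class Hasher[T] { def hash(x: T): Int = x.hashCode() }
class LongHasher extends Hasher[Long] { override def hash(x: Long): Int = (x ^ (x >>> 32)).toInt }
class IntHasher extends Hasher[Int] { override def hash(x: Int): Int = x }

object HasherSelect {
  // Compare the runtime ClassTag against the primitive tags to pick a specialized path.
  def hasherFor[T: ClassTag]: Hasher[T] = classTag[T] match {
    case ClassTag.Long => (new LongHasher).asInstanceOf[Hasher[T]]
    case ClassTag.Int  => (new IntHasher).asInstanceOf[Hasher[T]]
    case _             => new Hasher[T]
  }

  def main(args: Array[String]): Unit = {
    println(hasherFor[Long].hash(42L))    // dispatched to LongHasher
    println(hasherFor[String].hash("s"))  // falls back to the generic Hasher
  }
}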
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index d76143e45a..2e1ef06cbc 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util.collection
+import scala.reflect._
/**
* A fast hash map implementation for primitive, non-null keys. This hash map supports
@@ -26,15 +27,15 @@ package org.apache.spark.util.collection
* Under the hood, it uses our OpenHashSet implementation.
*/
private[spark]
-class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
- @specialized(Long, Int, Double) V: ClassManifest](
+class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
+ @specialized(Long, Int, Double) V: ClassTag](
initialCapacity: Int)
extends Iterable[(K, V)]
with Serializable {
def this() = this(64)
- require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
+ require(classTag[K] == classTag[Long] || classTag[K] == classTag[Int])
// Init in constructor (instead of in declaration) to work around a Scala compiler specialization
// bug that would generate two arrays (one for Object and one for specialized T).
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index 20554f0aab..b84eb65c62 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -17,11 +17,13 @@
package org.apache.spark.util.collection
+import scala.reflect.ClassTag
+
/**
* An append-only, non-threadsafe, array-backed vector that is optimized for primitive types.
*/
private[spark]
-class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) {
+class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: Int = 64) {
private var _numElements = 0
private var _array: Array[V] = _
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 4434f3b87c..c443c5266e 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -27,6 +27,21 @@ import org.apache.spark.SparkContext._
class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
+
+ implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
+ def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
+ t1 ++= t2
+ t1
+ }
+ def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
+ t1 += t2
+ t1
+ }
+ def zero(t: mutable.Set[A]) : mutable.Set[A] = {
+ new mutable.HashSet[A]()
+ }
+ }
+
test ("basic accumulation"){
sc = new SparkContext("local", "test")
val acc : Accumulator[Int] = sc.accumulator(0)
@@ -51,7 +66,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
}
test ("add value to collection accumulators") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -68,22 +82,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
}
}
- implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
- def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
- t1 ++= t2
- t1
- }
- def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
- t1 += t2
- t1
- }
- def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
- new mutable.HashSet[Any]()
- }
- }
-
test ("value not readable in tasks") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -125,7 +124,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
}
test ("localValue readable in tasks") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
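Moving the AccumulableParam into a generic implicit def makes it visible to every test in the suite without a per-test import, and types it per element instead of on Any. A hedged usage sketch under that implicit (object name is made up; runs against a local-mode SparkContext):

import scala.collection.mutable
import org.apache.spark.{Accumulable, AccumulableParam, SparkContext}

object SetAccumulatorSketch {
  // Same param the suite defines: a mutable Set accumulated element by element.
  implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] =
    new AccumulableParam[mutable.Set[A], A] {
      def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]): mutable.Set[A] = { t1 ++= t2; t1 }
      def addAccumulator(t1: mutable.Set[A], t2: A): mutable.Set[A] = { t1 += t2; t1 }
      def zero(t: mutable.Set[A]): mutable.Set[A] = new mutable.HashSet[A]()
    }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "accum-sketch")
    val acc: Accumulable[mutable.Set[Int], Int] = sc.accumulable(mutable.Set[Int]())
    sc.parallelize(1 to 100, 4).foreach(acc += _)   // tasks add elements; driver reads the value
    println(acc.value.size)                          // 100
    sc.stop()
  }
}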
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index d2226aa5a5..f25d921d3f 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark
+import scala.reflect.ClassTag
import org.scalatest.FunSuite
import java.io.File
import org.apache.spark.rdd._
@@ -205,7 +206,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* not, but this is not done by default as usually the partitions do not refer to any RDD and
* therefore never store the lineage.
*/
- def testCheckpointing[U: ClassManifest](
+ def testCheckpointing[U: ClassTag](
op: (RDD[Int]) => RDD[U],
testRDDSize: Boolean = true,
testRDDPartitionSize: Boolean = false
@@ -274,7 +275,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed,
* this RDD will remember the partitions and therefore potentially the whole lineage.
*/
- def testParentCheckpointing[U: ClassManifest](
+ def testParentCheckpointing[U: ClassTag](
op: (RDD[Int]) => RDD[U],
testRDDSize: Boolean,
testRDDPartitionSize: Boolean
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 480bac84f3..d9cb7fead5 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -122,7 +122,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
}
assert(thrown.getClass === classOf[SparkException])
- assert(thrown.getMessage.contains("more than 4 times"))
+ assert(thrown.getMessage.contains("failed 4 times"))
}
test("caching") {
@@ -303,12 +303,13 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
Thread.sleep(200)
}
} catch {
- case _ => { Thread.sleep(10) }
+ case _: Throwable => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
}
}
+
}
object DistributedSuite {
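Scala 2.10 warns on a bare wildcard in a catch block because it also traps control-flow throwables; the explicit `case _: Throwable` keeps the old catch-everything behaviour while silencing the warning. A small sketch contrasting it with the NonFatal alternative (names are hypothetical):

import scala.util.control.NonFatal

object CatchSketch {
  // What the patch uses: still catches everything, but says so explicitly.
  def swallowAll(body: => Unit): Unit =
    try body catch { case _: Throwable => () }

  // The stricter alternative: fatal errors and control throwables still propagate.
  def swallowNonFatal(body: => Unit): Unit =
    try body catch { case NonFatal(e) => println("ignored: " + e) }

  def main(args: Array[String]): Unit = {
    swallowAll(throw new RuntimeException("a"))
    swallowNonFatal(throw new IllegalStateException("b"))
  }
}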
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 01a72d8401..6d1695eae7 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ class DriverSuite extends FunSuite with Timeouts {
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
- failAfter(30 seconds) {
+ failAfter(60 seconds) {
Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(System.getenv("SPARK_HOME")))
}
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index b7eb268bd5..271dc905bc 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -49,14 +49,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master start and stop") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster()
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+ tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.stop()
}
test("master register and fetch") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster()
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+ tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -75,7 +75,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master register and unregister and fetch") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster()
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+ tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -101,13 +101,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
val masterTracker = new MapOutputTrackerMaster()
- masterTracker.trackerActor = actorSystem.actorOf(
- Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+ masterTracker.trackerActor = Left(actorSystem.actorOf(
+ Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker"))
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
val slaveTracker = new MapOutputTracker()
- slaveTracker.trackerActor = slaveSystem.actorFor(
- "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
+ slaveTracker.trackerActor = Right(slaveSystem.actorSelection(
+ "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker"))
masterTracker.registerShuffle(10, 1)
masterTracker.incrementEpoch()
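With Akka 2.2, a remote actor is looked up with actorSelection over the akka.tcp transport, which yields an ActorSelection rather than an ActorRef; hence the tracker field becomes an Either of the two. A hedged sketch of how sends might dispatch over such a field (class and method names are hypothetical):

import akka.actor._

// Holds either a local ActorRef (master side) or a remote ActorSelection
// (slave side, obtained via actorSelection over akka.tcp), and sends through
// whichever is present.
class TrackerHandleSketch(var trackerActor: Either[ActorRef, ActorSelection]) {
  def send(message: Any): Unit = trackerActor match {
    case Left(ref)        => ref ! message
    case Right(selection) => selection ! message
  }
}

In the suite, the master tracker takes the Left branch with a locally created ActorRef, while the slave tracker takes the Right branch with an actorSelection of the master's "MapOutputTracker" path.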
diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
index 46a2da1724..768ca3850e 100644
--- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
@@ -37,7 +37,7 @@ class UnpersistSuite extends FunSuite with LocalSparkContext {
Thread.sleep(200)
}
} catch {
- case _ => { Thread.sleep(10) }
+ case _: Throwable => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 354ab8ae5d..d8dcd6d14c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -244,8 +244,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// test that you get over 90% locality in each group
val minLocality = coalesced2.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
- .foldLeft(1.)((perc, loc) => math.min(perc,loc))
- assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
+ .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
+ assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.0).toInt + "%")
// test that the groups are load balanced with 100 +/- 20 elements in each
val maxImbalance = coalesced2.partitions
@@ -257,9 +257,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val coalesced3 = data3.coalesce(numMachines*2)
val minLocality2 = coalesced3.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
- .foldLeft(1.)((perc, loc) => math.min(perc,loc))
+ .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
- (minLocality2*100.).toInt + "%")
+ (minLocality2*100.0).toInt + "%")
}
test("zipped RDDs") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 1fd76420ea..2e41438a52 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -145,7 +145,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
// Make a task whose result is larger than the akka frame size
System.setProperty("spark.akka.frameSize", "1")
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x,y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
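Akka 2.2 moves the netty remoting settings under akka.remote.netty.tcp and renames the frame-size key to maximum-frame-size, so the tests read the new path. A minimal check of reading such a key through Typesafe Config (the value here is invented for illustration):

import com.typesafe.config.ConfigFactory

object FrameSizeSketch {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString(
      "akka.remote.netty.tcp.maximum-frame-size = 128000b")
    // getBytes parses size strings into a byte count, as the tests rely on.
    val frameSize = config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
    println(frameSize)   // 128000
  }
}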
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index b97f2b19b5..29c4cc5d9c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -283,7 +283,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
// after the last failure.
- (0 until manager.MAX_TASK_FAILURES).foreach { index =>
+ (1 to manager.MAX_TASK_FAILURES).foreach { index =>
val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
assert(offerResult != None,
"Expect resource offer on iteration %s to return a task".format(index))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
index ee150a3107..27c2d53361 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
@@ -82,7 +82,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
test("handling results larger than Akka frame size") {
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
@@ -103,7 +103,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
}
scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index cb76275e39..b647e8a672 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -39,7 +39,7 @@ class BlockIdSuite extends FunSuite {
fail()
} catch {
case e: IllegalStateException => // OK
- case _ => fail()
+ case _: Throwable => fail()
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 484a654108..5b4d63b954 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -56,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
System.setProperty("spark.hostPort", "localhost:" + boundPort)
master = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))
+ Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))))
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 8f0ec6683b..3764f4d1a0 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -34,7 +34,6 @@ class UISuite extends FunSuite {
}
val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
-
// Allow some wiggle room in case ports on the machine are under contention
assert(boundPort1 > startPort && boundPort1 < startPort + 10)
assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 4e40dcbdee..5aff26f9fc 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -63,54 +63,53 @@ class SizeEstimatorSuite
}
test("simple classes") {
- assert(SizeEstimator.estimate(new DummyClass1) === 16)
- assert(SizeEstimator.estimate(new DummyClass2) === 16)
- assert(SizeEstimator.estimate(new DummyClass3) === 24)
- assert(SizeEstimator.estimate(new DummyClass4(null)) === 24)
- assert(SizeEstimator.estimate(new DummyClass4(new DummyClass3)) === 48)
+ expectResult(16)(SizeEstimator.estimate(new DummyClass1))
+ expectResult(16)(SizeEstimator.estimate(new DummyClass2))
+ expectResult(24)(SizeEstimator.estimate(new DummyClass3))
+ expectResult(24)(SizeEstimator.estimate(new DummyClass4(null)))
+ expectResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
}
// NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
// (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("strings") {
- assert(SizeEstimator.estimate(DummyString("")) === 40)
- assert(SizeEstimator.estimate(DummyString("a")) === 48)
- assert(SizeEstimator.estimate(DummyString("ab")) === 48)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
+ expectResult(40)(SizeEstimator.estimate(DummyString("")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("a")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("ab")))
+ expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
}
test("primitive arrays") {
- assert(SizeEstimator.estimate(new Array[Byte](10)) === 32)
- assert(SizeEstimator.estimate(new Array[Char](10)) === 40)
- assert(SizeEstimator.estimate(new Array[Short](10)) === 40)
- assert(SizeEstimator.estimate(new Array[Int](10)) === 56)
- assert(SizeEstimator.estimate(new Array[Long](10)) === 96)
- assert(SizeEstimator.estimate(new Array[Float](10)) === 56)
- assert(SizeEstimator.estimate(new Array[Double](10)) === 96)
- assert(SizeEstimator.estimate(new Array[Int](1000)) === 4016)
- assert(SizeEstimator.estimate(new Array[Long](1000)) === 8016)
+ expectResult(32)(SizeEstimator.estimate(new Array[Byte](10)))
+ expectResult(40)(SizeEstimator.estimate(new Array[Char](10)))
+ expectResult(40)(SizeEstimator.estimate(new Array[Short](10)))
+ expectResult(56)(SizeEstimator.estimate(new Array[Int](10)))
+ expectResult(96)(SizeEstimator.estimate(new Array[Long](10)))
+ expectResult(56)(SizeEstimator.estimate(new Array[Float](10)))
+ expectResult(96)(SizeEstimator.estimate(new Array[Double](10)))
+ expectResult(4016)(SizeEstimator.estimate(new Array[Int](1000)))
+ expectResult(8016)(SizeEstimator.estimate(new Array[Long](1000)))
}
test("object arrays") {
// Arrays containing nulls should just have one pointer per element
- assert(SizeEstimator.estimate(new Array[String](10)) === 56)
- assert(SizeEstimator.estimate(new Array[AnyRef](10)) === 56)
-
+ expectResult(56)(SizeEstimator.estimate(new Array[String](10)))
+ expectResult(56)(SizeEstimator.estimate(new Array[AnyRef](10)))
// For object arrays with non-null elements, each object should take one pointer plus
// however many bytes that class takes. (Note that Array.fill calls the code in its
// second parameter separately for each object, so we get distinct objects.)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)) === 216)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)) === 216)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)) === 296)
- assert(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)) === 56)
+ expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
+ expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
+ expectResult(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
+ expectResult(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
     // Past size 100, we only sample 100 elements, but we should still get the right size.
- assert(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)) === 28016)
+ expectResult(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
// If an array contains the *same* element many times, we should only count it once.
val d1 = new DummyClass1
- assert(SizeEstimator.estimate(Array.fill(10)(d1)) === 72) // 10 pointers plus 8-byte object
- assert(SizeEstimator.estimate(Array.fill(100)(d1)) === 432) // 100 pointers plus 8-byte object
+ expectResult(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
+ expectResult(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
// Same thing with huge array containing the same element many times. Note that this won't
// return exactly 4032 because it can't tell that *all* the elements will equal the first
@@ -128,11 +127,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- assert(SizeEstimator.estimate(DummyString("")) === 40)
- assert(SizeEstimator.estimate(DummyString("a")) === 48)
- assert(SizeEstimator.estimate(DummyString("ab")) === 48)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
-
+ expectResult(40)(SizeEstimator.estimate(DummyString("")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("a")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("ab")))
+ expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
}
@@ -145,10 +143,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- assert(SizeEstimator.estimate(DummyString("")) === 56)
- assert(SizeEstimator.estimate(DummyString("a")) === 64)
- assert(SizeEstimator.estimate(DummyString("ab")) === 64)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 72)
+ expectResult(56)(SizeEstimator.estimate(DummyString("")))
+ expectResult(64)(SizeEstimator.estimate(DummyString("a")))
+ expectResult(64)(SizeEstimator.estimate(DummyString("ab")))
+ expectResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
resetOrClear("spark.test.useCompressedOops", oops)