-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala                          |  4
-rw-r--r--  core/src/main/scala/spark/RDD.scala                                       |  6
-rw-r--r--  core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala        |  4
-rw-r--r--  core/src/main/scala/spark/api/python/PythonRDD.scala                      |  2
-rw-r--r--  core/src/main/scala/spark/api/python/PythonWorkerFactory.scala            |  4
-rw-r--r--  core/src/main/scala/spark/rdd/EmptyRDD.scala                              |  4
-rw-r--r--  core/src/main/scala/spark/rdd/JdbcRDD.scala                               |  3
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala                   |  9
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala   |  8
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala          |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala  |  6
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala     | 28
12 files changed, 42 insertions(+), 38 deletions(-)
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 0095b868a8..ee09c2085b 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -536,8 +536,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](
* supplied codec.
*/
def saveAsHadoopFile[F <: OutputFormat[K, V]](
- path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
- saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
+ path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
+ saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec)
}
/**
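
The change above is the heart of the Scala 2.10 migration in this commit: ClassManifest is deprecated in favour of scala.reflect.ClassTag, and `erasure` becomes `runtimeClass`. A minimal standalone sketch of the pattern (not Spark code):

    import scala.reflect.ClassTag

    // A ClassTag context bound supplies the erased runtime class that
    // ClassManifest.erasure used to provide.
    def runtimeClassOf[F](implicit fm: ClassTag[F]): Class[_] =
      fm.runtimeClass                      // was fm.erasure under ClassManifest

    // runtimeClassOf[String] == classOf[String]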
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3a454df10d..2ebfaadc46 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -500,18 +500,18 @@ abstract class RDD[T: ClassTag](
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
- def zipPartitions[B: ClassManifest, V: ClassManifest](
+ def zipPartitions[B: ClassTag, V: ClassTag](
f: (Iterator[T], Iterator[B]) => Iterator[V],
rdd2: RDD[B]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
+ def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](
f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
rdd2: RDD[B],
rdd3: RDD[C]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
+ def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
rdd2: RDD[B],
rdd3: RDD[C],
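
The zipPartitions overloads above keep the same shape; only their context bounds move from ClassManifest to ClassTag. A hedged usage sketch, assuming an existing SparkContext named `sc` (the names below are illustrative, not taken from the patch):

    val nums  = sc.parallelize(1 to 4, 2)
    val words = sc.parallelize(Seq("a", "b", "c", "d"), 2)

    // Both RDDs must have the same number of partitions; the per-partition
    // element counts may differ.
    val zipped = nums.zipPartitions(
      (xs: Iterator[Int], ys: Iterator[String]) => xs.zip(ys),
      words)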
diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
index 6044043add..4562884eb3 100644
--- a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
+++ b/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
@@ -1,5 +1,7 @@
package spark.api.java.function
+import scala.reflect.ClassTag
+
/**
* A function that takes two inputs and returns zero or more output records.
*/
@@ -7,5 +9,5 @@ abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Itera
@throws(classOf[Exception])
def call(a: A, b:B) : java.lang.Iterable[C]
- def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]]
+ def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
}
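
For the Java API, the element type is still reported as an erased "anything" tag; only the carrier type changes from ClassManifest to ClassTag. A standalone sketch of that idiom (illustrative, not Spark code):

    import scala.reflect.ClassTag

    // ClassTag.Any cast down to the element type: callers get a usable tag even
    // though the concrete type is unknown, mirroring the old ClassManifest.Any trick.
    def anyElementTag[C]: ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]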
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 3d1e45cb2c..d15c75289d 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -195,7 +195,7 @@ private[spark] object PythonRDD {
val arr = elem.asInstanceOf[Array[Byte]]
dOut.writeInt(arr.length)
dOut.write(arr)
- } else if (elem.isInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]) {
+ } else if (elem.isInstanceOf[scala.Tuple2[_, _]]) {
val t = elem.asInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]
val length = t._1.length + t._2.length - 3 - 3 + 4 // stripPickle() removes 3 bytes
dOut.writeInt(length)
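
The wildcard in the Tuple2 check matters because generic type arguments are erased: the runtime can only verify the Tuple2 shell, so spelling out Array[Byte] in isInstanceOf draws an "unchecked" warning without adding any actual checking. A minimal illustration (not Spark code):

    val elem: Any = ("a", 1)
    elem.isInstanceOf[Tuple2[_, _]]   // true; this is all the runtime can verify
    // isInstanceOf[Tuple2[Array[Byte], Array[Byte]]] would also return true here,
    // with an unchecked warning, because the type arguments are erased.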
diff --git a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
index 85d1dfeac8..7163f01b24 100644
--- a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
@@ -28,7 +28,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
startDaemon()
new Socket(daemonHost, daemonPort)
}
- case e => throw e
+ case e : Throwable => throw e
}
}
}
@@ -88,7 +88,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
}.start()
} catch {
- case e => {
+ case e :Throwable => {
stopDaemon()
throw e
}
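
Scala 2.10 warns about naked catch-all patterns such as `case e =>`; writing the type explicitly keeps the same behaviour (everything, including fatal errors, is still caught and rethrown) while silencing the warning. A small sketch with hypothetical helper names, not taken from the patch:

    def riskyCall(): Unit = ???   // hypothetical stand-ins
    def cleanup(): Unit = ()

    try riskyCall() catch {
      case e: Throwable =>        // explicit type: same semantics as the old `case e =>`
        cleanup()
        throw e
    }
    // When fatal errors should propagate untouched, `case scala.util.control.NonFatal(e) => ...`
    // is the usual alternative.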
diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala
index e4dd3a7fa7..3b9899238c 100644
--- a/core/src/main/scala/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/spark/rdd/EmptyRDD.scala
@@ -1,12 +1,12 @@
package spark.rdd
import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
-
+import scala.reflect.ClassTag
/**
* An RDD that is empty, i.e. has no element in it.
*/
-class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
override def getPartitions: Array[Partition] = Array.empty
diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala
index a50f407737..f570dd6d8b 100644
--- a/core/src/main/scala/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/spark/rdd/JdbcRDD.scala
@@ -4,6 +4,7 @@ import java.sql.{Connection, ResultSet}
import spark.{Logging, Partition, RDD, SparkContext, TaskContext}
import spark.util.NextIterator
+import scala.reflect.ClassTag
private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
override def index = idx
@@ -28,7 +29,7 @@ private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) e
* This should only call getInt, getString, etc; the RDD takes care of calling next.
* The default maps a ResultSet to an array of Object.
*/
-class JdbcRDD[T: ClassManifest](
+class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
index b234428ab2..d3c2370885 100644
--- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -2,6 +2,7 @@ package spark.rdd
import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
+import scala.reflect.ClassTag
private[spark] class ZippedPartitionsPartition(
idx: Int,
@@ -20,7 +21,7 @@ private[spark] class ZippedPartitionsPartition(
}
}
-abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
+abstract class ZippedPartitionsBaseRDD[V: ClassTag](
sc: SparkContext,
var rdds: Seq[RDD[_]])
extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
@@ -67,7 +68,7 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
}
}
-class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
+class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B]) => Iterator[V],
var rdd1: RDD[A],
@@ -87,7 +88,7 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]
}
class ZippedPartitionsRDD3
- [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
+ [A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
var rdd1: RDD[A],
@@ -111,7 +112,7 @@ class ZippedPartitionsRDD3
}
class ZippedPartitionsRDD4
- [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
+ [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
var rdd1: RDD[A],
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 327d6797ae..9d618cf531 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -13,13 +13,13 @@ import spark.scheduler._
import spark.TaskState.TaskState
import java.nio.ByteBuffer
-private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
+private[spark] object TaskLocality extends Enumeration with Logging {
+
+ type TaskLocality = Value
// process local is expected to be used ONLY within tasksetmanager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
- type TaskLocality = Value
-
def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
// Must not be the constraint.
@@ -524,7 +524,7 @@ private[spark] class ClusterTaskSetManager(
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread().getContextClassLoader
throw new SparkException("ClassNotFound with classloader: " + loader, cnf)
- case ex => throw ex
+ case ex : Throwable => throw ex
}
// Mark finished and stop if we've finished all the tasks
finished(index) = true
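
The TaskLocality change drops the Enumeration(names: String*) constructor, which Scala 2.10 deprecates; value names are now derived from the val identifiers themselves. A minimal sketch of the resulting pattern (demo object, not Spark code):

    object Locality extends Enumeration {
      type Locality = Value
      val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
    }
    // Locality.NODE_LOCAL.toString == "NODE_LOCAL" without passing any name strings.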
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index 6e0c6793e0..3ff3cd5fe4 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -1,6 +1,6 @@
package spark.scheduler.cluster
-object SchedulingMode extends Enumeration("FAIR","FIFO"){
+object SchedulingMode extends Enumeration {
type SchedulingMode = Value
val FAIR,FIFO = Value
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index e0f6351ef7..47274f41a5 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -85,7 +85,7 @@ class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest](
}
// Create Threads for each Topic/Message Stream we are listening
- val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]
+ val decoder = manifest[D].runtimeClass.newInstance.asInstanceOf[Decoder[T]]
val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder)
// Start the messages handler for each partition
@@ -95,7 +95,7 @@ class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest](
}
// Handles Kafka Messages
- private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable {
+ private class MessageHandler[T: ClassTag](stream: KafkaStream[T]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
@@ -118,7 +118,7 @@ class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest](
zk.deleteRecursive(dir)
zk.close()
} catch {
- case _ => // swallow
+ case _ : Throwable => // swallow
}
}
}
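
Here `erasure` gives way to `runtimeClass` as well, this time on a Manifest and in order to instantiate the decoder reflectively. A hedged generic sketch of that pattern (illustrative only; written against ClassTag, and relying on a no-arg constructor via Class#newInstance):

    import scala.reflect.ClassTag

    def newInstanceOf[D <: AnyRef : ClassTag]: D =
      implicitly[ClassTag[D]].runtimeClass.newInstance.asInstanceOf[D]

    // e.g. newInstanceOf[java.util.ArrayList[String]] builds an instance reflectively.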
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index b3daa5a91b..ca654c45c9 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -363,20 +363,20 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
var tries = 0
- var done = false
- while (!done && tries < maxTries) {
- tries += 1
- try {
- // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
- fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
- fs.rename(tempHadoopFile, hadoopFile)
- done = true
- } catch {
- case ioe: IOException => {
- fs = testDir.getFileSystem(new Configuration())
- logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
- }
- }
+ var done = false
+ while (!done && tries < maxTries) {
+ tries += 1
+ try {
+ // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
+ fs.rename(tempHadoopFile, hadoopFile)
+ done = true
+ } catch {
+ case ioe: IOException => {
+ fs = testDir.getFileSystem(new Configuration())
+ logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+ }
+ }
}
if (!done)
logError("Could not generate file " + hadoopFile)