diff options
12 files changed, 42 insertions, 38 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 0095b868a8..ee09c2085b 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -536,8 +536,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag]( * supplied codec. */ def saveAsHadoopFile[F <: OutputFormat[K, V]]( - path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) { - saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec) + path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) { + saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec) } /** diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 3a454df10d..2ebfaadc46 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -500,18 +500,18 @@ abstract class RDD[T: ClassTag]( * *same number of partitions*, but does *not* require them to have the same number * of elements in each partition. */ - def zipPartitions[B: ClassManifest, V: ClassManifest]( + def zipPartitions[B: ClassTag, V: ClassTag]( f: (Iterator[T], Iterator[B]) => Iterator[V], rdd2: RDD[B]): RDD[V] = new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2) - def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]( + def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]( f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V], rdd2: RDD[B], rdd3: RDD[C]): RDD[V] = new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3) - def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]( + def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]( f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], rdd2: RDD[B], rdd3: RDD[C], diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala index 6044043add..4562884eb3 100644 --- a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala +++ b/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala @@ -1,5 +1,7 @@ package spark.api.java.function +import scala.reflect.ClassTag + /** * A function that takes two inputs and returns zero or more output records. */ @@ -7,5 +9,5 @@ abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Itera @throws(classOf[Exception]) def call(a: A, b:B) : java.lang.Iterable[C] - def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]] + def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]] } diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index 3d1e45cb2c..d15c75289d 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -195,7 +195,7 @@ private[spark] object PythonRDD { val arr = elem.asInstanceOf[Array[Byte]] dOut.writeInt(arr.length) dOut.write(arr) - } else if (elem.isInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]) { + } else if (elem.isInstanceOf[scala.Tuple2[_, _]]) { val t = elem.asInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]] val length = t._1.length + t._2.length - 3 - 3 + 4 // stripPickle() removes 3 bytes dOut.writeInt(length) diff --git a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala index 85d1dfeac8..7163f01b24 100644 --- a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala @@ -28,7 +28,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String startDaemon() new Socket(daemonHost, daemonPort) } - case e => throw e + case e : Throwable => throw e } } } @@ -88,7 +88,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } }.start() } catch { - case e => { + case e :Throwable => { stopDaemon() throw e } diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala index e4dd3a7fa7..3b9899238c 100644 --- a/core/src/main/scala/spark/rdd/EmptyRDD.scala +++ b/core/src/main/scala/spark/rdd/EmptyRDD.scala @@ -1,12 +1,12 @@ package spark.rdd import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext} - +import scala.reflect.ClassTag /** * An RDD that is empty, i.e. has no element in it. */ -class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) { +class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) { override def getPartitions: Array[Partition] = Array.empty diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala index a50f407737..f570dd6d8b 100644 --- a/core/src/main/scala/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/spark/rdd/JdbcRDD.scala @@ -4,6 +4,7 @@ import java.sql.{Connection, ResultSet} import spark.{Logging, Partition, RDD, SparkContext, TaskContext} import spark.util.NextIterator +import scala.reflect.ClassTag private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition { override def index = idx @@ -28,7 +29,7 @@ private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) e * This should only call getInt, getString, etc; the RDD takes care of calling next. * The default maps a ResultSet to an array of Object. */ -class JdbcRDD[T: ClassManifest]( +class JdbcRDD[T: ClassTag]( sc: SparkContext, getConnection: () => Connection, sql: String, diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala index b234428ab2..d3c2370885 100644 --- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala @@ -2,6 +2,7 @@ package spark.rdd import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} +import scala.reflect.ClassTag private[spark] class ZippedPartitionsPartition( idx: Int, @@ -20,7 +21,7 @@ private[spark] class ZippedPartitionsPartition( } } -abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( +abstract class ZippedPartitionsBaseRDD[V: ClassTag]( sc: SparkContext, var rdds: Seq[RDD[_]]) extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) { @@ -67,7 +68,7 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( } } -class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]( +class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]( sc: SparkContext, f: (Iterator[A], Iterator[B]) => Iterator[V], var rdd1: RDD[A], @@ -87,7 +88,7 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest] } class ZippedPartitionsRDD3 - [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest]( + [A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag]( sc: SparkContext, f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], var rdd1: RDD[A], @@ -111,7 +112,7 @@ class ZippedPartitionsRDD3 } class ZippedPartitionsRDD4 - [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest]( + [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag]( sc: SparkContext, f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], var rdd1: RDD[A], diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala index 327d6797ae..9d618cf531 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -13,13 +13,13 @@ import spark.scheduler._ import spark.TaskState.TaskState import java.nio.ByteBuffer -private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging { +private[spark] object TaskLocality extends Enumeration with Logging { + + type TaskLocality = Value // process local is expected to be used ONLY within tasksetmanager for now. val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value - type TaskLocality = Value - def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = { // Must not be the constraint. @@ -524,7 +524,7 @@ private[spark] class ClusterTaskSetManager( case cnf: ClassNotFoundException => val loader = Thread.currentThread().getContextClassLoader throw new SparkException("ClassNotFound with classloader: " + loader, cnf) - case ex => throw ex + case ex : Throwable => throw ex } // Mark finished and stop if we've finished all the tasks finished(index) = true diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala index 6e0c6793e0..3ff3cd5fe4 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala @@ -1,6 +1,6 @@ package spark.scheduler.cluster -object SchedulingMode extends Enumeration("FAIR","FIFO"){ +object SchedulingMode extends Enumeration { type SchedulingMode = Value val FAIR,FIFO = Value diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index e0f6351ef7..47274f41a5 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -85,7 +85,7 @@ class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest]( } // Create Threads for each Topic/Message Stream we are listening - val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]] + val decoder = manifest[D].runtimeClass.newInstance.asInstanceOf[Decoder[T]] val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder) // Start the messages handler for each partition @@ -95,7 +95,7 @@ class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest]( } // Handles Kafka Messages - private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable { + private class MessageHandler[T: ClassTag](stream: KafkaStream[T]) extends Runnable { def run() { logInfo("Starting MessageHandler.") for (msgAndMetadata <- stream) { @@ -118,7 +118,7 @@ class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest]( zk.deleteRecursive(dir) zk.close() } catch { - case _ => // swallow + case _ : Throwable => // swallow } } } diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index b3daa5a91b..ca654c45c9 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -363,20 +363,20 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") var tries = 0 - var done = false - while (!done && tries < maxTries) { - tries += 1 - try { - // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) - fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) - fs.rename(tempHadoopFile, hadoopFile) - done = true - } catch { - case ioe: IOException => { - fs = testDir.getFileSystem(new Configuration()) - logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) - } - } + var done = false + while (!done && tries < maxTries) { + tries += 1 + try { + // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) + fs.rename(tempHadoopFile, hadoopFile) + done = true + } catch { + case ioe: IOException => { + fs = testDir.getFileSystem(new Configuration()) + logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) + } + } } if (!done) logError("Could not generate file " + hadoopFile) |