author    Matei Zaharia <matei@eecs.berkeley.edu>  2010-06-10 21:50:55 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2010-06-10 21:50:55 -0700
commit    92246c843bb63fb9f1da4978bfffe0b7f9192afd (patch)
tree      0c704dfb89990cf2085e337a374773f4e6cc72ad
parent    c177a546a5ba912df61222418b42e8cd72b8416c (diff)
download  spark-92246c843bb63fb9f1da4978bfffe0b7f9192afd.tar.gz
          spark-92246c843bb63fb9f1da4978bfffe0b7f9192afd.tar.bz2
          spark-92246c843bb63fb9f1da4978bfffe0b7f9192afd.zip
Initial work on 2.8 port
-rw-r--r--  Makefile                                                     10
-rw-r--r--  src/examples/LocalALS.scala                                   3
-rw-r--r--  src/examples/LocalFileLR.scala                                4
-rw-r--r--  src/examples/LocalPi.scala                                    7
-rw-r--r--  src/examples/SparkALS.scala                                   3
-rw-r--r--  src/examples/SparkHdfsLR.scala                                5
-rw-r--r--  src/examples/SparkLR.scala                                    3
-rw-r--r--  src/examples/SparkPi.scala                                    7
-rw-r--r--  src/scala/spark/ClosureCleaner.scala                          4
-rw-r--r--  src/scala/spark/Executor.scala                                2
-rw-r--r--  src/scala/spark/HdfsFile.scala                               33
-rw-r--r--  src/scala/spark/LocalScheduler.scala                          5
-rw-r--r--  src/scala/spark/NexusScheduler.scala                          2
-rw-r--r--  src/scala/spark/ParallelArray.scala                          25
-rw-r--r--  src/scala/spark/Scheduler.scala                               2
-rw-r--r--  src/scala/spark/SerializableRange.scala                      75
-rw-r--r--  src/scala/spark/SparkContext.scala                           13
-rw-r--r--  src/test/spark/ParallelArraySplitSuite.scala                 18
-rw-r--r--  third_party/ScalaCheck-1.5.jar                               bin  825705 -> 0 bytes
-rw-r--r--  third_party/nexus.jar                                        bin  34246 -> 39513 bytes
-rw-r--r--  third_party/scalacheck_2.8.0.RC3-1.7.jar                     bin  0 -> 745630 bytes
-rw-r--r--  third_party/scalatest-1.0/LICENSE                           202
-rw-r--r--  third_party/scalatest-1.0/NOTICE                              7
-rw-r--r--  third_party/scalatest-1.0/README.txt                         58
-rw-r--r--  third_party/scalatest-1.0/scalatest-1.0-tests.jar            bin  4027596 -> 0 bytes
-rw-r--r--  third_party/scalatest-1.0/scalatest-1.0.jar                  bin  1653985 -> 0 bytes
-rw-r--r--  third_party/scalatest-1.2-for-scala-2.8.0.RC3-SNAPSHOT.jar   bin  0 -> 1784234 bytes
27 files changed, 84 insertions(+), 404 deletions(-)
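Note: the recurring change in the hunks below is the addition of ClassManifest context bounds (and explicit manifests passed to toArray) wherever a generic Array[T] is materialized, since Scala 2.8 needs a manifest to instantiate arrays of a type parameter; the other repeated edits swap Math.* for scala.math._, subArray for slice, and drop SerializableRange now that Range is serializable. The following sketch is illustrative only and is not part of this commit; all names in it are hypothetical.

    // Illustrative Scala 2.8 sketch (not from this commit): a generic method
    // that builds an Array[U] must carry a ClassManifest for U, otherwise
    // toArray / new Array[U] cannot determine U's runtime class.
    object ManifestExample {
      def mapToArray[T, U: ClassManifest](data: Seq[T])(f: T => U): Array[U] =
        data.map(f).toArray  // toArray finds the ClassManifest[U] implicitly

      def main(args: Array[String]) {
        val squares = mapToArray(1 to 5)(i => i * i)
        println(squares.mkString(", "))  // prints: 1, 4, 9, 16, 25
      }
    }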
diff --git a/Makefile b/Makefile
index cec6f755f4..48bef384c3 100644
--- a/Makefile
+++ b/Makefile
@@ -8,12 +8,12 @@ JARS += third_party/colt.jar
JARS += third_party/google-collect-1.0-rc5/google-collect-1.0-rc5.jar
JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar
-JARS += third_party/scalatest-1.0/scalatest-1.0.jar
-JARS += third_party/ScalaCheck-1.5.jar
+JARS += third_party/scalatest-1.2-for-scala-2.8.0.RC3-SNAPSHOT.jar
+JARS += third_party/scalacheck_2.8.0.RC3-1.7.jar
CLASSPATH = $(subst $(SPACE),:,$(JARS))
-SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
-SCALA_SOURCES += src/test/spark/*.scala src/test/spark/repl/*.scala
+SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala #src/scala/spark/repl/*.scala
+SCALA_SOURCES += src/test/spark/*.scala #src/test/spark/repl/*.scala
JAVA_SOURCES = $(wildcard src/java/spark/compress/lzf/*.java)
@@ -35,7 +35,7 @@ build/classes:
mkdir -p build/classes
scala: build/classes java
- $(COMPILER) -unchecked -d build/classes -classpath $(CLASSPATH) $(SCALA_SOURCES)
+ $(COMPILER) -unchecked -d build/classes -classpath build/classes:$(CLASSPATH) $(SCALA_SOURCES)
java: $(JAVA_SOURCES) build/classes
javac -d build/classes $(JAVA_SOURCES)
diff --git a/src/examples/LocalALS.scala b/src/examples/LocalALS.scala
index 17d67b522b..a976a5e1c5 100644
--- a/src/examples/LocalALS.scala
+++ b/src/examples/LocalALS.scala
@@ -1,4 +1,5 @@
import java.util.Random
+import scala.math.sqrt
import cern.jet.math._
import cern.colt.matrix._
import cern.colt.matrix.linalg._
@@ -34,7 +35,7 @@ object LocalALS {
//println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
- return Math.sqrt(sumSqs / (M * U))
+ return sqrt(sumSqs / (M * U))
}
def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
diff --git a/src/examples/LocalFileLR.scala b/src/examples/LocalFileLR.scala
index 60b4aa8fc4..988442755a 100644
--- a/src/examples/LocalFileLR.scala
+++ b/src/examples/LocalFileLR.scala
@@ -9,11 +9,11 @@ object LocalFileLR {
def parsePoint(line: String): DataPoint = {
val nums = line.split(' ').map(_.toDouble)
- return DataPoint(new Vector(nums.subArray(1, D+1)), nums(0))
+ return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
}
def main(args: Array[String]) {
- val lines = scala.io.Source.fromFile(args(0)).getLines
+ val lines = scala.io.Source.fromPath(args(0)).getLines()
val points = lines.map(parsePoint _)
val ITERATIONS = args(1).toInt
diff --git a/src/examples/LocalPi.scala b/src/examples/LocalPi.scala
index c83aeed40b..c61b3e53d4 100644
--- a/src/examples/LocalPi.scala
+++ b/src/examples/LocalPi.scala
@@ -1,3 +1,4 @@
+import scala.math.random
import spark._
import SparkContext._
@@ -5,10 +6,10 @@ object LocalPi {
def main(args: Array[String]) {
var count = 0
for (i <- 1 to 100000) {
- val x = Math.random * 2 - 1
- val y = Math.random * 2 - 1
+ val x = random * 2 - 1
+ val y = random * 2 - 1
if (x*x + y*y < 1) count += 1
}
println("Pi is roughly " + 4 * count / 100000.0)
}
-}
\ No newline at end of file
+}
diff --git a/src/examples/SparkALS.scala b/src/examples/SparkALS.scala
index 2fd58ed3a5..a5d8559d7b 100644
--- a/src/examples/SparkALS.scala
+++ b/src/examples/SparkALS.scala
@@ -1,5 +1,6 @@
import java.io.Serializable
import java.util.Random
+import scala.math.sqrt
import cern.jet.math._
import cern.colt.matrix._
import cern.colt.matrix.linalg._
@@ -36,7 +37,7 @@ object SparkALS {
//println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
- return Math.sqrt(sumSqs / (M * U))
+ return sqrt(sumSqs / (M * U))
}
def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
diff --git a/src/examples/SparkHdfsLR.scala b/src/examples/SparkHdfsLR.scala
index d0400380bd..f14d48b17c 100644
--- a/src/examples/SparkHdfsLR.scala
+++ b/src/examples/SparkHdfsLR.scala
@@ -1,4 +1,5 @@
import java.util.Random
+import scala.math.exp
import Vector._
import spark._
@@ -10,7 +11,7 @@ object SparkHdfsLR {
def parsePoint(line: String): DataPoint = {
//val nums = line.split(' ').map(_.toDouble)
- //return DataPoint(new Vector(nums.subArray(1, D+1)), nums(0))
+ //return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
val tok = new java.util.StringTokenizer(line, " ")
var y = tok.nextToken.toDouble
var x = new Array[Double](D)
@@ -39,7 +40,7 @@ object SparkHdfsLR {
println("On iteration " + i)
val gradient = sc.accumulator(Vector.zeros(D))
for (p <- points) {
- val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y
+ val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
gradient += scale * p.x
}
w -= gradient.value
diff --git a/src/examples/SparkLR.scala b/src/examples/SparkLR.scala
index 34574f5640..71f9aea624 100644
--- a/src/examples/SparkLR.scala
+++ b/src/examples/SparkLR.scala
@@ -1,4 +1,5 @@
import java.util.Random
+import scala.math.exp
import Vector._
import spark._
@@ -37,7 +38,7 @@ object SparkLR {
println("On iteration " + i)
val gradient = sc.accumulator(Vector.zeros(D))
for (p <- sc.parallelize(data, numSlices)) {
- val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y
+ val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
gradient += scale * p.x
}
w -= gradient.value
diff --git a/src/examples/SparkPi.scala b/src/examples/SparkPi.scala
index 7dbadd1088..07311908ee 100644
--- a/src/examples/SparkPi.scala
+++ b/src/examples/SparkPi.scala
@@ -1,3 +1,4 @@
+import scala.math.random
import spark._
import SparkContext._
@@ -11,10 +12,10 @@ object SparkPi {
val slices = if (args.length > 1) args(1).toInt else 2
var count = spark.accumulator(0)
for (i <- spark.parallelize(1 to 100000, slices)) {
- val x = Math.random * 2 - 1
- val y = Math.random * 2 - 1
+ val x = random * 2 - 1
+ val y = random * 2 - 1
if (x*x + y*y < 1) count += 1
}
println("Pi is roughly " + 4 * count.value / 100000.0)
}
-}
\ No newline at end of file
+}
diff --git a/src/scala/spark/ClosureCleaner.scala b/src/scala/spark/ClosureCleaner.scala
index c5663901b3..3426e56f60 100644
--- a/src/scala/spark/ClosureCleaner.scala
+++ b/src/scala/spark/ClosureCleaner.scala
@@ -84,6 +84,7 @@ object ClosureCleaner {
}
private def instantiateClass(cls: Class[_], outer: AnyRef): AnyRef = {
+ /* // TODO: Fix for Scala 2.8
if (spark.repl.Main.interp == null) {
// This is a bona fide closure class, whose constructor has no effects
// other than to set its fields, so use its constructor
@@ -93,6 +94,7 @@ object ClosureCleaner {
params(0) = outer // First param is always outer object
return cons.newInstance(params: _*).asInstanceOf[AnyRef]
} else {
+ */
// Use reflection to instantiate object without calling constructor
val rf = sun.reflect.ReflectionFactory.getReflectionFactory();
val parentCtor = classOf[java.lang.Object].getDeclaredConstructor();
@@ -105,7 +107,9 @@ object ClosureCleaner {
field.set(obj, outer)
}
return obj
+ /*
}
+ */
}
}
diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala
index 4cc8f00aa9..1f7f6f32fa 100644
--- a/src/scala/spark/Executor.scala
+++ b/src/scala/spark/Executor.scala
@@ -24,11 +24,13 @@ object Executor {
// If the REPL is in use, create a ClassLoader that will be able to
// read new classes defined by the REPL as the user types code
classLoader = this.getClass.getClassLoader
+ /* // TODO: Fix for Scala 2.8
val classDir = System.getProperty("spark.repl.classdir")
if (classDir != null) {
println("Using REPL classdir: " + classDir)
classLoader = new repl.ExecutorClassLoader(classDir, classLoader)
}
+ */
Thread.currentThread.setContextClassLoader(classLoader)
// Start worker thread pool (they will inherit our context ClassLoader)
diff --git a/src/scala/spark/HdfsFile.scala b/src/scala/spark/HdfsFile.scala
index 8050683f99..abc93adc74 100644
--- a/src/scala/spark/HdfsFile.scala
+++ b/src/scala/spark/HdfsFile.scala
@@ -24,7 +24,8 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
@serializable
-abstract class DistributedFile[T, Split](@transient sc: SparkContext) {
+abstract class DistributedFile[T: ClassManifest, Split](
+ @transient sc: SparkContext) {
def splits: Array[Split]
def iterator(split: Split): Iterator[T]
def prefers(split: Split, slot: SlaveOffer): Boolean
@@ -74,7 +75,7 @@ abstract class DistributedFile[T, Split](@transient sc: SparkContext) {
case _ => throw new UnsupportedOperationException("empty collection")
}
- def map[U](f: T => U) = new MappedFile(this, sc.clean(f))
+ def map[U: ClassManifest](f: T => U) = new MappedFile(this, sc.clean(f))
def filter(f: T => Boolean) = new FilteredFile(this, sc.clean(f))
def cache() = new CachedFile(this)
@@ -84,15 +85,15 @@ abstract class DistributedFile[T, Split](@transient sc: SparkContext) {
}
@serializable
-abstract class FileTask[U, T, Split](val file: DistributedFile[T, Split],
- val split: Split)
+abstract class FileTask[U: ClassManifest, T: ClassManifest, Split](
+ val file: DistributedFile[T, Split], val split: Split)
extends Task[U] {
override def prefers(slot: SlaveOffer) = file.prefers(split, slot)
override def markStarted(slot: SlaveOffer) { file.taskStarted(split, slot) }
}
-class ForeachTask[T, Split](file: DistributedFile[T, Split],
- split: Split, func: T => Unit)
+class ForeachTask[T: ClassManifest, Split](
+ file: DistributedFile[T, Split], split: Split, func: T => Unit)
extends FileTask[Unit, T, Split](file, split) {
override def run() {
println("Processing " + split)
@@ -100,16 +101,17 @@ extends FileTask[Unit, T, Split](file, split) {
}
}
-class GetTask[T, Split](file: DistributedFile[T, Split], split: Split)
+class GetTask[T, Split](
+ file: DistributedFile[T, Split], split: Split)(implicit m: ClassManifest[T])
extends FileTask[Array[T], T, Split](file, split) {
override def run(): Array[T] = {
println("Processing " + split)
- file.iterator(split).collect.toArray
+ file.iterator(split).toArray(m)
}
}
-class ReduceTask[T, Split](file: DistributedFile[T, Split],
- split: Split, f: (T, T) => T)
+class ReduceTask[T: ClassManifest, Split](
+ file: DistributedFile[T, Split], split: Split, f: (T, T) => T)
extends FileTask[Option[T], T, Split](file, split) {
override def run(): Option[T] = {
println("Processing " + split)
@@ -121,7 +123,8 @@ extends FileTask[Option[T], T, Split](file, split) {
}
}
-class MappedFile[U, T, Split](prev: DistributedFile[T, Split], f: T => U)
+class MappedFile[U: ClassManifest, T: ClassManifest, Split](
+ prev: DistributedFile[T, Split], f: T => U)
extends DistributedFile[U, Split](prev.sparkContext) {
override def splits = prev.splits
override def prefers(split: Split, slot: SlaveOffer) = prev.prefers(split, slot)
@@ -129,7 +132,8 @@ extends DistributedFile[U, Split](prev.sparkContext) {
override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
}
-class FilteredFile[T, Split](prev: DistributedFile[T, Split], f: T => Boolean)
+class FilteredFile[T: ClassManifest, Split](
+ prev: DistributedFile[T, Split], f: T => Boolean)
extends DistributedFile[T, Split](prev.sparkContext) {
override def splits = prev.splits
override def prefers(split: Split, slot: SlaveOffer) = prev.prefers(split, slot)
@@ -137,7 +141,8 @@ extends DistributedFile[T, Split](prev.sparkContext) {
override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
}
-class CachedFile[T, Split](prev: DistributedFile[T, Split])
+class CachedFile[T, Split](
+ prev: DistributedFile[T, Split])(implicit m: ClassManifest[T])
extends DistributedFile[T, Split](prev.sparkContext) {
val id = CachedFile.newId()
@transient val cacheLocs = Map[Split, List[Int]]()
@@ -173,7 +178,7 @@ extends DistributedFile[T, Split](prev.sparkContext) {
}
// If we got here, we have to load the split
println("Loading and caching " + split)
- val array = prev.iterator(split).collect.toArray
+ val array = prev.iterator(split).toArray(m)
cache.put(key, array)
loading.synchronized {
loading.remove(key)
diff --git a/src/scala/spark/LocalScheduler.scala b/src/scala/spark/LocalScheduler.scala
index 35bfdde09f..b33f3c863e 100644
--- a/src/scala/spark/LocalScheduler.scala
+++ b/src/scala/spark/LocalScheduler.scala
@@ -13,7 +13,8 @@ private class LocalScheduler(threads: Int) extends Scheduler {
override def waitForRegister() {}
- override def runTasks[T](tasks: Array[Task[T]]): Array[T] = {
+ override def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T])
+ : Array[T] = {
val futures = new Array[Future[TaskResult[T]]](tasks.length)
for (i <- 0 until tasks.length) {
@@ -49,7 +50,7 @@ private class LocalScheduler(threads: Int) extends Scheduler {
val taskResults = futures.map(_.get)
for (result <- taskResults)
Accumulators.add(currentThread, result.accumUpdates)
- return taskResults.map(_.value).toArray
+ return taskResults.map(_.value).toArray(m)
}
override def stop() {}
diff --git a/src/scala/spark/NexusScheduler.scala b/src/scala/spark/NexusScheduler.scala
index a96fca9350..29c2011093 100644
--- a/src/scala/spark/NexusScheduler.scala
+++ b/src/scala/spark/NexusScheduler.scala
@@ -65,7 +65,7 @@ extends nexus.Scheduler with spark.Scheduler
override def getExecutorInfo(d: SchedulerDriver): ExecutorInfo =
new ExecutorInfo(new File("spark-executor").getCanonicalPath(), execArg)
- override def runTasks[T](tasks: Array[Task[T]]): Array[T] = {
+ override def runTasks[T: ClassManifest](tasks: Array[Task[T]]) : Array[T] = {
val results = new Array[T](tasks.length)
if (tasks.length == 0)
return results
diff --git a/src/scala/spark/ParallelArray.scala b/src/scala/spark/ParallelArray.scala
index 90cacf47fc..4f07a1577b 100644
--- a/src/scala/spark/ParallelArray.scala
+++ b/src/scala/spark/ParallelArray.scala
@@ -1,6 +1,6 @@
package spark
-abstract class ParallelArray[T](sc: SparkContext) {
+abstract class ParallelArray[T: ClassManifest](sc: SparkContext) {
def filter(f: T => Boolean): ParallelArray[T] = {
val cleanF = sc.clean(f)
new FilteredParallelArray[T](sc, this, cleanF)
@@ -8,11 +8,11 @@ abstract class ParallelArray[T](sc: SparkContext) {
def foreach(f: T => Unit): Unit
- def map[U](f: T => U): Array[U]
+ def map[U: ClassManifest](f: T => U): Array[U]
}
private object ParallelArray {
- def slice[T](seq: Seq[T], numSlices: Int): Array[Seq[T]] = {
+ def slice[T: ClassManifest](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1)
throw new IllegalArgumentException("Positive number of slices required")
seq match {
@@ -25,23 +25,23 @@ private object ParallelArray {
(0 until numSlices).map(i => {
val start = ((i * r.length.toLong) / numSlices).toInt
val end = (((i+1) * r.length.toLong) / numSlices).toInt
- new SerializableRange(
+ new Range(
r.start + start * r.step, r.start + end * r.step, r.step)
- }).asInstanceOf[Seq[Seq[T]]].toArray
+ }).asInstanceOf[Seq[Seq[T]]]
}
case _ => {
val array = seq.toArray // To prevent O(n^2) operations for List etc
(0 until numSlices).map(i => {
val start = ((i * array.length.toLong) / numSlices).toInt
val end = (((i+1) * array.length.toLong) / numSlices).toInt
- array.slice(start, end).toArray
- }).toArray
+ array.slice(start, end).toSeq
+ })
}
}
}
}
-private class SimpleParallelArray[T](
+private class SimpleParallelArray[T: ClassManifest](
sc: SparkContext, data: Seq[T], numSlices: Int)
extends ParallelArray[T](sc) {
val slices = ParallelArray.slice(data, numSlices)
@@ -53,7 +53,7 @@ extends ParallelArray[T](sc) {
sc.runTasks[Unit](tasks.toArray)
}
- def map[U](f: T => U): Array[U] = {
+ def map[U: ClassManifest](f: T => U): Array[U] = {
val cleanF = sc.clean(f)
var tasks = for (i <- 0 until numSlices) yield
new MapRunner(i, slices(i), cleanF)
@@ -72,14 +72,15 @@ extends Function0[Unit] {
@serializable
private class MapRunner[T, U](sliceNum: Int, data: Seq[T], f: T => U)
+ (implicit m: ClassManifest[U])
extends Function0[Array[U]] {
def apply(): Array[U] = {
printf("Running slice %d of parallel map\n", sliceNum)
- return data.map(f).toArray
+ return data.map(f).toArray(m)
}
}
-private class FilteredParallelArray[T](
+private class FilteredParallelArray[T: ClassManifest](
sc: SparkContext, array: ParallelArray[T], predicate: T => Boolean)
extends ParallelArray[T](sc) {
val cleanPred = sc.clean(predicate)
@@ -89,7 +90,7 @@ extends ParallelArray[T](sc) {
array.foreach(t => if (cleanPred(t)) cleanF(t))
}
- def map[U](f: T => U): Array[U] = {
+ def map[U: ClassManifest](f: T => U): Array[U] = {
val cleanF = sc.clean(f)
throw new UnsupportedOperationException(
"Map is not yet supported on FilteredParallelArray")
diff --git a/src/scala/spark/Scheduler.scala b/src/scala/spark/Scheduler.scala
index 77446d3e78..27cf48e9d2 100644
--- a/src/scala/spark/Scheduler.scala
+++ b/src/scala/spark/Scheduler.scala
@@ -4,6 +4,6 @@ package spark
private trait Scheduler {
def start()
def waitForRegister()
- def runTasks[T](tasks: Array[Task[T]]): Array[T]
+ def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]): Array[T]
def stop()
}
diff --git a/src/scala/spark/SerializableRange.scala b/src/scala/spark/SerializableRange.scala
deleted file mode 100644
index 5d383a40dc..0000000000
--- a/src/scala/spark/SerializableRange.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-// This is a copy of Scala 2.7.7's Range class, (c) 2006-2009, LAMP/EPFL.
-// The only change here is to make it Serializable, because Ranges aren't.
-// This won't be needed in Scala 2.8, where Scala's Range becomes Serializable.
-
-package spark
-
-@serializable
-private class SerializableRange(val start: Int, val end: Int, val step: Int)
-extends RandomAccessSeq.Projection[Int] {
- if (step == 0) throw new Predef.IllegalArgumentException
-
- /** Create a new range with the start and end values of this range and
- * a new <code>step</code>.
- */
- def by(step: Int): Range = new Range(start, end, step)
-
- override def foreach(f: Int => Unit) {
- if (step > 0) {
- var i = this.start
- val until = if (inInterval(end)) end + 1 else end
-
- while (i < until) {
- f(i)
- i += step
- }
- } else {
- var i = this.start
- val until = if (inInterval(end)) end - 1 else end
-
- while (i > until) {
- f(i)
- i += step
- }
- }
- }
-
- lazy val length: Int = {
- if (start < end && this.step < 0) 0
- else if (start > end && this.step > 0) 0
- else {
- val base = if (start < end) end - start
- else start - end
- assert(base >= 0)
- val step = if (this.step < 0) -this.step else this.step
- assert(step >= 0)
- base / step + last(base, step)
- }
- }
-
- protected def last(base: Int, step: Int): Int =
- if (base % step != 0) 1 else 0
-
- def apply(idx: Int): Int = {
- if (idx < 0 || idx >= length) throw new Predef.IndexOutOfBoundsException
- start + (step * idx)
- }
-
- /** a <code>Seq.contains</code>, not a <code>Iterator.contains</code>! */
- def contains(x: Int): Boolean = {
- inInterval(x) && (((x - start) % step) == 0)
- }
-
- /** Is the argument inside the interval defined by `start' and `end'?
- * Returns true if `x' is inside [start, end).
- */
- protected def inInterval(x: Int): Boolean =
- if (step > 0)
- (x >= start && x < end)
- else
- (x <= start && x > end)
-
- //def inclusive = new Range.Inclusive(start,end,step)
-
- override def toString = "SerializableRange(%d, %d, %d)".format(start, end, step)
-}
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index 4bfbcb6f21..c6ce06a875 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -8,10 +8,12 @@ import scala.collection.mutable.ArrayBuffer
class SparkContext(master: String, frameworkName: String) {
Cache.initialize()
- def parallelize[T](seq: Seq[T], numSlices: Int): ParallelArray[T] =
+ def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int)
+ : ParallelArray[T] =
new SimpleParallelArray[T](this, seq, numSlices)
- def parallelize[T](seq: Seq[T]): ParallelArray[T] = parallelize(seq, 2)
+ def parallelize[T: ClassManifest](seq: Seq[T]): ParallelArray[T] =
+ parallelize(seq, 2)
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param)
@@ -42,16 +44,17 @@ class SparkContext(master: String, frameworkName: String) {
val entry = iter.next
val (key, value) = (entry.getKey.toString, entry.getValue.toString)
if (key.startsWith("spark."))
- props += (key, value)
+ props += key -> value
}
return Utils.serialize(props.toArray)
}
- def runTasks[T](tasks: Array[() => T]): Array[T] = {
+ def runTasks[T: ClassManifest](tasks: Array[() => T]): Array[T] = {
runTaskObjects(tasks.map(f => new FunctionTask(f)))
}
- private[spark] def runTaskObjects[T](tasks: Seq[Task[T]]): Array[T] = {
+ private[spark] def runTaskObjects[T: ClassManifest](tasks: Seq[Task[T]])
+ : Array[T] = {
println("Running " + tasks.length + " tasks in parallel")
val start = System.nanoTime
val result = scheduler.runTasks(tasks.toArray)
diff --git a/src/test/spark/ParallelArraySplitSuite.scala b/src/test/spark/ParallelArraySplitSuite.scala
index a1787bd6dd..222df4e071 100644
--- a/src/test/spark/ParallelArraySplitSuite.scala
+++ b/src/test/spark/ParallelArraySplitSuite.scala
@@ -81,7 +81,7 @@ class ParallelArraySplitSuite extends FunSuite with Checkers {
val slices = ParallelArray.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).reduceLeft(_+_) === 99)
- assert(slices.forall(_.isInstanceOf[SerializableRange]))
+ assert(slices.forall(_.isInstanceOf[Range]))
}
test("inclusive ranges sliced into ranges") {
@@ -89,7 +89,7 @@ class ParallelArraySplitSuite extends FunSuite with Checkers {
val slices = ParallelArray.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).reduceLeft(_+_) === 100)
- assert(slices.forall(_.isInstanceOf[SerializableRange]))
+ assert(slices.forall(_.isInstanceOf[Range]))
}
test("large ranges don't overflow") {
@@ -98,8 +98,8 @@ class ParallelArraySplitSuite extends FunSuite with Checkers {
val slices = ParallelArray.slice(data, 40)
assert(slices.size === 40)
for (i <- 0 until 40) {
- assert(slices(i).isInstanceOf[SerializableRange])
- val range = slices(i).asInstanceOf[SerializableRange]
+ assert(slices(i).isInstanceOf[Range])
+ val range = slices(i).asInstanceOf[Range]
assert(range.start === i * (N / 40), "slice " + i + " start")
assert(range.end === (i+1) * (N / 40), "slice " + i + " end")
assert(range.step === 1, "slice " + i + " step")
@@ -117,7 +117,7 @@ class ParallelArraySplitSuite extends FunSuite with Checkers {
val n = tuple._2
val slices = ParallelArray.slice(d, n)
("n slices" |: slices.size == n) &&
- ("concat to d" |: Array.concat(slices: _*).mkString(",") == d.mkString(",")) &&
+ ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
}
check(prop)
@@ -134,8 +134,8 @@ class ParallelArraySplitSuite extends FunSuite with Checkers {
case (d: Range, n: Int) =>
val slices = ParallelArray.slice(d, n)
("n slices" |: slices.size == n) &&
- ("all ranges" |: slices.forall(_.isInstanceOf[SerializableRange])) &&
- ("concat to d" |: Array.concat(slices: _*).mkString(",") == d.mkString(",")) &&
+ ("all ranges" |: slices.forall(_.isInstanceOf[Range])) &&
+ ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
}
check(prop)
@@ -152,8 +152,8 @@ class ParallelArraySplitSuite extends FunSuite with Checkers {
case (d: Range, n: Int) =>
val slices = ParallelArray.slice(d, n)
("n slices" |: slices.size == n) &&
- ("all ranges" |: slices.forall(_.isInstanceOf[SerializableRange])) &&
- ("concat to d" |: Array.concat(slices: _*).mkString(",") == d.mkString(",")) &&
+ ("all ranges" |: slices.forall(_.isInstanceOf[Range])) &&
+ ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
}
check(prop)
diff --git a/third_party/ScalaCheck-1.5.jar b/third_party/ScalaCheck-1.5.jar
deleted file mode 100644
index 629b26c69d..0000000000
--- a/third_party/ScalaCheck-1.5.jar
+++ /dev/null
Binary files differ
diff --git a/third_party/nexus.jar b/third_party/nexus.jar
index 3c0b10ed62..17cf0c18eb 100644
--- a/third_party/nexus.jar
+++ b/third_party/nexus.jar
Binary files differ
diff --git a/third_party/scalacheck_2.8.0.RC3-1.7.jar b/third_party/scalacheck_2.8.0.RC3-1.7.jar
new file mode 100644
index 0000000000..ac9687fc00
--- /dev/null
+++ b/third_party/scalacheck_2.8.0.RC3-1.7.jar
Binary files differ
diff --git a/third_party/scalatest-1.0/LICENSE b/third_party/scalatest-1.0/LICENSE
deleted file mode 100644
index d645695673..0000000000
--- a/third_party/scalatest-1.0/LICENSE
+++ /dev/null
@@ -1,202 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
diff --git a/third_party/scalatest-1.0/NOTICE b/third_party/scalatest-1.0/NOTICE
deleted file mode 100644
index a405cbd58a..0000000000
--- a/third_party/scalatest-1.0/NOTICE
+++ /dev/null
@@ -1,7 +0,0 @@
-================================================================================
-== NOTICE file corresponding to section 4(d) of the Apache License, ==
-== Version 2.0, in this case for the ScalaTest distribution. ==
-================================================================================
-
- - This product includes software developed by
- Artima, Inc. (http://www.artima.com/).
diff --git a/third_party/scalatest-1.0/README.txt b/third_party/scalatest-1.0/README.txt
deleted file mode 100644
index d505b9c640..0000000000
--- a/third_party/scalatest-1.0/README.txt
+++ /dev/null
@@ -1,58 +0,0 @@
-ScalaTest 1.0
-
-ScalaTest is a free, open-source testing toolkit for Scala and
-Java programmers. Because different developers take different approaches to creating
-software, no single approach to testing is a good fit for everyone. In light of
-this reality, ScalaTest is designed to facilitate different styles of testing. ScalaTest
-provides several traits that you can mix together into whatever combination makes you feel the most productive.
-For some examples of the various styles that ScalaTest supports, see:
-
-http://www.artima.com/scalatest
-
-GETTING STARTED
-
-To learn how to use ScalaTest, please
-open in your browser the scaladoc documentation in the
-/scalatest-1.0/doc directory. Look first at the documentation for trait
-org.scalatest.Suite, which gives a decent intro. All the other types are
-documented as well, so you can hop around to learn more.
-org.scalatest.tools.Runner explains how to use the application. The
-Ignore class is written in Java, and isn't currently shown in the Scaladoc.
-
-To try it out, you can use ScalaTest to run its own tests, i.e., the tests
-used to test ScalaTest itself. This command will run the GUI:
-
-scala -classpath scalatest-1.0.jar org.scalatest.tools.Runner -p "scalatest-1.0-tests.jar" -g -s org.scalatest.SuiteSuite
-
-This command will run and just print results to the standard output:
-
-scala -classpath scalatest-1.0.jar org.scalatest.tools.Runner -p "scalatest-1.0-tests.jar" -o -s org.scalatest.SuiteSuite
-
-ScalaTest 1.0 was tested with Scala version 2.7.5.final, so it is not
-guaranteed to work with earlier Scala versions.
-
-ABOUT SCALATEST
-
-ScalaTest was written by Bill Venners, George Berger, Josh Cough, and
-other contributors starting in late 2007. ScalaTest, which is almost
-exclusively written in Scala, follows and improves upon the Java code
-and design of Artima SuiteRunner, a testing tool also written
-primarily by Bill Venners, starting in 2001. Over the years a few
-other people contributed to SuiteRunner as well, including:
-
-Mark Brouwer
-Chua Chee Seng
-Chris Daily
-Matt Gerrans
-John Mitchel
-Frank Sommers
-
-Several people have helped with ScalaTest, including:
-
-Corey Haines
-Colin Howe
-Dianne Marsh
-Joel Neely
-Jon-Anders Teigen
-Daniel Watson
-
diff --git a/third_party/scalatest-1.0/scalatest-1.0-tests.jar b/third_party/scalatest-1.0/scalatest-1.0-tests.jar
deleted file mode 100644
index 8bdd3350e6..0000000000
--- a/third_party/scalatest-1.0/scalatest-1.0-tests.jar
+++ /dev/null
Binary files differ
diff --git a/third_party/scalatest-1.0/scalatest-1.0.jar b/third_party/scalatest-1.0/scalatest-1.0.jar
deleted file mode 100644
index ae7c1c7f17..0000000000
--- a/third_party/scalatest-1.0/scalatest-1.0.jar
+++ /dev/null
Binary files differ
diff --git a/third_party/scalatest-1.2-for-scala-2.8.0.RC3-SNAPSHOT.jar b/third_party/scalatest-1.2-for-scala-2.8.0.RC3-SNAPSHOT.jar
new file mode 100644
index 0000000000..f5c4af19b0
--- /dev/null
+++ b/third_party/scalatest-1.2-for-scala-2.8.0.RC3-SNAPSHOT.jar
Binary files differ