author     Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2011-04-27 21:11:41 -0700
committer  Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2011-04-27 21:11:41 -0700
commit     0567646180881a4738dbb06f3e614d8082ec46b5 (patch)
tree       fc456cad43578547b05c8ab8f47585a934dbf5d4 /core
parent     2742de707a7abfd76e3de20e10a0e4a974f12fd5 (diff)
Shuffle now also works from its own subpackage.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/Executor.scala                                1
-rw-r--r--  core/src/main/scala/spark/RDD.scala                                     3
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                            1
-rw-r--r--  core/src/main/scala/spark/broadcast/Broadcast.scala                     2
-rw-r--r--  core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala (renamed from core/src/main/scala/spark/BasicLocalFileShuffle.scala)                    4
-rw-r--r--  core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala (renamed from core/src/main/scala/spark/CustomBlockedLocalFileShuffle.scala)    7
-rw-r--r--  core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala (renamed from core/src/main/scala/spark/CustomParallelLocalFileShuffle.scala)  7
-rw-r--r--  core/src/main/scala/spark/shuffle/DfsShuffle.scala (renamed from core/src/main/scala/spark/DfsShuffle.scala)                                          4
-rw-r--r--  core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala (renamed from core/src/main/scala/spark/HttpParallelLocalFileShuffle.scala)      7
-rw-r--r--  core/src/main/scala/spark/shuffle/Shuffle.scala (renamed from core/src/main/scala/spark/Shuffle.scala)                                                6
10 files changed, 30 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 7e3911aef1..577e9371bf 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -7,6 +7,7 @@ import java.util.concurrent.{Executors, ExecutorService}
import scala.collection.mutable.ArrayBuffer
import spark.broadcast._
+import spark.shuffle._
import mesos.{ExecutorArgs, ExecutorDriver, MesosExecutorDriver}
import mesos.{TaskDescription, TaskState, TaskStatus}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 39f2dc4458..30929ed089 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -12,6 +12,7 @@ import SparkContext._
import mesos._
+import spark.shuffle._
@serializable
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
@@ -359,7 +360,7 @@ extends RDD[Pair[T, U]](sc) {
: RDD[(K, C)] =
{
val shufClass = Class.forName(System.getProperty(
- "spark.shuffle.class", "spark.LocalFileShuffle"))
+ "spark.shuffle.class", "spark.BasicLocalFileShuffle"))
val shuf = shufClass.newInstance().asInstanceOf[Shuffle[K, V, C]]
shuf.compute(self, numSplits, createCombiner, mergeValue, mergeCombiners)
}
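
[Note: the hunk above shows that the shuffle implementation is resolved reflectively from the "spark.shuffle.class" system property, with the default renamed to the fully qualified spark.BasicLocalFileShuffle. A minimal driver-side sketch of swapping in a different implementation follows; the property name and class names come from this commit, while the surrounding driver code is illustrative, not verbatim source.]

    // Hypothetical driver snippet: "spark.shuffle.class" is the real
    // property read in RDD.scala above; the rest is an illustrative sketch.
    System.setProperty("spark.shuffle.class",
      "spark.shuffle.HttpParallelLocalFileShuffle")

    val sc = new SparkContext("local", "ShuffleDemo")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // combineByKey (via PairRDDExtras) will instantiate the configured
    // class with Class.forName(...).newInstance().asInstanceOf[Shuffle[K, V, C]]
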
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0ffe44f91d..082e9bbe4b 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -5,6 +5,7 @@ import java.io._
import scala.collection.mutable.ArrayBuffer
import spark.broadcast._
+import spark.shuffle._
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.SequenceFileInputFormat
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index efc3c50b13..e9c7639f12 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -35,7 +35,7 @@ extends Logging {
def initialize (isMaster__ : Boolean): Unit = synchronized {
if (!initialized) {
val broadcastFactoryClass = System.getProperty("spark.broadcast.factory",
- "spark.DfsBroadcastFactory")
+ "spark.broadcast.DfsBroadcastFactory")
broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
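
[Note: the broadcast factory is resolved the same way, with the default updated to the fully qualified spark.broadcast.DfsBroadcastFactory. A short sketch of overriding it, using only the property name and default shown in the hunk above:]

    // Illustrative override; Broadcast.initialize() then loads the class
    // reflectively, exactly as in the hunk above:
    //   Class.forName(factoryClass).newInstance.asInstanceOf[BroadcastFactory]
    System.setProperty("spark.broadcast.factory",
      "spark.broadcast.DfsBroadcastFactory")
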
diff --git a/core/src/main/scala/spark/BasicLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala
index 3c3f132083..5e2c11e116 100644
--- a/core/src/main/scala/spark/BasicLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.shuffle
import java.io._
import java.net.URL
@@ -7,6 +7,8 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.{ArrayBuffer, HashMap}
+import spark._
+
/**
* A basic implementation of shuffle using local files served through HTTP.
*
diff --git a/core/src/main/scala/spark/CustomBlockedLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala
index 98af7c8d65..8d513b3d86 100644
--- a/core/src/main/scala/spark/CustomBlockedLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.shuffle
import java.io._
import java.net._
@@ -7,6 +7,9 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{LinkedBlockingQueue, Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.math
+
+import spark._
/**
* An implementation of shuffle using local files served through custom server
@@ -153,7 +156,7 @@ extends Shuffle[K, V, C] with Logging {
while (hasSplits < totalSplits) {
var numThreadsToCreate =
- Math.min(totalSplits, Shuffle.MaxRxConnections) -
+ math.min(totalSplits, Shuffle.MaxRxConnections) -
threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
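
[Note: the Math.min to math.min change here, repeated in the hunks below, moves off the capital-M scala.Math object, which was deprecated in Scala 2.8 in favor of the scala.math package object. A small self-contained comparison, with stand-in values for the fields used in these hunks:]

    import scala.math

    val totalSplits = 8
    val maxConnections = 4   // stand-in for Shuffle.MaxRxConnections

    // Deprecated since Scala 2.8 (scala.Math object):
    //   val n = Math.min(totalSplits, maxConnections)
    // Package-object form adopted by these hunks:
    val n = math.min(totalSplits, maxConnections)   // n == 4
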
diff --git a/core/src/main/scala/spark/CustomParallelLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala
index 87e824fb2e..f210249d39 100644
--- a/core/src/main/scala/spark/CustomParallelLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.shuffle
import java.io._
import java.net._
@@ -7,6 +7,9 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{LinkedBlockingQueue, Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.math
+
+import spark._
/**
* An implementation of shuffle using local files served through custom server
@@ -100,7 +103,7 @@ extends Shuffle[K, V, C] with Logging {
Shuffle.MaxRxConnections)
while (hasSplits < totalSplits) {
- var numThreadsToCreate = Math.min(totalSplits,
+ var numThreadsToCreate = math.min(totalSplits,
Shuffle.MaxRxConnections) -
threadPool.getActiveCount
diff --git a/core/src/main/scala/spark/DfsShuffle.scala b/core/src/main/scala/spark/shuffle/DfsShuffle.scala
index bf91be7d2c..2f079a453d 100644
--- a/core/src/main/scala/spark/DfsShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/DfsShuffle.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.shuffle
import java.io.{EOFException, ObjectInputStream, ObjectOutputStream}
import java.net.URI
@@ -9,6 +9,8 @@ import scala.collection.mutable.HashMap
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
+import spark._
+
/**
* A simple implementation of shuffle using a distributed file system.
*
diff --git a/core/src/main/scala/spark/HttpParallelLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala
index 8e7b897668..174cfd993f 100644
--- a/core/src/main/scala/spark/HttpParallelLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.shuffle
import java.io._
import java.net._
@@ -7,6 +7,9 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{LinkedBlockingQueue, Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.math
+
+import spark._
/**
* An implementation of shuffle using local files served through HTTP where
@@ -98,7 +101,7 @@ extends Shuffle[K, V, C] with Logging {
while (hasSplits < totalSplits) {
var numThreadsToCreate =
- Math.min(totalSplits, Shuffle.MaxRxConnections) -
+ math.min(totalSplits, Shuffle.MaxRxConnections) -
threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
diff --git a/core/src/main/scala/spark/Shuffle.scala b/core/src/main/scala/spark/shuffle/Shuffle.scala
index f2d790f727..55d7d57b83 100644
--- a/core/src/main/scala/spark/Shuffle.scala
+++ b/core/src/main/scala/spark/shuffle/Shuffle.scala
@@ -1,9 +1,11 @@
-package spark
+package spark.shuffle
import java.net._
import java.util.{BitSet}
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
+import spark._
+
/**
* A trait for shuffle system. Given an input RDD and combiner functions
* for PairRDDExtras.combineByKey(), returns an output RDD.
@@ -21,7 +23,7 @@ trait Shuffle[K, V, C] {
/**
* An object containing common shuffle config parameters
*/
-private object Shuffle
+object Shuffle
extends Logging {
// Tracker communication constants
val ReducerEntering = 0
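
[Note: dropping the private modifier lets the implementations now living in the spark.shuffle package read shared tracker constants such as ReducerEntering and Shuffle.MaxRxConnections from the companion object. For orientation, the shuf.compute(...) call site in RDD.scala above implies a trait shape roughly like the following; this is a hedged reconstruction, and the parameter names are guesses, not the verbatim source:]

    // Reconstructed shape of the Shuffle trait, inferred from the
    // compute(...) call in RDD.scala; not copied from the actual file.
    trait Shuffle[K, V, C] {
      def compute(input: RDD[(K, V)],
                  numOutputSplits: Int,
                  createCombiner: V => C,
                  mergeValue: (C, V) => C,
                  mergeCombiners: (C, C) => C): RDD[(K, C)]
    }
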