author     Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>  2010-11-27 00:53:57 -0800
committer  Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>  2010-11-27 00:53:57 -0800
commit     e4b8db45aef934929dbab443156375aebb1ea45e (patch)
tree       a50903a8eb7bfac84674548bd4bfb99357f7942b
parent     c25914228c542ed2900310b82926d10b95df83d6 (diff)
- Moved the daemon thread pool factory methods to the Broadcast object.
- Reflection-based broadcast class loading is still not working.
-rw-r--r--  src/scala/spark/Broadcast.scala         32
-rw-r--r--  src/scala/spark/ChainedBroadcast.scala  40
-rw-r--r--  src/scala/spark/SparkContext.scala      14
3 files changed, 49 insertions(+), 37 deletions(-)
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 05f214e48e..afff500bb0 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -1,6 +1,7 @@
package spark
import java.util.UUID
+import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
@serializable
trait Broadcast {
@@ -31,6 +32,37 @@ extends Logging {
}
}
}
+
+ // Returns a standard ThreadFactory except all threads are daemons
+ private def newDaemonThreadFactory: ThreadFactory = {
+ new ThreadFactory {
+ def newThread(r: Runnable): Thread = {
+ var t = Executors.defaultThreadFactory.newThread (r)
+ t.setDaemon (true)
+ return t
+ }
+ }
+ }
+
+ // Wrapper over newCachedThreadPool
+ def newDaemonCachedThreadPool: ThreadPoolExecutor = {
+ var threadPool =
+ Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
+
+ threadPool.setThreadFactory (newDaemonThreadFactory)
+
+ return threadPool
+ }
+
+ // Wrapper over newFixedThreadPool
+ def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
+ var threadPool =
+ Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
+
+ threadPool.setThreadFactory (newDaemonThreadFactory)
+
+ return threadPool
+ }
}
@serializable
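
Note on the factory methods moved above: the asInstanceOf[ThreadPoolExecutor] casts are needed only because setThreadFactory is defined on ThreadPoolExecutor, while the Executors methods are declared to return ExecutorService. A minimal alternative sketch (not code from this commit; the DaemonPoolSketch name is hypothetical) supplies the daemon factory at construction time, avoiding both the cast and the post-construction mutation:

import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}

// Sketch: supply the daemon ThreadFactory when the pool is created.
// Executors.newCachedThreadPool(ThreadFactory) and
// Executors.newFixedThreadPool(Int, ThreadFactory) both return
// ExecutorService, so no ThreadPoolExecutor cast is needed when the
// ThreadPoolExecutor-specific API is not used.
object DaemonPoolSketch {
  def newDaemonCachedThreadPool(factory: ThreadFactory): ExecutorService =
    Executors.newCachedThreadPool(factory)

  def newDaemonFixedThreadPool(nThreads: Int,
      factory: ThreadFactory): ExecutorService =
    Executors.newFixedThreadPool(nThreads, factory)
}
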
diff --git a/src/scala/spark/ChainedBroadcast.scala b/src/scala/spark/ChainedBroadcast.scala
index 6b34843abe..32f97ce442 100644
--- a/src/scala/spark/ChainedBroadcast.scala
+++ b/src/scala/spark/ChainedBroadcast.scala
@@ -6,8 +6,6 @@ import java.util.{Comparator, PriorityQueue, Random, UUID}
import com.google.common.collect.MapMaker
-import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
-
import scala.collection.mutable.{Map, Set}
@serializable
@@ -397,7 +395,7 @@ extends Broadcast with Logging {
private var setOfCompletedSources = Set[SourceInfo] ()
override def run: Unit = {
- var threadPool = ChainedBroadcast.newDaemonCachedThreadPool
+ var threadPool = Broadcast.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (0)
@@ -602,7 +600,7 @@ extends Broadcast with Logging {
class ServeMultipleRequests
extends Thread with Logging {
override def run: Unit = {
- var threadPool = ChainedBroadcast.newDaemonCachedThreadPool
+ var threadPool = Broadcast.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (0)
@@ -671,7 +669,6 @@ extends Broadcast with Logging {
sendObject
}
} catch {
- // TODO: Need to add better exception handling here
// If something went wrong, e.g., the worker at the other end died etc.
// then close everything up
case e: Exception => {
@@ -802,41 +799,10 @@ extends Logging {
}
}
- // Returns a standard ThreadFactory except all threads are daemons
- private def newDaemonThreadFactory: ThreadFactory = {
- new ThreadFactory {
- def newThread(r: Runnable): Thread = {
- var t = Executors.defaultThreadFactory.newThread (r)
- t.setDaemon (true)
- return t
- }
- }
- }
-
- // Wrapper over newCachedThreadPool
- def newDaemonCachedThreadPool: ThreadPoolExecutor = {
- var threadPool =
- Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
-
- threadPool.setThreadFactory (newDaemonThreadFactory)
-
- return threadPool
- }
-
- // Wrapper over newFixedThreadPool
- def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
- var threadPool =
- Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
-
- threadPool.setThreadFactory (newDaemonThreadFactory)
-
- return threadPool
- }
-
class TrackMultipleValues
extends Thread with Logging {
override def run: Unit = {
- var threadPool = ChainedBroadcast.newDaemonCachedThreadPool
+ var threadPool = Broadcast.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (ChainedBroadcast.MasterTrackerPort)
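
For illustration (a sketch, not code from this commit): the three call sites changed above all follow the same accept-loop pattern, where each incoming connection is handed to the shared daemon pool so that a blocked accept() or a stuck client never keeps the worker JVM alive. The object name and the request-handling placeholder below are hypothetical:

import java.net.ServerSocket

// Hypothetical accept loop in the style of ServeMultipleRequests:
// each connection is served on a daemon thread from the shared pool.
object ServeLoopSketch {
  def serveForever() {
    val threadPool = spark.Broadcast.newDaemonCachedThreadPool
    val serverSocket = new ServerSocket(0)
    while (true) {
      val clientSocket = serverSocket.accept()
      threadPool.execute(new Runnable {
        def run() {
          try {
            // ... read the request and stream the blocks back ...
          } finally {
            clientSocket.close()
          }
        }
      })
    }
  }
}
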
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index 98149dc1b7..ef328d821a 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -22,6 +22,20 @@ class SparkContext(master: String, frameworkName: String) extends Logging {
// def broadcast[T](value: T) = new DfsBroadcast(value, local)
def broadcast[T](value: T) = new ChainedBroadcast(value, local)
+// def broadcast[T](value: T) = {
+// val broadcastClass = System.getProperty("spark.broadcast.Class",
+// "spark.ChainedBroadcast")
+// val booleanArgs = Array[AnyRef] (local.asInstanceOf[AnyRef])
+// Class.forName(broadcastClass).getConstructors()(0).newInstance(booleanArgs:_*).asInstanceOf[Class.forName(broadcastClass)]
+// }
+
+// def initialize() {
+// val cacheClass = System.getProperty("spark.cache.class",
+// "spark.SoftReferenceCache")
+// instance = Class.forName(cacheClass).newInstance().asInstanceOf[Cache]
+// }
+
+
def textFile(path: String) = new HdfsTextFile(this, path)
val LOCAL_REGEX = """local\[([0-9]+)\]""".r
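
Regarding the second commit-message bullet: the commented-out factory above cannot compile because asInstanceOf requires a compile-time type, not the runtime value Class.forName(broadcastClass), and because it passes only the Boolean flag while ChainedBroadcast's constructor also takes the value to broadcast. A minimal sketch of one way it could be made to compile, assuming it is placed inside SparkContext (where the Broadcast trait is visible) and the configured class exposes a (value, local) constructor; the method name is hypothetical:

// Hypothetical sketch, not part of this commit: a compiling version of
// the reflection-based loader.
def broadcastViaReflection[T](value: T, local: Boolean): Broadcast = {
  val broadcastClass = System.getProperty("spark.broadcast.Class",
    "spark.ChainedBroadcast")
  // Box both constructor arguments for Constructor.newInstance
  val args = Array[AnyRef](value.asInstanceOf[AnyRef],
                           local.asInstanceOf[AnyRef])
  // Cast the result to the Broadcast trait, which is a real type
  Class.forName(broadcastClass).getConstructors()(0)
    .newInstance(args: _*).asInstanceOf[Broadcast]
}
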