aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-07-30 13:29:13 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2012-07-30 13:29:13 -0700
commit5a26ca4a80a428eb4d7e3407ca496e39ad38c757 (patch)
tree6e14127fd95b2ea2fd7fd23b305bb086b253e3c3 /streaming
parentfcee4153b92bdd66dd92820a2670b339f9f59c77 (diff)
downloadspark-5a26ca4a80a428eb4d7e3407ca496e39ad38c757.tar.gz
spark-5a26ca4a80a428eb4d7e3407ca496e39ad38c757.tar.bz2
spark-5a26ca4a80a428eb4d7e3407ca496e39ad38c757.zip
Restructured file locations to separate examples and other programs from core programs.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/Interval.scala20
-rw-r--r--streaming/src/main/scala/spark/streaming/JobManager.scala127
-rw-r--r--streaming/src/main/scala/spark/streaming/JobManager2.scala37
-rw-r--r--streaming/src/main/scala/spark/streaming/Scheduler.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/DumbTopKWordCount2_Special.scala (renamed from streaming/src/main/scala/spark/streaming/DumbTopKWordCount2_Special.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/DumbWordCount2_Special.scala (renamed from streaming/src/main/scala/spark/streaming/DumbWordCount2_Special.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/GrepCount.scala (renamed from streaming/src/main/scala/spark/streaming/GrepCount.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/GrepCount2.scala (renamed from streaming/src/main/scala/spark/streaming/GrepCount2.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/GrepCountApprox.scala (renamed from streaming/src/main/scala/spark/streaming/GrepCountApprox.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/SimpleWordCount.scala (renamed from streaming/src/main/scala/spark/streaming/SimpleWordCount.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2.scala (renamed from streaming/src/main/scala/spark/streaming/SimpleWordCount2.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2_Special.scala (renamed from streaming/src/main/scala/spark/streaming/SimpleWordCount2_Special.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/TopContentCount.scala (renamed from streaming/src/main/scala/spark/streaming/TopContentCount.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/TopKWordCount2.scala (renamed from streaming/src/main/scala/spark/streaming/TopKWordCount2.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/TopKWordCount2_Special.scala (renamed from streaming/src/main/scala/spark/streaming/TopKWordCount2_Special.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCount.scala (renamed from streaming/src/main/scala/spark/streaming/WordCount.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCount1.scala (renamed from streaming/src/main/scala/spark/streaming/WordCount1.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCount2.scala (renamed from streaming/src/main/scala/spark/streaming/WordCount2.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCount2_Special.scala (renamed from streaming/src/main/scala/spark/streaming/WordCount2_Special.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCount3.scala (renamed from streaming/src/main/scala/spark/streaming/WordCount3.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCountEc2.scala (renamed from streaming/src/main/scala/spark/streaming/WordCountEc2.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCountTrivialWindow.scala (renamed from streaming/src/main/scala/spark/streaming/WordCountTrivialWindow.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordMax.scala (renamed from streaming/src/main/scala/spark/streaming/WordMax.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/utils/SenGeneratorForPerformanceTest.scala (renamed from streaming/src/main/scala/spark/streaming/SenGeneratorForPerformanceTest.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/utils/SenderReceiverTest.scala (renamed from streaming/src/main/scala/spark/streaming/SenderReceiverTest.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/utils/SentenceFileGenerator.scala (renamed from streaming/src/main/scala/spark/streaming/SentenceFileGenerator.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/utils/SentenceGenerator.scala (renamed from streaming/src/main/scala/spark/streaming/SentenceGenerator.scala)0
-rw-r--r--streaming/src/main/scala/spark/streaming/utils/ShuffleTest.scala (renamed from streaming/src/main/scala/spark/streaming/ShuffleTest.scala)0
28 files changed, 27 insertions, 159 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index a985f44ba1..9a61d85274 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -42,26 +42,6 @@ case class Interval (val beginTime: Time, val endTime: Time) {
object Interval {
- /*
- implicit def longTupleToInterval (longTuple: (Long, Long)) =
- Interval(longTuple._1, longTuple._2)
-
- implicit def intTupleToInterval (intTuple: (Int, Int)) =
- Interval(intTuple._1, intTuple._2)
-
- implicit def string2Interval (str: String): Interval = {
- val parts = str.split(",")
- if (parts.length == 1)
- return Interval.zero
- return Interval (parts(0).toInt, parts(1).toInt)
- }
-
- def getInterval (timeMs: Long, intervalDurationMs: Long): Interval = {
- val intervalBeginMs = timeMs / intervalDurationMs * intervalDurationMs
- Interval(intervalBeginMs, intervalBeginMs + intervalDurationMs)
- }
- */
-
def zero() = new Interval (Time.zero, Time.zero)
def currentInterval(intervalDuration: LongTime): Interval = {
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 45a3971643..d7d88a7000 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -1,112 +1,37 @@
package spark.streaming
-import spark.SparkEnv
-import spark.Logging
-
-import scala.collection.mutable.PriorityQueue
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.remote._
-import scala.actors.remote.RemoteActor._
-import scala.actors.scheduler.ResizableThreadPoolScheduler
-import scala.actors.scheduler.ForkJoinScheduler
-
-sealed trait JobManagerMessage
-case class RunJob(job: Job) extends JobManagerMessage
-case class JobCompleted(handlerId: Int) extends JobManagerMessage
-
-class JobHandler(ssc: SparkStreamContext, val id: Int) extends DaemonActor with Logging {
-
- var busy = false
-
- def act() {
- loop {
- receive {
- case job: Job => {
- SparkEnv.set(ssc.env)
- try {
- logInfo("Starting " + job)
- job.run()
- logInfo("Finished " + job)
- if (job.time.isInstanceOf[LongTime]) {
- val longTime = job.time.asInstanceOf[LongTime]
- logInfo("Total pushing + skew + processing delay for " + longTime + " is " +
- (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s")
- }
- } catch {
- case e: Exception => logError("SparkStream job failed", e)
+import spark.{Logging, SparkEnv}
+import java.util.concurrent.Executors
+
+
+class JobManager(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging {
+
+ class JobHandler(ssc: SparkStreamContext, job: Job) extends Runnable {
+ def run() {
+ SparkEnv.set(ssc.env)
+ try {
+ logInfo("Starting " + job)
+ job.run()
+ logInfo("Finished " + job)
+ if (job.time.isInstanceOf[LongTime]) {
+ val longTime = job.time.asInstanceOf[LongTime]
+ logInfo("Total notification + skew + processing delay for " + longTime + " is " +
+ (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s")
+ if (System.getProperty("spark.stream.distributed", "false") == "true") {
+ TestInputBlockTracker.setEndTime(job.time)
}
- busy = false
- reply(JobCompleted(id))
}
+ } catch {
+ case e: Exception => logError("SparkStream job failed", e)
}
}
}
-}
-class JobManager(ssc: SparkStreamContext, numThreads: Int = 2) extends DaemonActor with Logging {
+ initLogging()
- implicit private val jobOrdering = new Ordering[Job] {
- override def compare(job1: Job, job2: Job): Int = {
- if (job1.time < job2.time) {
- return 1
- } else if (job2.time < job1.time) {
- return -1
- } else {
- return 0
- }
- }
- }
-
- private val jobs = new PriorityQueue[Job]()
- private val handlers = (0 until numThreads).map(i => new JobHandler(ssc, i))
-
- def act() {
- handlers.foreach(_.start)
- loop {
- receive {
- case RunJob(job) => {
- jobs += job
- logInfo("Job " + job + " submitted")
- runJob()
- }
- case JobCompleted(handlerId) => {
- runJob()
- }
- }
- }
- }
-
- def runJob(): Unit = {
- logInfo("Attempting to allocate job ")
- if (jobs.size > 0) {
- handlers.find(!_.busy).foreach(handler => {
- val job = jobs.dequeue
- logInfo("Allocating job " + job + " to handler " + handler.id)
- handler.busy = true
- handler ! job
- })
- }
+ val jobExecutor = Executors.newFixedThreadPool(numThreads)
+
+ def runJob(job: Job) {
+ jobExecutor.execute(new JobHandler(ssc, job))
}
}
-
-object JobManager {
- def main(args: Array[String]) {
- val ssc = new SparkStreamContext("local[4]", "JobManagerTest")
- val jobManager = new JobManager(ssc)
- jobManager.start()
-
- val t = System.currentTimeMillis
- for (i <- 1 to 10) {
- jobManager ! RunJob(new Job(
- LongTime(i),
- () => {
- Thread.sleep(500)
- println("Job " + i + " took " + (System.currentTimeMillis - t) + " ms")
- }
- ))
- }
- Thread.sleep(6000)
- }
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/JobManager2.scala b/streaming/src/main/scala/spark/streaming/JobManager2.scala
deleted file mode 100644
index ce0154e19b..0000000000
--- a/streaming/src/main/scala/spark/streaming/JobManager2.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-package spark.streaming
-
-import spark.{Logging, SparkEnv}
-import java.util.concurrent.Executors
-
-
-class JobManager2(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging {
-
- class JobHandler(ssc: SparkStreamContext, job: Job) extends Runnable {
- def run() {
- SparkEnv.set(ssc.env)
- try {
- logInfo("Starting " + job)
- job.run()
- logInfo("Finished " + job)
- if (job.time.isInstanceOf[LongTime]) {
- val longTime = job.time.asInstanceOf[LongTime]
- logInfo("Total notification + skew + processing delay for " + longTime + " is " +
- (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s")
- if (System.getProperty("spark.stream.distributed", "false") == "true") {
- TestInputBlockTracker.setEndTime(job.time)
- }
- }
- } catch {
- case e: Exception => logError("SparkStream job failed", e)
- }
- }
- }
-
- initLogging()
-
- val jobExecutor = Executors.newFixedThreadPool(numThreads)
-
- def runJob(job: Job) {
- jobExecutor.execute(new JobHandler(ssc, job))
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 4137d8f27d..8df346559c 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -46,7 +46,7 @@ extends Actor with Logging {
val inputNames = inputRDSs.map(_.inputName).toArray
val inputStates = new HashMap[Interval, InputState]()
val currentJobs = System.getProperty("spark.stream.currentJobs", "1").toInt
- val jobManager = new JobManager2(ssc, currentJobs)
+ val jobManager = new JobManager(ssc, currentJobs)
// TODO(Haoyuan): The following line is for performance test only.
var cnt: Int = System.getProperty("spark.stream.fake.cnt", "60").toInt
diff --git a/streaming/src/main/scala/spark/streaming/DumbTopKWordCount2_Special.scala b/streaming/src/main/scala/spark/streaming/examples/DumbTopKWordCount2_Special.scala
index 2ca72da79f..2ca72da79f 100644
--- a/streaming/src/main/scala/spark/streaming/DumbTopKWordCount2_Special.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/DumbTopKWordCount2_Special.scala
diff --git a/streaming/src/main/scala/spark/streaming/DumbWordCount2_Special.scala b/streaming/src/main/scala/spark/streaming/examples/DumbWordCount2_Special.scala
index 34e7edfda9..34e7edfda9 100644
--- a/streaming/src/main/scala/spark/streaming/DumbWordCount2_Special.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/DumbWordCount2_Special.scala
diff --git a/streaming/src/main/scala/spark/streaming/GrepCount.scala b/streaming/src/main/scala/spark/streaming/examples/GrepCount.scala
index ec3e70f258..ec3e70f258 100644
--- a/streaming/src/main/scala/spark/streaming/GrepCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepCount.scala
diff --git a/streaming/src/main/scala/spark/streaming/GrepCount2.scala b/streaming/src/main/scala/spark/streaming/examples/GrepCount2.scala
index 27ecced1c0..27ecced1c0 100644
--- a/streaming/src/main/scala/spark/streaming/GrepCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepCount2.scala
diff --git a/streaming/src/main/scala/spark/streaming/GrepCountApprox.scala b/streaming/src/main/scala/spark/streaming/examples/GrepCountApprox.scala
index f9674136fe..f9674136fe 100644
--- a/streaming/src/main/scala/spark/streaming/GrepCountApprox.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepCountApprox.scala
diff --git a/streaming/src/main/scala/spark/streaming/SimpleWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount.scala
index a75ccd3a56..a75ccd3a56 100644
--- a/streaming/src/main/scala/spark/streaming/SimpleWordCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount.scala
diff --git a/streaming/src/main/scala/spark/streaming/SimpleWordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2.scala
index 9672e64b13..9672e64b13 100644
--- a/streaming/src/main/scala/spark/streaming/SimpleWordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2.scala
diff --git a/streaming/src/main/scala/spark/streaming/SimpleWordCount2_Special.scala b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2_Special.scala
index 503033a8e5..503033a8e5 100644
--- a/streaming/src/main/scala/spark/streaming/SimpleWordCount2_Special.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2_Special.scala
diff --git a/streaming/src/main/scala/spark/streaming/TopContentCount.scala b/streaming/src/main/scala/spark/streaming/examples/TopContentCount.scala
index 031e989c87..031e989c87 100644
--- a/streaming/src/main/scala/spark/streaming/TopContentCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopContentCount.scala
diff --git a/streaming/src/main/scala/spark/streaming/TopKWordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCount2.scala
index 679ed0a7ef..679ed0a7ef 100644
--- a/streaming/src/main/scala/spark/streaming/TopKWordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCount2.scala
diff --git a/streaming/src/main/scala/spark/streaming/TopKWordCount2_Special.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCount2_Special.scala
index c873fbd0f0..c873fbd0f0 100644
--- a/streaming/src/main/scala/spark/streaming/TopKWordCount2_Special.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCount2_Special.scala
diff --git a/streaming/src/main/scala/spark/streaming/WordCount.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala
index fb5508ffcc..fb5508ffcc 100644
--- a/streaming/src/main/scala/spark/streaming/WordCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala
diff --git a/streaming/src/main/scala/spark/streaming/WordCount1.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount1.scala
index 42d985920a..42d985920a 100644
--- a/streaming/src/main/scala/spark/streaming/WordCount1.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount1.scala
diff --git a/streaming/src/main/scala/spark/streaming/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index 9168a2fe2f..9168a2fe2f 100644
--- a/streaming/src/main/scala/spark/streaming/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
diff --git a/streaming/src/main/scala/spark/streaming/WordCount2_Special.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2_Special.scala
index 1920915af7..1920915af7 100644
--- a/streaming/src/main/scala/spark/streaming/WordCount2_Special.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2_Special.scala
diff --git a/streaming/src/main/scala/spark/streaming/WordCount3.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount3.scala
index 018c19a509..018c19a509 100644
--- a/streaming/src/main/scala/spark/streaming/WordCount3.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount3.scala
diff --git a/streaming/src/main/scala/spark/streaming/WordCountEc2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountEc2.scala
index 82b9fa781d..82b9fa781d 100644
--- a/streaming/src/main/scala/spark/streaming/WordCountEc2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountEc2.scala
diff --git a/streaming/src/main/scala/spark/streaming/WordCountTrivialWindow.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountTrivialWindow.scala
index 114dd144f1..114dd144f1 100644
--- a/streaming/src/main/scala/spark/streaming/WordCountTrivialWindow.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountTrivialWindow.scala
diff --git a/streaming/src/main/scala/spark/streaming/WordMax.scala b/streaming/src/main/scala/spark/streaming/examples/WordMax.scala
index fbfc48030f..fbfc48030f 100644
--- a/streaming/src/main/scala/spark/streaming/WordMax.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordMax.scala
diff --git a/streaming/src/main/scala/spark/streaming/SenGeneratorForPerformanceTest.scala b/streaming/src/main/scala/spark/streaming/utils/SenGeneratorForPerformanceTest.scala
index bb32089ae2..bb32089ae2 100644
--- a/streaming/src/main/scala/spark/streaming/SenGeneratorForPerformanceTest.scala
+++ b/streaming/src/main/scala/spark/streaming/utils/SenGeneratorForPerformanceTest.scala
diff --git a/streaming/src/main/scala/spark/streaming/SenderReceiverTest.scala b/streaming/src/main/scala/spark/streaming/utils/SenderReceiverTest.scala
index 6af270298a..6af270298a 100644
--- a/streaming/src/main/scala/spark/streaming/SenderReceiverTest.scala
+++ b/streaming/src/main/scala/spark/streaming/utils/SenderReceiverTest.scala
diff --git a/streaming/src/main/scala/spark/streaming/SentenceFileGenerator.scala b/streaming/src/main/scala/spark/streaming/utils/SentenceFileGenerator.scala
index 15858f59e3..15858f59e3 100644
--- a/streaming/src/main/scala/spark/streaming/SentenceFileGenerator.scala
+++ b/streaming/src/main/scala/spark/streaming/utils/SentenceFileGenerator.scala
diff --git a/streaming/src/main/scala/spark/streaming/SentenceGenerator.scala b/streaming/src/main/scala/spark/streaming/utils/SentenceGenerator.scala
index a9f124d2d7..a9f124d2d7 100644
--- a/streaming/src/main/scala/spark/streaming/SentenceGenerator.scala
+++ b/streaming/src/main/scala/spark/streaming/utils/SentenceGenerator.scala
diff --git a/streaming/src/main/scala/spark/streaming/ShuffleTest.scala b/streaming/src/main/scala/spark/streaming/utils/ShuffleTest.scala
index 32aa4144a0..32aa4144a0 100644
--- a/streaming/src/main/scala/spark/streaming/ShuffleTest.scala
+++ b/streaming/src/main/scala/spark/streaming/utils/ShuffleTest.scala