diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-07-30 13:29:13 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-07-30 13:29:13 -0700 |
commit | 5a26ca4a80a428eb4d7e3407ca496e39ad38c757 (patch) | |
tree | 6e14127fd95b2ea2fd7fd23b305bb086b253e3c3 /streaming | |
parent | fcee4153b92bdd66dd92820a2670b339f9f59c77 (diff) | |
download | spark-5a26ca4a80a428eb4d7e3407ca496e39ad38c757.tar.gz spark-5a26ca4a80a428eb4d7e3407ca496e39ad38c757.tar.bz2 spark-5a26ca4a80a428eb4d7e3407ca496e39ad38c757.zip |
Restructured file locations to separate examples and other programs from core programs.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/Interval.scala | 20 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/JobManager.scala | 127 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/JobManager2.scala | 37 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/Scheduler.scala | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/DumbTopKWordCount2_Special.scala (renamed from streaming/src/main/scala/spark/streaming/DumbTopKWordCount2_Special.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/DumbWordCount2_Special.scala (renamed from streaming/src/main/scala/spark/streaming/DumbWordCount2_Special.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/GrepCount.scala (renamed from streaming/src/main/scala/spark/streaming/GrepCount.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/GrepCount2.scala (renamed from streaming/src/main/scala/spark/streaming/GrepCount2.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/GrepCountApprox.scala (renamed from streaming/src/main/scala/spark/streaming/GrepCountApprox.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/SimpleWordCount.scala (renamed from streaming/src/main/scala/spark/streaming/SimpleWordCount.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2.scala (renamed from streaming/src/main/scala/spark/streaming/SimpleWordCount2.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2_Special.scala (renamed from streaming/src/main/scala/spark/streaming/SimpleWordCount2_Special.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/TopContentCount.scala (renamed from streaming/src/main/scala/spark/streaming/TopContentCount.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/TopKWordCount2.scala (renamed from streaming/src/main/scala/spark/streaming/TopKWordCount2.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/TopKWordCount2_Special.scala (renamed from streaming/src/main/scala/spark/streaming/TopKWordCount2_Special.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCount.scala (renamed from streaming/src/main/scala/spark/streaming/WordCount.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCount1.scala (renamed from streaming/src/main/scala/spark/streaming/WordCount1.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCount2.scala (renamed from streaming/src/main/scala/spark/streaming/WordCount2.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCount2_Special.scala (renamed from streaming/src/main/scala/spark/streaming/WordCount2_Special.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCount3.scala (renamed from streaming/src/main/scala/spark/streaming/WordCount3.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCountEc2.scala (renamed from streaming/src/main/scala/spark/streaming/WordCountEc2.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCountTrivialWindow.scala (renamed from streaming/src/main/scala/spark/streaming/WordCountTrivialWindow.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordMax.scala (renamed from streaming/src/main/scala/spark/streaming/WordMax.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/utils/SenGeneratorForPerformanceTest.scala (renamed from streaming/src/main/scala/spark/streaming/SenGeneratorForPerformanceTest.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/utils/SenderReceiverTest.scala (renamed from streaming/src/main/scala/spark/streaming/SenderReceiverTest.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/utils/SentenceFileGenerator.scala (renamed from streaming/src/main/scala/spark/streaming/SentenceFileGenerator.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/utils/SentenceGenerator.scala (renamed from streaming/src/main/scala/spark/streaming/SentenceGenerator.scala) | 0 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/utils/ShuffleTest.scala (renamed from streaming/src/main/scala/spark/streaming/ShuffleTest.scala) | 0 |
28 files changed, 27 insertions, 159 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala index a985f44ba1..9a61d85274 100644 --- a/streaming/src/main/scala/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/spark/streaming/Interval.scala @@ -42,26 +42,6 @@ case class Interval (val beginTime: Time, val endTime: Time) { object Interval { - /* - implicit def longTupleToInterval (longTuple: (Long, Long)) = - Interval(longTuple._1, longTuple._2) - - implicit def intTupleToInterval (intTuple: (Int, Int)) = - Interval(intTuple._1, intTuple._2) - - implicit def string2Interval (str: String): Interval = { - val parts = str.split(",") - if (parts.length == 1) - return Interval.zero - return Interval (parts(0).toInt, parts(1).toInt) - } - - def getInterval (timeMs: Long, intervalDurationMs: Long): Interval = { - val intervalBeginMs = timeMs / intervalDurationMs * intervalDurationMs - Interval(intervalBeginMs, intervalBeginMs + intervalDurationMs) - } - */ - def zero() = new Interval (Time.zero, Time.zero) def currentInterval(intervalDuration: LongTime): Interval = { diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 45a3971643..d7d88a7000 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -1,112 +1,37 @@ package spark.streaming -import spark.SparkEnv -import spark.Logging - -import scala.collection.mutable.PriorityQueue -import scala.actors._ -import scala.actors.Actor._ -import scala.actors.remote._ -import scala.actors.remote.RemoteActor._ -import scala.actors.scheduler.ResizableThreadPoolScheduler -import scala.actors.scheduler.ForkJoinScheduler - -sealed trait JobManagerMessage -case class RunJob(job: Job) extends JobManagerMessage -case class JobCompleted(handlerId: Int) extends JobManagerMessage - -class JobHandler(ssc: SparkStreamContext, val id: Int) extends DaemonActor with Logging { - - var busy = false - - def act() { - loop { - receive { - case job: Job => { - SparkEnv.set(ssc.env) - try { - logInfo("Starting " + job) - job.run() - logInfo("Finished " + job) - if (job.time.isInstanceOf[LongTime]) { - val longTime = job.time.asInstanceOf[LongTime] - logInfo("Total pushing + skew + processing delay for " + longTime + " is " + - (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s") - } - } catch { - case e: Exception => logError("SparkStream job failed", e) +import spark.{Logging, SparkEnv} +import java.util.concurrent.Executors + + +class JobManager(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging { + + class JobHandler(ssc: SparkStreamContext, job: Job) extends Runnable { + def run() { + SparkEnv.set(ssc.env) + try { + logInfo("Starting " + job) + job.run() + logInfo("Finished " + job) + if (job.time.isInstanceOf[LongTime]) { + val longTime = job.time.asInstanceOf[LongTime] + logInfo("Total notification + skew + processing delay for " + longTime + " is " + + (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s") + if (System.getProperty("spark.stream.distributed", "false") == "true") { + TestInputBlockTracker.setEndTime(job.time) } - busy = false - reply(JobCompleted(id)) } + } catch { + case e: Exception => logError("SparkStream job failed", e) } } } -} -class JobManager(ssc: SparkStreamContext, numThreads: Int = 2) extends DaemonActor with Logging { + initLogging() - implicit private val jobOrdering = new Ordering[Job] { - override def compare(job1: Job, job2: Job): Int = { - if (job1.time < job2.time) { - return 1 - } else if (job2.time < job1.time) { - return -1 - } else { - return 0 - } - } - } - - private val jobs = new PriorityQueue[Job]() - private val handlers = (0 until numThreads).map(i => new JobHandler(ssc, i)) - - def act() { - handlers.foreach(_.start) - loop { - receive { - case RunJob(job) => { - jobs += job - logInfo("Job " + job + " submitted") - runJob() - } - case JobCompleted(handlerId) => { - runJob() - } - } - } - } - - def runJob(): Unit = { - logInfo("Attempting to allocate job ") - if (jobs.size > 0) { - handlers.find(!_.busy).foreach(handler => { - val job = jobs.dequeue - logInfo("Allocating job " + job + " to handler " + handler.id) - handler.busy = true - handler ! job - }) - } + val jobExecutor = Executors.newFixedThreadPool(numThreads) + + def runJob(job: Job) { + jobExecutor.execute(new JobHandler(ssc, job)) } } - -object JobManager { - def main(args: Array[String]) { - val ssc = new SparkStreamContext("local[4]", "JobManagerTest") - val jobManager = new JobManager(ssc) - jobManager.start() - - val t = System.currentTimeMillis - for (i <- 1 to 10) { - jobManager ! RunJob(new Job( - LongTime(i), - () => { - Thread.sleep(500) - println("Job " + i + " took " + (System.currentTimeMillis - t) + " ms") - } - )) - } - Thread.sleep(6000) - } -} - diff --git a/streaming/src/main/scala/spark/streaming/JobManager2.scala b/streaming/src/main/scala/spark/streaming/JobManager2.scala deleted file mode 100644 index ce0154e19b..0000000000 --- a/streaming/src/main/scala/spark/streaming/JobManager2.scala +++ /dev/null @@ -1,37 +0,0 @@ -package spark.streaming - -import spark.{Logging, SparkEnv} -import java.util.concurrent.Executors - - -class JobManager2(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging { - - class JobHandler(ssc: SparkStreamContext, job: Job) extends Runnable { - def run() { - SparkEnv.set(ssc.env) - try { - logInfo("Starting " + job) - job.run() - logInfo("Finished " + job) - if (job.time.isInstanceOf[LongTime]) { - val longTime = job.time.asInstanceOf[LongTime] - logInfo("Total notification + skew + processing delay for " + longTime + " is " + - (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s") - if (System.getProperty("spark.stream.distributed", "false") == "true") { - TestInputBlockTracker.setEndTime(job.time) - } - } - } catch { - case e: Exception => logError("SparkStream job failed", e) - } - } - } - - initLogging() - - val jobExecutor = Executors.newFixedThreadPool(numThreads) - - def runJob(job: Job) { - jobExecutor.execute(new JobHandler(ssc, job)) - } -} diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index 4137d8f27d..8df346559c 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -46,7 +46,7 @@ extends Actor with Logging { val inputNames = inputRDSs.map(_.inputName).toArray val inputStates = new HashMap[Interval, InputState]() val currentJobs = System.getProperty("spark.stream.currentJobs", "1").toInt - val jobManager = new JobManager2(ssc, currentJobs) + val jobManager = new JobManager(ssc, currentJobs) // TODO(Haoyuan): The following line is for performance test only. var cnt: Int = System.getProperty("spark.stream.fake.cnt", "60").toInt diff --git a/streaming/src/main/scala/spark/streaming/DumbTopKWordCount2_Special.scala b/streaming/src/main/scala/spark/streaming/examples/DumbTopKWordCount2_Special.scala index 2ca72da79f..2ca72da79f 100644 --- a/streaming/src/main/scala/spark/streaming/DumbTopKWordCount2_Special.scala +++ b/streaming/src/main/scala/spark/streaming/examples/DumbTopKWordCount2_Special.scala diff --git a/streaming/src/main/scala/spark/streaming/DumbWordCount2_Special.scala b/streaming/src/main/scala/spark/streaming/examples/DumbWordCount2_Special.scala index 34e7edfda9..34e7edfda9 100644 --- a/streaming/src/main/scala/spark/streaming/DumbWordCount2_Special.scala +++ b/streaming/src/main/scala/spark/streaming/examples/DumbWordCount2_Special.scala diff --git a/streaming/src/main/scala/spark/streaming/GrepCount.scala b/streaming/src/main/scala/spark/streaming/examples/GrepCount.scala index ec3e70f258..ec3e70f258 100644 --- a/streaming/src/main/scala/spark/streaming/GrepCount.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepCount.scala diff --git a/streaming/src/main/scala/spark/streaming/GrepCount2.scala b/streaming/src/main/scala/spark/streaming/examples/GrepCount2.scala index 27ecced1c0..27ecced1c0 100644 --- a/streaming/src/main/scala/spark/streaming/GrepCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepCount2.scala diff --git a/streaming/src/main/scala/spark/streaming/GrepCountApprox.scala b/streaming/src/main/scala/spark/streaming/examples/GrepCountApprox.scala index f9674136fe..f9674136fe 100644 --- a/streaming/src/main/scala/spark/streaming/GrepCountApprox.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepCountApprox.scala diff --git a/streaming/src/main/scala/spark/streaming/SimpleWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount.scala index a75ccd3a56..a75ccd3a56 100644 --- a/streaming/src/main/scala/spark/streaming/SimpleWordCount.scala +++ b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount.scala diff --git a/streaming/src/main/scala/spark/streaming/SimpleWordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2.scala index 9672e64b13..9672e64b13 100644 --- a/streaming/src/main/scala/spark/streaming/SimpleWordCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2.scala diff --git a/streaming/src/main/scala/spark/streaming/SimpleWordCount2_Special.scala b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2_Special.scala index 503033a8e5..503033a8e5 100644 --- a/streaming/src/main/scala/spark/streaming/SimpleWordCount2_Special.scala +++ b/streaming/src/main/scala/spark/streaming/examples/SimpleWordCount2_Special.scala diff --git a/streaming/src/main/scala/spark/streaming/TopContentCount.scala b/streaming/src/main/scala/spark/streaming/examples/TopContentCount.scala index 031e989c87..031e989c87 100644 --- a/streaming/src/main/scala/spark/streaming/TopContentCount.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopContentCount.scala diff --git a/streaming/src/main/scala/spark/streaming/TopKWordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCount2.scala index 679ed0a7ef..679ed0a7ef 100644 --- a/streaming/src/main/scala/spark/streaming/TopKWordCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCount2.scala diff --git a/streaming/src/main/scala/spark/streaming/TopKWordCount2_Special.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCount2_Special.scala index c873fbd0f0..c873fbd0f0 100644 --- a/streaming/src/main/scala/spark/streaming/TopKWordCount2_Special.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCount2_Special.scala diff --git a/streaming/src/main/scala/spark/streaming/WordCount.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala index fb5508ffcc..fb5508ffcc 100644 --- a/streaming/src/main/scala/spark/streaming/WordCount.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala diff --git a/streaming/src/main/scala/spark/streaming/WordCount1.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount1.scala index 42d985920a..42d985920a 100644 --- a/streaming/src/main/scala/spark/streaming/WordCount1.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount1.scala diff --git a/streaming/src/main/scala/spark/streaming/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala index 9168a2fe2f..9168a2fe2f 100644 --- a/streaming/src/main/scala/spark/streaming/WordCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala diff --git a/streaming/src/main/scala/spark/streaming/WordCount2_Special.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2_Special.scala index 1920915af7..1920915af7 100644 --- a/streaming/src/main/scala/spark/streaming/WordCount2_Special.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2_Special.scala diff --git a/streaming/src/main/scala/spark/streaming/WordCount3.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount3.scala index 018c19a509..018c19a509 100644 --- a/streaming/src/main/scala/spark/streaming/WordCount3.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount3.scala diff --git a/streaming/src/main/scala/spark/streaming/WordCountEc2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountEc2.scala index 82b9fa781d..82b9fa781d 100644 --- a/streaming/src/main/scala/spark/streaming/WordCountEc2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountEc2.scala diff --git a/streaming/src/main/scala/spark/streaming/WordCountTrivialWindow.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountTrivialWindow.scala index 114dd144f1..114dd144f1 100644 --- a/streaming/src/main/scala/spark/streaming/WordCountTrivialWindow.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountTrivialWindow.scala diff --git a/streaming/src/main/scala/spark/streaming/WordMax.scala b/streaming/src/main/scala/spark/streaming/examples/WordMax.scala index fbfc48030f..fbfc48030f 100644 --- a/streaming/src/main/scala/spark/streaming/WordMax.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordMax.scala diff --git a/streaming/src/main/scala/spark/streaming/SenGeneratorForPerformanceTest.scala b/streaming/src/main/scala/spark/streaming/utils/SenGeneratorForPerformanceTest.scala index bb32089ae2..bb32089ae2 100644 --- a/streaming/src/main/scala/spark/streaming/SenGeneratorForPerformanceTest.scala +++ b/streaming/src/main/scala/spark/streaming/utils/SenGeneratorForPerformanceTest.scala diff --git a/streaming/src/main/scala/spark/streaming/SenderReceiverTest.scala b/streaming/src/main/scala/spark/streaming/utils/SenderReceiverTest.scala index 6af270298a..6af270298a 100644 --- a/streaming/src/main/scala/spark/streaming/SenderReceiverTest.scala +++ b/streaming/src/main/scala/spark/streaming/utils/SenderReceiverTest.scala diff --git a/streaming/src/main/scala/spark/streaming/SentenceFileGenerator.scala b/streaming/src/main/scala/spark/streaming/utils/SentenceFileGenerator.scala index 15858f59e3..15858f59e3 100644 --- a/streaming/src/main/scala/spark/streaming/SentenceFileGenerator.scala +++ b/streaming/src/main/scala/spark/streaming/utils/SentenceFileGenerator.scala diff --git a/streaming/src/main/scala/spark/streaming/SentenceGenerator.scala b/streaming/src/main/scala/spark/streaming/utils/SentenceGenerator.scala index a9f124d2d7..a9f124d2d7 100644 --- a/streaming/src/main/scala/spark/streaming/SentenceGenerator.scala +++ b/streaming/src/main/scala/spark/streaming/utils/SentenceGenerator.scala diff --git a/streaming/src/main/scala/spark/streaming/ShuffleTest.scala b/streaming/src/main/scala/spark/streaming/utils/ShuffleTest.scala index 32aa4144a0..32aa4144a0 100644 --- a/streaming/src/main/scala/spark/streaming/ShuffleTest.scala +++ b/streaming/src/main/scala/spark/streaming/utils/ShuffleTest.scala |