aboutsummaryrefslogtreecommitdiff
path: root/dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-02-08 23:13:34 -0800
committerReynold Xin <rxin@apache.org>2014-02-08 23:13:34 -0800
commitf892da8716d614467fddcc3a1b2b589979414219 (patch)
tree7a2bc66f43b8f512c679fcf986e74ba79bbf8042 /dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala
parentc2341c92bb206938fd9b18e2a714e5c6de55b06d (diff)
downloadspark-f892da8716d614467fddcc3a1b2b589979414219.tar.gz
spark-f892da8716d614467fddcc3a1b2b589979414219.tar.bz2
spark-f892da8716d614467fddcc3a1b2b589979414219.zip
Merge pull request #565 from pwendell/dev-scripts. Closes #565.
SPARK-1066: Add developer scripts to repository. These are some developer scripts I've been maintaining in a separate public repo. This patch adds them to the Spark repository so they can evolve here and are clearly accessible to all committers. I may do some small additional clean-up in this PR, but wanted to put them here in case others want to review. There are a few types of scripts here: 1. A tool to merge pull requests. 2. A script for packaging releases. 3. A script for auditing release candidates. Author: Patrick Wendell <pwendell@gmail.com> == Merge branch commits == commit 5d5d331d01f6fd59c2eb830f652955119b012173 Author: Patrick Wendell <pwendell@gmail.com> Date: Sat Feb 8 22:11:47 2014 -0800 SPARK-1066: Add developer scripts to repository.
Diffstat (limited to 'dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala')
-rw-r--r--dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala62
1 files changed, 62 insertions, 0 deletions
diff --git a/dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala b/dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala
new file mode 100644
index 0000000000..3d0722d2ac
--- /dev/null
+++ b/dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main.scala
+
+import scala.collection.mutable.{ListBuffer, Queue}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming._
+
+object SparkStreamingExample {
+
+ def main(args: Array[String]) {
+ val conf = new SparkConf(true)
+ .setMaster("local[2]")
+ .setAppName("Streaming test")
+ val ssc = new StreamingContext(conf, Seconds(1))
+ val seen = ListBuffer[RDD[Int]]()
+
+ val rdd1 = ssc.sparkContext.makeRDD(1 to 100, 10)
+ val rdd2 = ssc.sparkContext.makeRDD(1 to 1000, 10)
+ val rdd3 = ssc.sparkContext.makeRDD(1 to 10000, 10)
+
+ val queue = Queue(rdd1, rdd2, rdd3)
+ val stream = ssc.queueStream(queue)
+
+ stream.foreachRDD(rdd => seen += rdd)
+ ssc.start()
+ Thread.sleep(5000)
+
+ def test(f: => Boolean, failureMsg: String) = {
+ if (!f) {
+ println(failureMsg)
+ System.exit(-1)
+ }
+ }
+
+ val rddCounts = seen.map(rdd => rdd.count()).filter(_ > 0)
+ test(rddCounts.length == 3, "Did not collect three RDD's from stream")
+ test(rddCounts.toSet == Set(100, 1000, 10000), "Did not find expected streams")
+
+ println("Test succeeded")
+
+ ssc.stop()
+ }
+}