aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorNick Pentreath <nick.pentreath@gmail.com>2013-02-21 09:33:08 +0200
committerNick Pentreath <nick.pentreath@gmail.com>2013-02-21 09:33:08 +0200
commit16d456742e596cdf5ac870d44ea2b4f308ddebdc (patch)
treea0989a98143f53ecaae0e9ffa4d01dc306f1f7fa /examples/src
parent8a281399f970db761ea05baf07972fff1c5bd058 (diff)
parent2921fa7d81be201e5d694ab58ade6233f397eef9 (diff)
downloadspark-16d456742e596cdf5ac870d44ea2b4f308ddebdc.tar.gz
spark-16d456742e596cdf5ac870d44ea2b4f308ddebdc.tar.bz2
spark-16d456742e596cdf5ac870d44ea2b4f308ddebdc.zip
Merge remote-tracking branch 'upstream/streaming' into streaming-eg-algebird
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/scala/spark/examples/LogQuery.scala66
-rw-r--r--examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala2
2 files changed, 67 insertions, 1 deletions
diff --git a/examples/src/main/scala/spark/examples/LogQuery.scala b/examples/src/main/scala/spark/examples/LogQuery.scala
new file mode 100644
index 0000000000..5330b8da94
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LogQuery.scala
@@ -0,0 +1,66 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+/**
+ * Executes a roll up-style query against Apache logs.
+ */
+object LogQuery {
+ val exampleApacheLogs = List(
+ """10.10.10.10 - "FRED" [18/Jan/2013:17:56:07 +1100] "GET http://images.com/2013/Generic.jpg
+ | HTTP/1.1" 304 315 "http://referall.com/" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
+ | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
+ | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
+ | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 ""
+ | 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.replace("\n", ""),
+ """10.10.10.10 - "FRED" [18/Jan/2013:18:02:37 +1100] "GET http://images.com/2013/Generic.jpg
+ | HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
+ | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
+ | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
+ | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 ""
+ | 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.replace("\n", "")
+ )
+
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: LogQuery <master> [logFile]")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "Log Query")
+
+ val dataSet =
+ if (args.length == 2) sc.textFile(args(1))
+ else sc.parallelize(exampleApacheLogs)
+
+ val apacheLogRegex =
+ """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
+
+ /** Tracks the total query count and number of aggregate bytes for a particular group. */
+ class Stats(val count: Int, val numBytes: Int) extends Serializable {
+ def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
+ override def toString = "bytes=%s\tn=%s".format(numBytes, count)
+ }
+
+ def extractKey(line: String): (String, String, String) = {
+ apacheLogRegex.findFirstIn(line) match {
+ case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
+ if (user != "\"-\"") (ip, user, query)
+ else (null, null, null)
+ case _ => (null, null, null)
+ }
+ }
+
+ def extractStats(line: String): Stats = {
+ apacheLogRegex.findFirstIn(line) match {
+ case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
+ new Stats(1, bytes.toInt)
+ case _ => new Stats(1, 0)
+ }
+ }
+
+ dataSet.map(line => (extractKey(line), extractStats(line)))
+ .reduceByKey((a, b) => a.merge(b))
+ .collect().foreach{
+ case (user, query) => println("%s\t%s".format(user, query))}
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
index 71b4e5bf1a..346151c147 100644
--- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -147,7 +147,7 @@ object ActorWordCount {
*/
val lines = ssc.actorStream[String](
- Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
+ Props(new SampleActorReceiver[String]("akka://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
//compute wordcount