aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-02-04 14:15:50 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-02-04 14:18:11 -0800
commitcc37601ecb72abd1351ed73b3be1fb517a31a4e1 (patch)
treee73deb7919d27404b99fd9417cd377c630f36ffc /examples
parentf6ec547ea7b56ee607a4c2a69206f8952318eaf1 (diff)
downloadspark-cc37601ecb72abd1351ed73b3be1fb517a31a4e1.tar.gz
spark-cc37601ecb72abd1351ed73b3be1fb517a31a4e1.tar.bz2
spark-cc37601ecb72abd1351ed73b3be1fb517a31a4e1.zip
Adding an example with an OLAP roll-up
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/spark/examples/OLAPQuery.scala66
1 files changed, 66 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/examples/OLAPQuery.scala b/examples/src/main/scala/spark/examples/OLAPQuery.scala
new file mode 100644
index 0000000000..ff3af01b17
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/OLAPQuery.scala
@@ -0,0 +1,66 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+/**
+ * Executes a roll up-style query against Apache logs.
+ */
+object OLAPQuery {
+ val exampleApacheLogs = List(
+ """10.10.10.10 - "FRED" [18/Jan/2013:17:56:07 +1100] "GET http://images.com/2013/Generic.jpg
+ | HTTP/1.1" 304 315 "http://referall.com/" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
+ | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
+ | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
+ | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 ""
+ | 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.replace("\n", ""),
+ """10.10.10.10 - "FRED" [18/Jan/2013:18:02:37 +1100] "GET http://images.com/2013/Generic.jpg
+ | HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
+ | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
+ | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
+ | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 ""
+ | 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.replace("\n", "")
+ )
+
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: OLAPQuery <master> [logFile]")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "OLAP Query")
+
+ val dataSet =
+ if (args.length == 2) sc.textFile(args(1))
+ else sc.parallelize(exampleApacheLogs)
+
+ val apache_log_regex =
+ """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
+
+ /** Tracks the total query count and number of aggregate bytes for a particular group. */
+ class Stats(val count: Int, val numBytes: Int) extends Serializable {
+ def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
+ override def toString = "bytes=%s\tn=%s".format(numBytes, count)
+ }
+
+ def extractKey(line: String): (String, String, String) = {
+ apache_log_regex findFirstIn line match {
+ case Some(apache_log_regex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
+ if (user != "\"-\"") (ip, user, query)
+ else (null, null, null)
+ case _ => (null, null, null)
+ }
+ }
+
+ def extractStats(line: String): Stats = {
+ apache_log_regex findFirstIn line match {
+ case Some(apache_log_regex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
+ new Stats(1, bytes.toInt)
+ case _ => new Stats(1, 0)
+ }
+ }
+
+ dataSet.map(line => (extractKey(line), extractStats(line)))
+ .reduceByKey((a, b) => a.merge(b))
+ .collect().foreach{
+ case (user, query) => println("%s\t%s".format(user, query))}
+ }
+}