diff options
4 files changed, 76 insertions, 2 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index f43af670c6..28da3dbde4 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -24,6 +24,11 @@
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>algebird-core_2.9.2</artifactId>
+ <version>0.1.8</version>
+ </dependency>
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
new file mode 100644
index 0000000000..023a0add80
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
@@ -0,0 +1,63 @@
+package spark.streaming.examples.twitter
+import spark.streaming.{Seconds, StreamingContext}
+import spark.storage.StorageLevel
+import com.twitter.algebird.HyperLogLog._
+import com.twitter.algebird.HyperLogLogMonoid
+import spark.streaming.dstream.TwitterInputDStream
+ * Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
+ * TwitterInputDStream
+ */
+object StreamingHLL {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: TwitterStreamingHLL <master> <twitter_username> <twitter_password>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1)
+ }
+ val Array(master, username, password) = args.slice(0, 3)
+ val filters = args.slice(3, args.length)
+ val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
+ val stream = new TwitterInputDStream(ssc, username, password, filters,
+ StorageLevel.MEMORY_ONLY_SER)
+ ssc.registerInputStream(stream)
+ val users = stream.map(status => status.getUser.getId)
+ val globalHll = new HyperLogLogMonoid(12)
+ var userSet: Set[Long] = Set()
+ val approxUsers = users.mapPartitions(ids => {
+ val hll = new HyperLogLogMonoid(12)
+ ids.map(id => hll(id))
+ }).reduce(_ + _)
+ val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+ var h = globalHll.zero
+ approxUsers.foreach(rdd => {
+ if (rdd.count() != 0) {
+ val partial = rdd.first()
+ h += partial
+ println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+ println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
+ }
+ })
+ exactUsers.foreach(rdd => {
+ if (rdd.count() != 0) {
+ val partial = rdd.first()
+ userSet ++= partial
+ println("Exact distinct users this batch: %d".format(partial.size))
+ println("Exact distinct users overall: %d".format(userSet.size))
+ println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
+ }
+ })
+ ssc.start()
+ }
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c6d3cc8b15..090fd65bcb 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -154,7 +154,8 @@ object SparkBuild extends Build {
def examplesSettings = sharedSettings ++ Seq(
- name := "spark-examples"
+ name := "spark-examples",
+ libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.8")
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
diff --git a/pyspark b/pyspark
index ab7f4f50c0..d662e90287 100755
--- a/pyspark
+++ b/pyspark
@@ -36,4 +36,9 @@ if [[ "$SPARK_LAUNCH_WITH_SCALA" != "0" ]] ; then
-exec "$PYSPARK_PYTHON" "$@"
+if [[ "$IPYTHON" = "1" ]] ; then
+ export PYSPARK_PYTHON="ipython"
+ exec "$PYSPARK_PYTHON" -i -c "%run $PYTHONSTARTUP"
+ exec "$PYSPARK_PYTHON" "$@"