aboutsummaryrefslogtreecommitdiff
path: root/external/flume
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-08 21:19:08 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-08 21:19:08 -0800
commit91227566bc9d8aabaec3f2a37a09a17afa20989c (patch)
tree80d19aac29217005b3f1cb08ca95fa08bbb9d946 /external/flume
parent7210257ba3038d5e22d4b60fe9c3113dc45c3dff (diff)
parent04d83fc37f9eef89c20331c85291a0a169f75e6d (diff)
downloadspark-91227566bc9d8aabaec3f2a37a09a17afa20989c.tar.gz
spark-91227566bc9d8aabaec3f2a37a09a17afa20989c.tar.bz2
spark-91227566bc9d8aabaec3f2a37a09a17afa20989c.zip
Merge remote-tracking branch 'spark-upstream/master' into HEAD
Conflicts: README.md core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala pom.xml project/SparkBuild.scala repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
Diffstat (limited to 'external/flume')
-rw-r--r--external/flume/pom.xml93
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala155
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala70
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java34
-rw-r--r--external/flume/src/test/resources/log4j.properties29
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala86
6 files changed, 467 insertions, 0 deletions
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
new file mode 100644
index 0000000000..443910a03a
--- /dev/null
+++ b/external/flume/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External Flume</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <version>1.2.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
new file mode 100644
index 0000000000..ce3ef47cfe
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.InetSocketAddress
+import java.io.{ObjectInput, ObjectOutput, Externalizable}
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.flume.source.avro.AvroSourceProtocol
+import org.apache.flume.source.avro.AvroFlumeEvent
+import org.apache.flume.source.avro.Status
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.avro.ipc.NettyServer
+
+import org.apache.spark.util.Utils
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+
+private[streaming]
+class FlumeInputDStream[T: ClassTag](
+ @transient ssc_ : StreamingContext,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+
+ override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ new FlumeReceiver(host, port, storageLevel)
+ }
+}
+
+/**
+ * A wrapper class for AvroFlumeEvent's with a custom serialization format.
+ *
+ * This is necessary because AvroFlumeEvent uses inner data structures
+ * which are not serializable.
+ */
+class SparkFlumeEvent() extends Externalizable {
+ var event : AvroFlumeEvent = new AvroFlumeEvent()
+
+ /* De-serialize from bytes. */
+ def readExternal(in: ObjectInput) {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.read(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.read(keyBuff)
+ val key : String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.read(valBuff)
+ val value : String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+
+ event.setBody(ByteBuffer.wrap(bodyBuff))
+ event.setHeaders(headers)
+ }
+
+ /* Serialize to bytes. */
+ def writeExternal(out: ObjectOutput) {
+ val body = event.getBody.array()
+ out.writeInt(body.length)
+ out.write(body)
+
+ val numHeaders = event.getHeaders.size()
+ out.writeInt(numHeaders)
+ for ((k, v) <- event.getHeaders) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
+
+private[streaming] object SparkFlumeEvent {
+ def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
+ val event = new SparkFlumeEvent
+ event.event = in
+ event
+ }
+}
+
+/** A simple server that implements Flume's Avro protocol. */
+private[streaming]
+class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
+ override def append(event : AvroFlumeEvent) : Status = {
+ receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
+ Status.OK
+ }
+
+ override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
+ events.foreach (event =>
+ receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
+ Status.OK
+ }
+}
+
+/** A NetworkReceiver which listens for events using the
+ * Flume Avro interface.*/
+private[streaming]
+class FlumeReceiver(
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[SparkFlumeEvent] {
+
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
+
+ protected override def onStart() {
+ val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ val server = new NettyServer(responder, new InetSocketAddress(host, port))
+ blockGenerator.start()
+ server.start()
+ logInfo("Flume receiver started")
+ }
+
+ protected override def onStop() {
+ blockGenerator.stop()
+ logInfo("Flume receiver stopped")
+ }
+
+ override def getLocationPreference = Some(host)
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
new file mode 100644
index 0000000000..834b775d4f
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+
+object FlumeUtils {
+ /**
+ * Create a input stream from a Flume source.
+ * @param ssc StreamingContext object
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream (
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[SparkFlumeEvent] = {
+ val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int
+ ): JavaDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port)
+ }
+
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ): JavaDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port, storageLevel)
+ }
+}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
new file mode 100644
index 0000000000..733389b98d
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+import org.junit.Test;
+
+public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testFlumeStream() {
+ // tests the API, does not actually test data receiving
+ JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+ JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/flume/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
new file mode 100644
index 0000000000..2e8e9fac45
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
+
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.util.ManualClock
+
+class FlumeStreamSuite extends TestSuiteBase {
+
+ val testPort = 9999
+
+ test("flume input stream") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+ with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = Seq(1, 2, 3, 4, 5)
+ Thread.sleep(1000)
+ val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
+ val client = SpecificRequestor.getClient(
+ classOf[AvroSourceProtocol], transceiver)
+
+ for (i <- 0 until input.size) {
+ val event = new AvroFlumeEvent
+ event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
+ event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
+ client.append(event)
+ Thread.sleep(500)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+
+ val startTime = System.currentTimeMillis()
+ while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
+ Thread.sleep(100)
+ }
+ Thread.sleep(1000)
+ val timeTaken = System.currentTimeMillis() - startTime
+ assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
+ logInfo("Stopping context")
+ ssc.stop()
+
+ val decoder = Charset.forName("UTF-8").newDecoder()
+
+ assert(outputBuffer.size === input.length)
+ for (i <- 0 until outputBuffer.size) {
+ assert(outputBuffer(i).size === 1)
+ val str = decoder.decode(outputBuffer(i).head.event.getBody)
+ assert(str.toString === input(i).toString)
+ assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
+ }
+ }
+}