diff options
Diffstat (limited to 'external/flume/src')
7 files changed, 422 insertions, 0 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala new file mode 100644 index 0000000000..35e7a01abc --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Flume input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversion. Import org.apache.spark.streaming.flume._ to use these functions. + */ +class FlumeFunctions(ssc: StreamingContext) { + /** + * Create a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream ( + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[SparkFlumeEvent] = { + val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala new file mode 100644 index 0000000000..ce3ef47cfe --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import java.net.InetSocketAddress +import java.io.{ObjectInput, ObjectOutput, Externalizable} +import java.nio.ByteBuffer + +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +import org.apache.flume.source.avro.AvroSourceProtocol +import org.apache.flume.source.avro.AvroFlumeEvent +import org.apache.flume.source.avro.Status +import org.apache.avro.ipc.specific.SpecificResponder +import org.apache.avro.ipc.NettyServer + +import org.apache.spark.util.Utils +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ + +private[streaming] +class FlumeInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel +) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { + + override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = { + new FlumeReceiver(host, port, storageLevel) + } +} + +/** + * A wrapper class for AvroFlumeEvent's with a custom serialization format. + * + * This is necessary because AvroFlumeEvent uses inner data structures + * which are not serializable. + */ +class SparkFlumeEvent() extends Externalizable { + var event : AvroFlumeEvent = new AvroFlumeEvent() + + /* De-serialize from bytes. */ + def readExternal(in: ObjectInput) { + val bodyLength = in.readInt() + val bodyBuff = new Array[Byte](bodyLength) + in.read(bodyBuff) + + val numHeaders = in.readInt() + val headers = new java.util.HashMap[CharSequence, CharSequence] + + for (i <- 0 until numHeaders) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.read(keyBuff) + val key : String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.read(valBuff) + val value : String = Utils.deserialize(valBuff) + + headers.put(key, value) + } + + event.setBody(ByteBuffer.wrap(bodyBuff)) + event.setHeaders(headers) + } + + /* Serialize to bytes. */ + def writeExternal(out: ObjectOutput) { + val body = event.getBody.array() + out.writeInt(body.length) + out.write(body) + + val numHeaders = event.getHeaders.size() + out.writeInt(numHeaders) + for ((k, v) <- event.getHeaders) { + val keyBuff = Utils.serialize(k.toString) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v.toString) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } +} + +private[streaming] object SparkFlumeEvent { + def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { + val event = new SparkFlumeEvent + event.event = in + event + } +} + +/** A simple server that implements Flume's Avro protocol. */ +private[streaming] +class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { + override def append(event : AvroFlumeEvent) : Status = { + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) + Status.OK + } + + override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { + events.foreach (event => + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) + Status.OK + } +} + +/** A NetworkReceiver which listens for events using the + * Flume Avro interface.*/ +private[streaming] +class FlumeReceiver( + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkReceiver[SparkFlumeEvent] { + + lazy val blockGenerator = new BlockGenerator(storageLevel) + + protected override def onStart() { + val responder = new SpecificResponder( + classOf[AvroSourceProtocol], new FlumeEventServer(this)) + val server = new NettyServer(responder, new InetSocketAddress(host, port)) + blockGenerator.start() + server.start() + logInfo("Flume receiver started") + } + + protected override def onStop() { + blockGenerator.stop() + logInfo("Flume receiver stopped") + } + + override def getLocationPreference = Some(host) +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala new file mode 100644 index 0000000000..4e66ae3535 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.storage.StorageLevel + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Flume input streams. + */ +class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + */ + def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port) + } + + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): + JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port, storageLevel) + } +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala new file mode 100644 index 0000000000..c087a39d1c --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object flume { + implicit def sscToFlumeFunctions(ssc: StreamingContext) = new FlumeFunctions(ssc) +} + diff --git a/external/flume/src/test/java/JavaFlumeStreamSuite.java b/external/flume/src/test/java/JavaFlumeStreamSuite.java new file mode 100644 index 0000000000..deffc78c4c --- /dev/null +++ b/external/flume/src/test/java/JavaFlumeStreamSuite.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume; +import org.apache.spark.streaming.flume.SparkFlumeEvent; +import org.junit.Test; + +public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { + @Test + public void testFlumeStream() { + JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc); + + // tests the API, does not actually test data receiving + JavaDStream<SparkFlumeEvent> test1 = sscWithFlume.flumeStream("localhost", 12345); + JavaDStream<SparkFlumeEvent> test2 = sscWithFlume.flumeStream("localhost", 12345, + StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream<String> socketStream = sscWithFlume.socketTextStream("localhost", 9999); + } +} diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/flume/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala new file mode 100644 index 0000000000..ba33320d02 --- /dev/null +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} + +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.charset.Charset + +import org.apache.avro.ipc.NettyTransceiver +import org.apache.avro.ipc.specific.SpecificRequestor +import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase} +import org.apache.spark.streaming.util.ManualClock + +class FlumeStreamSuite extends TestSuiteBase { + + val testPort = 9999 + + test("flume input stream") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(master, framework, batchDuration) + val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] + with SynchronizedBuffer[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputBuffer) + ssc.registerOutputStream(outputStream) + ssc.start() + + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq(1, 2, 3, 4, 5) + Thread.sleep(1000) + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) + val client = SpecificRequestor.getClient( + classOf[AvroSourceProtocol], transceiver) + + for (i <- 0 until input.size) { + val event = new AvroFlumeEvent + event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) + event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) + client.append(event) + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + + val startTime = System.currentTimeMillis() + while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) + Thread.sleep(100) + } + Thread.sleep(1000) + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + val decoder = Charset.forName("UTF-8").newDecoder() + + assert(outputBuffer.size === input.length) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + val str = decoder.decode(outputBuffer(i).head.event.getBody) + assert(str.toString === input(i).toString) + assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") + } + } +} |