author     zsxwing <zsxwing@gmail.com>    2015-07-01 11:59:24 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2015-07-01 11:59:24 -0700
commit     75b9fe4c5ff6f206c6fc9100563d625b39f142ba (patch)
tree       8b7e9a8de0003a8525845f84ef548e76fc9d0729 /python/pyspark/streaming
parent     b8faa32875aa560cdce340266d898902a920418d (diff)
[SPARK-8378] [STREAMING] Add the Python API for Flume
Author: zsxwing <zsxwing@gmail.com>

Closes #6830 from zsxwing/flume-python and squashes the following commits:

78dfdac [zsxwing] Fix the compile error in the test code
f1bf3c0 [zsxwing] Address TD's comments
0449723 [zsxwing] Add sbt goal streaming-flume-assembly/assembly
e93736b [zsxwing] Fix the test case for determine_modules_to_test
9d5821e [zsxwing] Fix pyspark_core dependencies
f9ee681 [zsxwing] Merge branch 'master' into flume-python
7a55837 [zsxwing] Add streaming_flume_assembly to run-tests.py
b96b0de [zsxwing] Merge branch 'master' into flume-python
ce85e83 [zsxwing] Fix incompatible issues for Python 3
01cbb3d [zsxwing] Add import sys
152364c [zsxwing] Fix the issue that StringIO doesn't work in Python 3
14ba0ff [zsxwing] Add flume-assembly for sbt building
b8d5551 [zsxwing] Merge branch 'master' into flume-python
4762c34 [zsxwing] Fix the doc
0336579 [zsxwing] Refactor Flume unit tests and also add tests for Python API
9f33873 [zsxwing] Add the Python API for Flume
Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r--  python/pyspark/streaming/flume.py  147
-rw-r--r--  python/pyspark/streaming/tests.py  179
2 files changed, 324 insertions, 2 deletions
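
For context, a minimal usage sketch of the API this patch adds. The application name, hostname, port, and batch interval below are illustrative assumptions, not part of the patch; the host/port must match the Avro sink configured on your Flume agent.

# Sketch: receiver-based Flume stream via the new Python API.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc = SparkContext(appName="FlumePythonExample")
ssc = StreamingContext(sc, batchDuration=2)

# Push-based receiver: a Flume agent with an Avro sink pointed at this host/port.
events = FlumeUtils.createStream(ssc, "localhost", 9999)

# Each element is a (headers, body) pair; the body is UTF-8 decoded by default.
events.map(lambda hb: hb[1]).pprint()

ssc.start()
ssc.awaitTermination()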
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
new file mode 100644
index 0000000000..cbb573f226
--- /dev/null
+++ b/python/pyspark/streaming/flume.py
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+if sys.version >= "3":
+    from io import BytesIO
+else:
+    from StringIO import StringIO
+from py4j.java_gateway import Py4JJavaError
+
+from pyspark.storagelevel import StorageLevel
+from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
+from pyspark.streaming import DStream
+
+__all__ = ['FlumeUtils', 'utf8_decoder']
+
+
+def utf8_decoder(s):
+    """ Decode the unicode as UTF-8 """
+    return s and s.decode('utf-8')
+
+
+class FlumeUtils(object):
+
+    @staticmethod
+    def createStream(ssc, hostname, port,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                     enableDecompression=False,
+                     bodyDecoder=utf8_decoder):
+        """
+        Create an input stream that pulls events from Flume.
+
+        :param ssc: StreamingContext object
+        :param hostname: Hostname of the slave machine to which the flume data will be sent
+        :param port: Port of the slave machine to which the flume data will be sent
+        :param storageLevel: Storage level to use for storing the received objects
+        :param enableDecompression: Should netty server decompress input stream
+        :param bodyDecoder: A function used to decode body (default is utf8_decoder)
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+
+        try:
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+                .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
+        except Py4JJavaError as e:
+            if 'ClassNotFoundException' in str(e.java_exception):
+                FlumeUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+
+        return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
+
+    @staticmethod
+    def createPollingStream(ssc, addresses,
+                            storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                            maxBatchSize=1000,
+                            parallelism=5,
+                            bodyDecoder=utf8_decoder):
+        """
+        Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+        This stream will poll the sink for data and will pull events as they are available.
+
+        :param ssc: StreamingContext object
+        :param addresses: List of (host, port)s on which the Spark Sink is running.
+        :param storageLevel: Storage level to use for storing the received objects
+        :param maxBatchSize: The maximum number of events to be pulled from the Spark sink
+                             in a single RPC call
+        :param parallelism: Number of concurrent requests this stream should send to the sink.
+                            Note that having a higher number of requests concurrently being pulled
+                            will result in this stream using more threads
+        :param bodyDecoder: A function used to decode body (default is utf8_decoder)
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        hosts = []
+        ports = []
+        for (host, port) in addresses:
+            hosts.append(host)
+            ports.append(port)
+
+        try:
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            jstream = helper.createPollingStream(
+                ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
+        except Py4JJavaError as e:
+            if 'ClassNotFoundException' in str(e.java_exception):
+                FlumeUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+
+        return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
+
+    @staticmethod
+    def _toPythonDStream(ssc, jstream, bodyDecoder):
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+
+        def func(event):
+            headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0])
+            headers = {}
+            strSer = UTF8Deserializer()
+            for i in range(0, read_int(headersBytes)):
+                key = strSer.loads(headersBytes)
+                value = strSer.loads(headersBytes)
+                headers[key] = value
+            body = bodyDecoder(event[1])
+            return (headers, body)
+        return stream.map(func)
+
+    @staticmethod
+    def _printErrorMsg(sc):
+        print("""
+________________________________________________________________________________________________
+
+  Spark Streaming's Flume libraries not found in class path. Try one of the following.
+
+  1. Include the Flume library and its dependencies with the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (sc.version, sc.version))
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 91ce681fbe..188c8ff120 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -38,6 +38,7 @@ else:
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
+from pyspark.streaming.flume import FlumeUtils
class PySparkStreamingTestCase(unittest.TestCase):
@@ -677,7 +678,156 @@ class KafkaStreamTests(PySparkStreamingTestCase):
        rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
        self._validateRddResult(sendData, rdd)
-if __name__ == "__main__":
+
+class FlumeStreamTests(PySparkStreamingTestCase):
+    timeout = 20  # seconds
+    duration = 1
+
+    def setUp(self):
+        super(FlumeStreamTests, self).setUp()
+
+        utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+            .loadClass("org.apache.spark.streaming.flume.FlumeTestUtils")
+        self._utils = utilsClz.newInstance()
+
+    def tearDown(self):
+        if self._utils is not None:
+            self._utils.close()
+            self._utils = None
+
+        super(FlumeStreamTests, self).tearDown()
+
+    def _startContext(self, n, compressed):
+        # Start the StreamingContext and also collect the result
+        dstream = FlumeUtils.createStream(self.ssc, "localhost", self._utils.getTestPort(),
+                                          enableDecompression=compressed)
+        result = []
+
+        def get_output(_, rdd):
+            for event in rdd.collect():
+                if len(result) < n:
+                    result.append(event)
+        dstream.foreachRDD(get_output)
+        self.ssc.start()
+        return result
+
+    def _validateResult(self, input, result):
+        # Validate both the header and the body
+        header = {"test": "header"}
+        self.assertEqual(len(input), len(result))
+        for i in range(0, len(input)):
+            self.assertEqual(header, result[i][0])
+            self.assertEqual(input[i], result[i][1])
+
+    def _writeInput(self, input, compressed):
+        # Try to write input to the receiver until success or timeout
+        start_time = time.time()
+        while True:
+            try:
+                self._utils.writeInput(input, compressed)
+                break
+            except:
+                if time.time() - start_time < self.timeout:
+                    time.sleep(0.01)
+                else:
+                    raise
+
+    def test_flume_stream(self):
+        input = [str(i) for i in range(1, 101)]
+        result = self._startContext(len(input), False)
+        self._writeInput(input, False)
+        self.wait_for(result, len(input))
+        self._validateResult(input, result)
+
+    def test_compressed_flume_stream(self):
+        input = [str(i) for i in range(1, 101)]
+        result = self._startContext(len(input), True)
+        self._writeInput(input, True)
+        self.wait_for(result, len(input))
+        self._validateResult(input, result)
+
+
+class FlumePollingStreamTests(PySparkStreamingTestCase):
+    timeout = 20  # seconds
+    duration = 1
+    maxAttempts = 5
+
+    def setUp(self):
+        utilsClz = \
+            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.flume.PollingFlumeTestUtils")
+        self._utils = utilsClz.newInstance()
+
+    def tearDown(self):
+        if self._utils is not None:
+            self._utils.close()
+            self._utils = None
+
+    def _writeAndVerify(self, ports):
+        # Set up the streaming context and input streams
+        ssc = StreamingContext(self.sc, self.duration)
+        try:
+            addresses = [("localhost", port) for port in ports]
+            dstream = FlumeUtils.createPollingStream(
+                ssc,
+                addresses,
+                maxBatchSize=self._utils.eventsPerBatch(),
+                parallelism=5)
+            outputBuffer = []
+
+            def get_output(_, rdd):
+                for e in rdd.collect():
+                    outputBuffer.append(e)
+
+            dstream.foreachRDD(get_output)
+            ssc.start()
+            self._utils.sendDatAndEnsureAllDataHasBeenReceived()
+
+            self.wait_for(outputBuffer, self._utils.getTotalEvents())
+            outputHeaders = [event[0] for event in outputBuffer]
+            outputBodies = [event[1] for event in outputBuffer]
+            self._utils.assertOutput(outputHeaders, outputBodies)
+        finally:
+            ssc.stop(False)
+
+    def _testMultipleTimes(self, f):
+        attempt = 0
+        while True:
+            try:
+                f()
+                break
+            except:
+                attempt += 1
+                if attempt >= self.maxAttempts:
+                    raise
+                else:
+                    import traceback
+                    traceback.print_exc()
+
+    def _testFlumePolling(self):
+        try:
+            port = self._utils.startSingleSink()
+            self._writeAndVerify([port])
+            self._utils.assertChannelsAreEmpty()
+        finally:
+            self._utils.close()
+
+    def _testFlumePollingMultipleHosts(self):
+        try:
+            port = self._utils.startSingleSink()
+            self._writeAndVerify([port])
+            self._utils.assertChannelsAreEmpty()
+        finally:
+            self._utils.close()
+
+    def test_flume_polling(self):
+        self._testMultipleTimes(self._testFlumePolling)
+
+    def test_flume_polling_multiple_hosts(self):
+        self._testMultipleTimes(self._testFlumePollingMultipleHosts)
+
+
+def search_kafka_assembly_jar():
    SPARK_HOME = os.environ["SPARK_HOME"]
    kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
    jars = glob.glob(
@@ -692,5 +842,30 @@ if __name__ == "__main__":
        raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
                         "remove all but one") % kafka_assembly_dir)
    else:
-        os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars[0]
+        return jars[0]
+
+
+def search_flume_assembly_jar():
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
+    jars = glob.glob(
+        os.path.join(flume_assembly_dir, "target/scala-*/spark-streaming-flume-assembly-*.jar"))
+    if not jars:
+        raise Exception(
+            ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
+            "You need to build Spark with "
+            "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
+            "'build/mvn package' before running this test")
+    elif len(jars) > 1:
+        raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please "
+                         "remove all but one") % flume_assembly_dir)
+    else:
+        return jars[0]
+
+if __name__ == "__main__":
+    kafka_assembly_jar = search_kafka_assembly_jar()
+    flume_assembly_jar = search_flume_assembly_jar()
+    jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar)
+
+    os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
    unittest.main()
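
Outside the test harness, the same jar-discovery pattern can be reused to point a standalone script at the Flume assembly. A minimal sketch follows; the relative path is a placeholder for wherever your build places the spark-streaming-flume-assembly jar.

# Sketch: supply the Flume assembly jar to a standalone PySpark script.
import glob
import os

jars = glob.glob("external/flume-assembly/target/scala-*/"
                 "spark-streaming-flume-assembly-*.jar")
# PYSPARK_SUBMIT_ARGS must be set before the SparkContext is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars[0]

from pyspark import SparkContext  # imported after the env var is set
sc = SparkContext(appName="FlumeAssemblySmokeTest")
print(sc.version)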