Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/streaming/flume.py  | 140
-rw-r--r--  python/pyspark/streaming/mqtt.py   |  70
-rw-r--r--  python/pyspark/streaming/tests.py  | 266
3 files changed, 3 insertions(+), 473 deletions(-)
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
deleted file mode 100644
index cd30483fc6..0000000000
--- a/python/pyspark/streaming/flume.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-if sys.version >= "3":
- from io import BytesIO
-else:
- from StringIO import StringIO
-from py4j.protocol import Py4JJavaError
-
-from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
-from pyspark.streaming import DStream
-
-__all__ = ['FlumeUtils', 'utf8_decoder']
-
-
-def utf8_decoder(s):
- """ Decode the unicode as UTF-8 """
- if s is None:
- return None
- return s.decode('utf-8')
-
-
-class FlumeUtils(object):
-
- @staticmethod
- def createStream(ssc, hostname, port,
- storageLevel=StorageLevel.MEMORY_AND_DISK_2,
- enableDecompression=False,
- bodyDecoder=utf8_decoder):
- """
- Create an input stream that pulls events from Flume.
-
- :param ssc: StreamingContext object
- :param hostname: Hostname of the slave machine to which the flume data will be sent
- :param port: Port of the slave machine to which the flume data will be sent
- :param storageLevel: Storage level to use for storing the received objects
- :param enableDecompression: Should netty server decompress input stream
- :param bodyDecoder: A function used to decode body (default is utf8_decoder)
- :return: A DStream object
- """
- jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- helper = FlumeUtils._get_helper(ssc._sc)
- jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
- return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
-
- @staticmethod
- def createPollingStream(ssc, addresses,
- storageLevel=StorageLevel.MEMORY_AND_DISK_2,
- maxBatchSize=1000,
- parallelism=5,
- bodyDecoder=utf8_decoder):
- """
- Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- This stream will poll the sink for data and will pull events as they are available.
-
- :param ssc: StreamingContext object
- :param addresses: List of (host, port)s on which the Spark Sink is running.
- :param storageLevel: Storage level to use for storing the received objects
- :param maxBatchSize: The maximum number of events to be pulled from the Spark sink
- in a single RPC call
- :param parallelism: Number of concurrent requests this stream should send to the sink.
- Note that having a higher number of requests concurrently being pulled
- will result in this stream using more threads
- :param bodyDecoder: A function used to decode body (default is utf8_decoder)
- :return: A DStream object
- """
- jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- hosts = []
- ports = []
- for (host, port) in addresses:
- hosts.append(host)
- ports.append(port)
- helper = FlumeUtils._get_helper(ssc._sc)
- jstream = helper.createPollingStream(
- ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
- return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
-
- @staticmethod
- def _toPythonDStream(ssc, jstream, bodyDecoder):
- ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
- stream = DStream(jstream, ssc, ser)
-
- def func(event):
- headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0])
- headers = {}
- strSer = UTF8Deserializer()
- for i in range(0, read_int(headersBytes)):
- key = strSer.loads(headersBytes)
- value = strSer.loads(headersBytes)
- headers[key] = value
- body = bodyDecoder(event[1])
- return (headers, body)
- return stream.map(func)
-
- @staticmethod
- def _get_helper(sc):
- try:
- return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
- except TypeError as e:
- if str(e) == "'JavaPackage' object is not callable":
- FlumeUtils._printErrorMsg(sc)
- raise
-
- @staticmethod
- def _printErrorMsg(sc):
- print("""
-________________________________________________________________________________________________
-
- Spark Streaming's Flume libraries not found in class path. Try one of the following.
-
- 1. Include the Flume library and its dependencies with in the
- spark-submit command as
-
- $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ...
-
- 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
- Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s.
- Then, include the jar in the spark-submit command as
-
- $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
-
-________________________________________________________________________________________________
-
-""" % (sc.version, sc.version))
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
deleted file mode 100644
index 8848a70c75..0000000000
--- a/python/pyspark/streaming/mqtt.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from py4j.protocol import Py4JJavaError
-
-from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import UTF8Deserializer
-from pyspark.streaming import DStream
-
-__all__ = ['MQTTUtils']
-
-
-class MQTTUtils(object):
-
- @staticmethod
- def createStream(ssc, brokerUrl, topic,
- storageLevel=StorageLevel.MEMORY_AND_DISK_2):
- """
- Create an input stream that pulls messages from a Mqtt Broker.
-
- :param ssc: StreamingContext object
- :param brokerUrl: Url of remote mqtt publisher
- :param topic: topic name to subscribe to
- :param storageLevel: RDD storage level.
- :return: A DStream object
- """
- try:
- helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
- except TypeError as e:
- if str(e) == "'JavaPackage' object is not callable":
- MQTTUtils._printErrorMsg(ssc.sparkContext)
- raise
-
- jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
- return DStream(jstream, ssc, UTF8Deserializer())
-
- @staticmethod
- def _printErrorMsg(sc):
- print("""
-________________________________________________________________________________________________
-
- Spark Streaming's MQTT libraries not found in class path. Try one of the following.
-
- 1. Include the MQTT library and its dependencies with in the
- spark-submit command as
-
- $ bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt:%s ...
-
- 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
- Group Id = org.apache.spark, Artifact Id = spark-streaming-mqtt-assembly, Version = %s.
- Then, include the jar in the spark-submit command as
-
- $ bin/spark-submit --jars <spark-streaming-mqtt-assembly.jar> ...
-________________________________________________________________________________________________
-""" % (sc.version, sc.version))
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index f4bbb1b128..eb4696c55d 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -45,8 +45,6 @@ from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
-from pyspark.streaming.flume import FlumeUtils
-from pyspark.streaming.mqtt import MQTTUtils
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark.streaming.listener import StreamingListener
@@ -1262,207 +1260,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._validateStreamResult({"aa": 1, "bb": 2, "cc": 3}, stream)
-class FlumeStreamTests(PySparkStreamingTestCase):
- timeout = 20 # seconds
- duration = 1
-
- def setUp(self):
- super(FlumeStreamTests, self).setUp()
- self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()
-
- def tearDown(self):
- if self._utils is not None:
- self._utils.close()
- self._utils = None
-
- super(FlumeStreamTests, self).tearDown()
-
- def _startContext(self, n, compressed):
- # Start the StreamingContext and also collect the result
- dstream = FlumeUtils.createStream(self.ssc, "localhost", self._utils.getTestPort(),
- enableDecompression=compressed)
- result = []
-
- def get_output(_, rdd):
- for event in rdd.collect():
- if len(result) < n:
- result.append(event)
- dstream.foreachRDD(get_output)
- self.ssc.start()
- return result
-
- def _validateResult(self, input, result):
- # Validate both the header and the body
- header = {"test": "header"}
- self.assertEqual(len(input), len(result))
- for i in range(0, len(input)):
- self.assertEqual(header, result[i][0])
- self.assertEqual(input[i], result[i][1])
-
- def _writeInput(self, input, compressed):
- # Try to write input to the receiver until success or timeout
- start_time = time.time()
- while True:
- try:
- self._utils.writeInput(input, compressed)
- break
- except:
- if time.time() - start_time < self.timeout:
- time.sleep(0.01)
- else:
- raise
-
- def test_flume_stream(self):
- input = [str(i) for i in range(1, 101)]
- result = self._startContext(len(input), False)
- self._writeInput(input, False)
- self.wait_for(result, len(input))
- self._validateResult(input, result)
-
- def test_compressed_flume_stream(self):
- input = [str(i) for i in range(1, 101)]
- result = self._startContext(len(input), True)
- self._writeInput(input, True)
- self.wait_for(result, len(input))
- self._validateResult(input, result)
-
-
-class FlumePollingStreamTests(PySparkStreamingTestCase):
- timeout = 20 # seconds
- duration = 1
- maxAttempts = 5
-
- def setUp(self):
- self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()
-
- def tearDown(self):
- if self._utils is not None:
- self._utils.close()
- self._utils = None
-
- def _writeAndVerify(self, ports):
- # Set up the streaming context and input streams
- ssc = StreamingContext(self.sc, self.duration)
- try:
- addresses = [("localhost", port) for port in ports]
- dstream = FlumeUtils.createPollingStream(
- ssc,
- addresses,
- maxBatchSize=self._utils.eventsPerBatch(),
- parallelism=5)
- outputBuffer = []
-
- def get_output(_, rdd):
- for e in rdd.collect():
- outputBuffer.append(e)
-
- dstream.foreachRDD(get_output)
- ssc.start()
- self._utils.sendDatAndEnsureAllDataHasBeenReceived()
-
- self.wait_for(outputBuffer, self._utils.getTotalEvents())
- outputHeaders = [event[0] for event in outputBuffer]
- outputBodies = [event[1] for event in outputBuffer]
- self._utils.assertOutput(outputHeaders, outputBodies)
- finally:
- ssc.stop(False)
-
- def _testMultipleTimes(self, f):
- attempt = 0
- while True:
- try:
- f()
- break
- except:
- attempt += 1
- if attempt >= self.maxAttempts:
- raise
- else:
- import traceback
- traceback.print_exc()
-
- def _testFlumePolling(self):
- try:
- port = self._utils.startSingleSink()
- self._writeAndVerify([port])
- self._utils.assertChannelsAreEmpty()
- finally:
- self._utils.close()
-
- def _testFlumePollingMultipleHosts(self):
- try:
- port = self._utils.startSingleSink()
- self._writeAndVerify([port])
- self._utils.assertChannelsAreEmpty()
- finally:
- self._utils.close()
-
- def test_flume_polling(self):
- self._testMultipleTimes(self._testFlumePolling)
-
- def test_flume_polling_multiple_hosts(self):
- self._testMultipleTimes(self._testFlumePollingMultipleHosts)
-
-
-class MQTTStreamTests(PySparkStreamingTestCase):
- timeout = 20 # seconds
- duration = 1
-
- def setUp(self):
- super(MQTTStreamTests, self).setUp()
- self._MQTTTestUtils = self.ssc._jvm.org.apache.spark.streaming.mqtt.MQTTTestUtils()
- self._MQTTTestUtils.setup()
-
- def tearDown(self):
- if self._MQTTTestUtils is not None:
- self._MQTTTestUtils.teardown()
- self._MQTTTestUtils = None
-
- super(MQTTStreamTests, self).tearDown()
-
- def _randomTopic(self):
- return "topic-%d" % random.randint(0, 10000)
-
- def _startContext(self, topic):
- # Start the StreamingContext and also collect the result
- stream = MQTTUtils.createStream(self.ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic)
- result = []
-
- def getOutput(_, rdd):
- for data in rdd.collect():
- result.append(data)
-
- stream.foreachRDD(getOutput)
- self.ssc.start()
- return result
-
- def test_mqtt_stream(self):
- """Test the Python MQTT stream API."""
- sendData = "MQTT demo for spark streaming"
- topic = self._randomTopic()
- result = self._startContext(topic)
-
- def retry():
- self._MQTTTestUtils.publishData(topic, sendData)
- # Because "publishData" sends duplicate messages, here we should use > 0
- self.assertTrue(len(result) > 0)
- self.assertEqual(sendData, result[0])
-
- # Retry it because we don't know when the receiver will start.
- self._retry_or_timeout(retry)
-
- def _retry_or_timeout(self, test_func):
- start_time = time.time()
- while True:
- try:
- test_func()
- break
- except:
- if time.time() - start_time > self.timeout:
- raise
- time.sleep(0.01)
-
-
class KinesisStreamTests(PySparkStreamingTestCase):
def test_kinesis_stream_api(self):
@@ -1551,57 +1348,6 @@ def search_kafka_assembly_jar():
return jars[0]
-def search_flume_assembly_jar():
- SPARK_HOME = os.environ["SPARK_HOME"]
- flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
- jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly")
- if not jars:
- raise Exception(
- ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
- "You need to build Spark with "
- "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
- "'build/mvn package' before running this test.")
- elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
- "remove all but one") % (", ".join(jars)))
- else:
- return jars[0]
-
-
-def search_mqtt_assembly_jar():
- SPARK_HOME = os.environ["SPARK_HOME"]
- mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly")
- jars = search_jar(mqtt_assembly_dir, "spark-streaming-mqtt-assembly")
- if not jars:
- raise Exception(
- ("Failed to find Spark Streaming MQTT assembly jar in %s. " % mqtt_assembly_dir) +
- "You need to build Spark with "
- "'build/sbt assembly/assembly streaming-mqtt-assembly/assembly' or "
- "'build/mvn package' before running this test")
- elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming MQTT assembly JARs: %s; please "
- "remove all but one") % (", ".join(jars)))
- else:
- return jars[0]
-
-
-def search_mqtt_test_jar():
- SPARK_HOME = os.environ["SPARK_HOME"]
- mqtt_test_dir = os.path.join(SPARK_HOME, "external/mqtt")
- jars = glob.glob(
- os.path.join(mqtt_test_dir, "target/scala-*/spark-streaming-mqtt-test-*.jar"))
- if not jars:
- raise Exception(
- ("Failed to find Spark Streaming MQTT test jar in %s. " % mqtt_test_dir) +
- "You need to build Spark with "
- "'build/sbt assembly/assembly streaming-mqtt/test:assembly'")
- elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming MQTT test JARs: %s; please "
- "remove all but one") % (", ".join(jars)))
- else:
- return jars[0]
-
-
def search_kinesis_asl_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "external/kinesis-asl-assembly")
@@ -1622,24 +1368,18 @@ are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'
if __name__ == "__main__":
from pyspark.streaming.tests import *
kafka_assembly_jar = search_kafka_assembly_jar()
- flume_assembly_jar = search_flume_assembly_jar()
- mqtt_assembly_jar = search_mqtt_assembly_jar()
- mqtt_test_jar = search_mqtt_test_jar()
kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
if kinesis_asl_assembly_jar is None:
kinesis_jar_present = False
- jars = "%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
- mqtt_test_jar)
+ jars = kafka_assembly_jar
else:
kinesis_jar_present = True
- jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
- mqtt_test_jar, kinesis_asl_assembly_jar)
+ jars = "%s,%s" % (kafka_assembly_jar, kinesis_asl_assembly_jar)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
- KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests,
- StreamingListenerTests]
+ KafkaStreamTests, StreamingListenerTests]
if kinesis_jar_present is True:
testcases.append(KinesisStreamTests)
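
With the Flume and MQTT jars gone, the __main__ block above wires only the Kafka assembly into PYSPARK_SUBMIT_ARGS, plus the Kinesis assembly when it is found. A minimal sketch of the resulting value, using placeholder jar paths rather than real build outputs:

# Sketch of the jar wiring performed by the updated __main__ block above.
# Both paths are placeholders for illustration, not actual build artifacts.
import os

kafka_assembly_jar = "/opt/spark/external/kafka-assembly/spark-streaming-kafka-assembly.jar"
kinesis_asl_assembly_jar = None  # search_kinesis_asl_assembly_jar() returns None when the jar is absent

if kinesis_asl_assembly_jar is None:
    jars = kafka_assembly_jar
else:
    jars = "%s,%s" % (kafka_assembly_jar, kinesis_asl_assembly_jar)

os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
# e.g. --jars /opt/spark/external/kafka-assembly/spark-streaming-kafka-assembly.jar pyspark-shell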