path: root/python
author     Shixiong Zhu <shixiong@databricks.com>    2016-03-14 16:56:04 -0700
committer  Reynold Xin <rxin@databricks.com>         2016-03-14 16:56:04 -0700
commit     06dec37455c3f800897defee6fad0da623f26050 (patch)
tree       d49dd098587f8a3c7a019b0aad605327da6fcecd /python
parent     8301fadd8d269da11e72870b7a889596e3337839 (diff)
download   spark-06dec37455c3f800897defee6fad0da623f26050.tar.gz
           spark-06dec37455c3f800897defee6fad0da623f26050.tar.bz2
           spark-06dec37455c3f800897defee6fad0da623f26050.zip
[SPARK-13843][STREAMING] Move streaming-flume, streaming-mqtt, streaming-zeromq, streaming-akka, streaming-twitter to Spark packages
## What changes were proposed in this pull request?

Currently there are a few sub-projects, each integrating Streaming with a different external source. Now that we have a better way to include external libraries (Spark packages), and with Spark 2.0 coming up, we can move the following projects out of Spark to https://github.com/spark-packages:

- streaming-flume
- streaming-akka
- streaming-mqtt
- streaming-zeromq
- streaming-twitter

These are ancillary packages, and given the overhead of maintenance, running tests, and PR failures, it is better to maintain them outside of Spark. In addition, these projects can then follow their own release cycles, so we can release them faster. I have already copied these projects to https://github.com/spark-packages.

## How was this patch tested?

Jenkins tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11672 from zsxwing/remove-external-pkg.
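For context, a PySpark application that still needs one of these connectors after the move would pull it in as an external dependency rather than relying on the Spark build. A minimal sketch, mirroring the PYSPARK_SUBMIT_ARGS pattern used by the test harness in this diff; the package coordinates below are placeholders, since the exact group/artifact/version depend on how the externalized artifacts are published:

    import os

    # Placeholder coordinates for the externalized connector; adjust to the published artifact.
    os.environ["PYSPARK_SUBMIT_ARGS"] = (
        "--packages org.apache.spark:spark-streaming-flume_2.11:1.6.1 pyspark-shell"
    )

    # Import pyspark only after PYSPARK_SUBMIT_ARGS is set, so the launched JVM picks up the package.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="ExternalConnectorExample")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval
    # ...connector-specific DStream setup goes here...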
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/streaming/flume.py  140
-rw-r--r--  python/pyspark/streaming/mqtt.py    70
-rw-r--r--  python/pyspark/streaming/tests.py  266
3 files changed, 3 insertions, 473 deletions
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
deleted file mode 100644
index cd30483fc6..0000000000
--- a/python/pyspark/streaming/flume.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-if sys.version >= "3":
- from io import BytesIO
-else:
- from StringIO import StringIO
-from py4j.protocol import Py4JJavaError
-
-from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
-from pyspark.streaming import DStream
-
-__all__ = ['FlumeUtils', 'utf8_decoder']
-
-
-def utf8_decoder(s):
- """ Decode the unicode as UTF-8 """
- if s is None:
- return None
- return s.decode('utf-8')
-
-
-class FlumeUtils(object):
-
- @staticmethod
- def createStream(ssc, hostname, port,
- storageLevel=StorageLevel.MEMORY_AND_DISK_2,
- enableDecompression=False,
- bodyDecoder=utf8_decoder):
- """
- Create an input stream that pulls events from Flume.
-
- :param ssc: StreamingContext object
- :param hostname: Hostname of the slave machine to which the flume data will be sent
- :param port: Port of the slave machine to which the flume data will be sent
- :param storageLevel: Storage level to use for storing the received objects
- :param enableDecompression: Should netty server decompress input stream
- :param bodyDecoder: A function used to decode body (default is utf8_decoder)
- :return: A DStream object
- """
- jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- helper = FlumeUtils._get_helper(ssc._sc)
- jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
- return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
-
- @staticmethod
- def createPollingStream(ssc, addresses,
- storageLevel=StorageLevel.MEMORY_AND_DISK_2,
- maxBatchSize=1000,
- parallelism=5,
- bodyDecoder=utf8_decoder):
- """
- Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- This stream will poll the sink for data and will pull events as they are available.
-
- :param ssc: StreamingContext object
- :param addresses: List of (host, port)s on which the Spark Sink is running.
- :param storageLevel: Storage level to use for storing the received objects
- :param maxBatchSize: The maximum number of events to be pulled from the Spark sink
- in a single RPC call
- :param parallelism: Number of concurrent requests this stream should send to the sink.
- Note that having a higher number of requests concurrently being pulled
- will result in this stream using more threads
- :param bodyDecoder: A function used to decode body (default is utf8_decoder)
- :return: A DStream object
- """
- jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- hosts = []
- ports = []
- for (host, port) in addresses:
- hosts.append(host)
- ports.append(port)
- helper = FlumeUtils._get_helper(ssc._sc)
- jstream = helper.createPollingStream(
- ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
- return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
-
- @staticmethod
- def _toPythonDStream(ssc, jstream, bodyDecoder):
- ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
- stream = DStream(jstream, ssc, ser)
-
- def func(event):
- headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0])
- headers = {}
- strSer = UTF8Deserializer()
- for i in range(0, read_int(headersBytes)):
- key = strSer.loads(headersBytes)
- value = strSer.loads(headersBytes)
- headers[key] = value
- body = bodyDecoder(event[1])
- return (headers, body)
- return stream.map(func)
-
- @staticmethod
- def _get_helper(sc):
- try:
- return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
- except TypeError as e:
- if str(e) == "'JavaPackage' object is not callable":
- FlumeUtils._printErrorMsg(sc)
- raise
-
- @staticmethod
- def _printErrorMsg(sc):
- print("""
-________________________________________________________________________________________________
-
- Spark Streaming's Flume libraries not found in class path. Try one of the following.
-
- 1. Include the Flume library and its dependencies with in the
- spark-submit command as
-
- $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ...
-
- 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
- Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s.
- Then, include the jar in the spark-submit command as
-
- $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
-
-________________________________________________________________________________________________
-
-""" % (sc.version, sc.version))
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
deleted file mode 100644
index 8848a70c75..0000000000
--- a/python/pyspark/streaming/mqtt.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from py4j.protocol import Py4JJavaError
-
-from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import UTF8Deserializer
-from pyspark.streaming import DStream
-
-__all__ = ['MQTTUtils']
-
-
-class MQTTUtils(object):
-
- @staticmethod
- def createStream(ssc, brokerUrl, topic,
- storageLevel=StorageLevel.MEMORY_AND_DISK_2):
- """
- Create an input stream that pulls messages from a Mqtt Broker.
-
- :param ssc: StreamingContext object
- :param brokerUrl: Url of remote mqtt publisher
- :param topic: topic name to subscribe to
- :param storageLevel: RDD storage level.
- :return: A DStream object
- """
- try:
- helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
- except TypeError as e:
- if str(e) == "'JavaPackage' object is not callable":
- MQTTUtils._printErrorMsg(ssc.sparkContext)
- raise
-
- jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
- return DStream(jstream, ssc, UTF8Deserializer())
-
- @staticmethod
- def _printErrorMsg(sc):
- print("""
-________________________________________________________________________________________________
-
- Spark Streaming's MQTT libraries not found in class path. Try one of the following.
-
- 1. Include the MQTT library and its dependencies with in the
- spark-submit command as
-
- $ bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt:%s ...
-
- 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
- Group Id = org.apache.spark, Artifact Id = spark-streaming-mqtt-assembly, Version = %s.
- Then, include the jar in the spark-submit command as
-
- $ bin/spark-submit --jars <spark-streaming-mqtt-assembly.jar> ...
-________________________________________________________________________________________________
-""" % (sc.version, sc.version))
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index f4bbb1b128..eb4696c55d 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -45,8 +45,6 @@ from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
-from pyspark.streaming.flume import FlumeUtils
-from pyspark.streaming.mqtt import MQTTUtils
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark.streaming.listener import StreamingListener
@@ -1262,207 +1260,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._validateStreamResult({"aa": 1, "bb": 2, "cc": 3}, stream)
-class FlumeStreamTests(PySparkStreamingTestCase):
- timeout = 20 # seconds
- duration = 1
-
- def setUp(self):
- super(FlumeStreamTests, self).setUp()
- self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()
-
- def tearDown(self):
- if self._utils is not None:
- self._utils.close()
- self._utils = None
-
- super(FlumeStreamTests, self).tearDown()
-
- def _startContext(self, n, compressed):
- # Start the StreamingContext and also collect the result
- dstream = FlumeUtils.createStream(self.ssc, "localhost", self._utils.getTestPort(),
- enableDecompression=compressed)
- result = []
-
- def get_output(_, rdd):
- for event in rdd.collect():
- if len(result) < n:
- result.append(event)
- dstream.foreachRDD(get_output)
- self.ssc.start()
- return result
-
- def _validateResult(self, input, result):
- # Validate both the header and the body
- header = {"test": "header"}
- self.assertEqual(len(input), len(result))
- for i in range(0, len(input)):
- self.assertEqual(header, result[i][0])
- self.assertEqual(input[i], result[i][1])
-
- def _writeInput(self, input, compressed):
- # Try to write input to the receiver until success or timeout
- start_time = time.time()
- while True:
- try:
- self._utils.writeInput(input, compressed)
- break
- except:
- if time.time() - start_time < self.timeout:
- time.sleep(0.01)
- else:
- raise
-
- def test_flume_stream(self):
- input = [str(i) for i in range(1, 101)]
- result = self._startContext(len(input), False)
- self._writeInput(input, False)
- self.wait_for(result, len(input))
- self._validateResult(input, result)
-
- def test_compressed_flume_stream(self):
- input = [str(i) for i in range(1, 101)]
- result = self._startContext(len(input), True)
- self._writeInput(input, True)
- self.wait_for(result, len(input))
- self._validateResult(input, result)
-
-
-class FlumePollingStreamTests(PySparkStreamingTestCase):
- timeout = 20 # seconds
- duration = 1
- maxAttempts = 5
-
- def setUp(self):
- self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()
-
- def tearDown(self):
- if self._utils is not None:
- self._utils.close()
- self._utils = None
-
- def _writeAndVerify(self, ports):
- # Set up the streaming context and input streams
- ssc = StreamingContext(self.sc, self.duration)
- try:
- addresses = [("localhost", port) for port in ports]
- dstream = FlumeUtils.createPollingStream(
- ssc,
- addresses,
- maxBatchSize=self._utils.eventsPerBatch(),
- parallelism=5)
- outputBuffer = []
-
- def get_output(_, rdd):
- for e in rdd.collect():
- outputBuffer.append(e)
-
- dstream.foreachRDD(get_output)
- ssc.start()
- self._utils.sendDatAndEnsureAllDataHasBeenReceived()
-
- self.wait_for(outputBuffer, self._utils.getTotalEvents())
- outputHeaders = [event[0] for event in outputBuffer]
- outputBodies = [event[1] for event in outputBuffer]
- self._utils.assertOutput(outputHeaders, outputBodies)
- finally:
- ssc.stop(False)
-
- def _testMultipleTimes(self, f):
- attempt = 0
- while True:
- try:
- f()
- break
- except:
- attempt += 1
- if attempt >= self.maxAttempts:
- raise
- else:
- import traceback
- traceback.print_exc()
-
- def _testFlumePolling(self):
- try:
- port = self._utils.startSingleSink()
- self._writeAndVerify([port])
- self._utils.assertChannelsAreEmpty()
- finally:
- self._utils.close()
-
- def _testFlumePollingMultipleHosts(self):
- try:
- port = self._utils.startSingleSink()
- self._writeAndVerify([port])
- self._utils.assertChannelsAreEmpty()
- finally:
- self._utils.close()
-
- def test_flume_polling(self):
- self._testMultipleTimes(self._testFlumePolling)
-
- def test_flume_polling_multiple_hosts(self):
- self._testMultipleTimes(self._testFlumePollingMultipleHosts)
-
-
-class MQTTStreamTests(PySparkStreamingTestCase):
- timeout = 20 # seconds
- duration = 1
-
- def setUp(self):
- super(MQTTStreamTests, self).setUp()
- self._MQTTTestUtils = self.ssc._jvm.org.apache.spark.streaming.mqtt.MQTTTestUtils()
- self._MQTTTestUtils.setup()
-
- def tearDown(self):
- if self._MQTTTestUtils is not None:
- self._MQTTTestUtils.teardown()
- self._MQTTTestUtils = None
-
- super(MQTTStreamTests, self).tearDown()
-
- def _randomTopic(self):
- return "topic-%d" % random.randint(0, 10000)
-
- def _startContext(self, topic):
- # Start the StreamingContext and also collect the result
- stream = MQTTUtils.createStream(self.ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic)
- result = []
-
- def getOutput(_, rdd):
- for data in rdd.collect():
- result.append(data)
-
- stream.foreachRDD(getOutput)
- self.ssc.start()
- return result
-
- def test_mqtt_stream(self):
- """Test the Python MQTT stream API."""
- sendData = "MQTT demo for spark streaming"
- topic = self._randomTopic()
- result = self._startContext(topic)
-
- def retry():
- self._MQTTTestUtils.publishData(topic, sendData)
- # Because "publishData" sends duplicate messages, here we should use > 0
- self.assertTrue(len(result) > 0)
- self.assertEqual(sendData, result[0])
-
- # Retry it because we don't know when the receiver will start.
- self._retry_or_timeout(retry)
-
- def _retry_or_timeout(self, test_func):
- start_time = time.time()
- while True:
- try:
- test_func()
- break
- except:
- if time.time() - start_time > self.timeout:
- raise
- time.sleep(0.01)
-
-
class KinesisStreamTests(PySparkStreamingTestCase):
def test_kinesis_stream_api(self):
@@ -1551,57 +1348,6 @@ def search_kafka_assembly_jar():
return jars[0]
-def search_flume_assembly_jar():
- SPARK_HOME = os.environ["SPARK_HOME"]
- flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
- jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly")
- if not jars:
- raise Exception(
- ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
- "You need to build Spark with "
- "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
- "'build/mvn package' before running this test.")
- elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
- "remove all but one") % (", ".join(jars)))
- else:
- return jars[0]
-
-
-def search_mqtt_assembly_jar():
- SPARK_HOME = os.environ["SPARK_HOME"]
- mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly")
- jars = search_jar(mqtt_assembly_dir, "spark-streaming-mqtt-assembly")
- if not jars:
- raise Exception(
- ("Failed to find Spark Streaming MQTT assembly jar in %s. " % mqtt_assembly_dir) +
- "You need to build Spark with "
- "'build/sbt assembly/assembly streaming-mqtt-assembly/assembly' or "
- "'build/mvn package' before running this test")
- elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming MQTT assembly JARs: %s; please "
- "remove all but one") % (", ".join(jars)))
- else:
- return jars[0]
-
-
-def search_mqtt_test_jar():
- SPARK_HOME = os.environ["SPARK_HOME"]
- mqtt_test_dir = os.path.join(SPARK_HOME, "external/mqtt")
- jars = glob.glob(
- os.path.join(mqtt_test_dir, "target/scala-*/spark-streaming-mqtt-test-*.jar"))
- if not jars:
- raise Exception(
- ("Failed to find Spark Streaming MQTT test jar in %s. " % mqtt_test_dir) +
- "You need to build Spark with "
- "'build/sbt assembly/assembly streaming-mqtt/test:assembly'")
- elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming MQTT test JARs: %s; please "
- "remove all but one") % (", ".join(jars)))
- else:
- return jars[0]
-
-
def search_kinesis_asl_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "external/kinesis-asl-assembly")
@@ -1622,24 +1368,18 @@ are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'
if __name__ == "__main__":
from pyspark.streaming.tests import *
kafka_assembly_jar = search_kafka_assembly_jar()
- flume_assembly_jar = search_flume_assembly_jar()
- mqtt_assembly_jar = search_mqtt_assembly_jar()
- mqtt_test_jar = search_mqtt_test_jar()
kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
if kinesis_asl_assembly_jar is None:
kinesis_jar_present = False
- jars = "%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
- mqtt_test_jar)
+ jars = kafka_assembly_jar
else:
kinesis_jar_present = True
- jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
- mqtt_test_jar, kinesis_asl_assembly_jar)
+ jars = "%s,%s" % (kafka_assembly_jar, kinesis_asl_assembly_jar)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
- KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests,
- StreamingListenerTests]
+ KafkaStreamTests, StreamingListenerTests]
if kinesis_jar_present is True:
testcases.append(KinesisStreamTests)