diff options
author | jerryshao <saisai.shao@intel.com> | 2015-04-09 23:14:24 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-04-09 23:14:24 -0700 |
commit | 3290d2d13bb4bd875aec14425c8e3766f9cc644b (patch) | |
tree | 7e862fb443bd1bca2b15c3fd1a1ec9b9d27912d5 /python | |
parent | e2360810f50de77f79d372cc9b46db117d451cfc (diff) | |
download | spark-3290d2d13bb4bd875aec14425c8e3766f9cc644b.tar.gz spark-3290d2d13bb4bd875aec14425c8e3766f9cc644b.tar.bz2 spark-3290d2d13bb4bd875aec14425c8e3766f9cc644b.zip |
[SPARK-6211][Streaming] Add Python Kafka API unit test
Refactor the Kafka unit test and add Python API support. CC tdas davies please help to review, thanks a lot.
Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>
Closes #4961 from jerryshao/SPARK-6211 and squashes the following commits:
ee4b919 [jerryshao] Fixed newly merged issue
82c756e [jerryshao] Address the comments
92912d1 [jerryshao] Address the commits
0708bb1 [jerryshao] Fix rebase issue
40b47a3 [Saisai Shao] Style fix
f889657 [Saisai Shao] Update the code according
8a2f3e2 [jerryshao] Address the issues
0f1b7ce [jerryshao] Still fix the bug
61a04f0 [jerryshao] Fix bugs and address the issues
64d9877 [jerryshao] Fix rebase bugs
8ad442f [jerryshao] Add kafka-assembly in run-tests
6020b00 [jerryshao] Add more debug info in Shell
8102d6e [jerryshao] Fix bug in Jenkins test
fde1213 [jerryshao] Code style changes
5536f95 [jerryshao] Refactor the Kafka unit test and add Python Kafka unittest support
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/streaming/tests.py | 43 | ||||
-rwxr-xr-x | python/run-tests | 19 |
2 files changed, 60 insertions, 2 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 608f8e2647..9b4635e490 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -23,13 +23,16 @@ import unittest import tempfile import struct +from py4j.java_collections import MapConverter + from pyspark.context import SparkConf, SparkContext, RDD from pyspark.streaming.context import StreamingContext +from pyspark.streaming.kafka import KafkaUtils class PySparkStreamingTestCase(unittest.TestCase): - timeout = 10 # seconds + timeout = 20 # seconds duration = 1 def setUp(self): @@ -556,5 +559,43 @@ class CheckpointTests(PySparkStreamingTestCase): check_output(3) +class KafkaStreamTests(PySparkStreamingTestCase): + + def setUp(self): + super(KafkaStreamTests, self).setUp() + + kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\ + .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils") + self._kafkaTestUtils = kafkaTestUtilsClz.newInstance() + self._kafkaTestUtils.setup() + + def tearDown(self): + if self._kafkaTestUtils is not None: + self._kafkaTestUtils.teardown() + self._kafkaTestUtils = None + + super(KafkaStreamTests, self).tearDown() + + def test_kafka_stream(self): + """Test the Python Kafka stream API.""" + topic = "topic1" + sendData = {"a": 3, "b": 5, "c": 10} + jSendData = MapConverter().convert(sendData, + self.ssc.sparkContext._gateway._gateway_client) + + self._kafkaTestUtils.createTopic(topic) + self._kafkaTestUtils.sendMessages(topic, jSendData) + + stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), + "test-streaming-consumer", {topic: 1}, + {"auto.offset.reset": "smallest"}) + + result = {} + for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]), + sum(sendData.values()))): + result[i] = result.get(i, 0) + 1 + + self.assertEqual(sendData, result) + if __name__ == "__main__": unittest.main() diff --git a/python/run-tests b/python/run-tests index f569a56fb7..f3a07d8aba 100755 --- a/python/run-tests +++ b/python/run-tests @@ -21,6 +21,8 @@ # Figure out where the Spark framework is installed FWDIR="$(cd "`dirname "$0"`"; cd ../; pwd)" +. "$FWDIR"/bin/load-spark-env.sh + # CD into the python directory to find things on the right path cd "$FWDIR/python" @@ -57,7 +59,7 @@ function run_core_tests() { PYSPARK_DOC_TEST=1 run_test "pyspark/broadcast.py" PYSPARK_DOC_TEST=1 run_test "pyspark/accumulators.py" run_test "pyspark/serializers.py" - run_test "pyspark/profiler.py" + run_test "pyspark/profiler.py" run_test "pyspark/shuffle.py" run_test "pyspark/tests.py" } @@ -97,6 +99,21 @@ function run_ml_tests() { function run_streaming_tests() { echo "Run streaming tests ..." + + KAFKA_ASSEMBLY_DIR="$FWDIR"/external/kafka-assembly + JAR_PATH="${KAFKA_ASSEMBLY_DIR}/target/scala-${SPARK_SCALA_VERSION}" + for f in "${JAR_PATH}"/spark-streaming-kafka-assembly-*.jar; do + if [[ ! -e "$f" ]]; then + echo "Failed to find Spark Streaming Kafka assembly jar in $KAFKA_ASSEMBLY_DIR" 1>&2 + echo "You need to build Spark with " \ + "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or" \ + "'build/mvn package' before running this program" 1>&2 + exit 1 + fi + KAFKA_ASSEMBLY_JAR="$f" + done + + export PYSPARK_SUBMIT_ARGS="--jars ${KAFKA_ASSEMBLY_JAR} pyspark-shell" run_test "pyspark/streaming/util.py" run_test "pyspark/streaming/tests.py" } |