author     jerryshao <saisai.shao@intel.com>            2015-04-09 23:14:24 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-04-09 23:14:24 -0700
commit     3290d2d13bb4bd875aec14425c8e3766f9cc644b (patch)
tree       7e862fb443bd1bca2b15c3fd1a1ec9b9d27912d5 /python
parent     e2360810f50de77f79d372cc9b46db117d451cfc (diff)
[SPARK-6211][Streaming] Add Python Kafka API unit test
Refactor the Kafka unit test and add Python API support. CC tdas davies please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #4961 from jerryshao/SPARK-6211 and squashes the following commits:

ee4b919 [jerryshao] Fixed newly merged issue
82c756e [jerryshao] Address the comments
92912d1 [jerryshao] Address the commits
0708bb1 [jerryshao] Fix rebase issue
40b47a3 [Saisai Shao] Style fix
f889657 [Saisai Shao] Update the code according
8a2f3e2 [jerryshao] Address the issues
0f1b7ce [jerryshao] Still fix the bug
61a04f0 [jerryshao] Fix bugs and address the issues
64d9877 [jerryshao] Fix rebase bugs
8ad442f [jerryshao] Add kafka-assembly in run-tests
6020b00 [jerryshao] Add more debug info in Shell
8102d6e [jerryshao] Fix bug in Jenkins test
fde1213 [jerryshao] Code style changes
5536f95 [jerryshao] Refactor the Kafka unit test and add Python Kafka unittest support
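For context, a minimal sketch of the Python Kafka stream API this commit adds tests for. Only KafkaUtils.createStream itself comes from the diff below; the ZooKeeper address, topic name, and consumer group here are illustrative assumptions, not part of the commit:

    # Sketch only: broker/topic/group values are assumptions for illustration.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="kafka-stream-example")
    ssc = StreamingContext(sc, 1)  # 1-second batches

    # createStream(ssc, zkQuorum, groupId, {topic: numPartitions}, kafkaParams)
    stream = KafkaUtils.createStream(ssc, "localhost:2181", "example-consumer",
                                     {"topic1": 1},
                                     {"auto.offset.reset": "smallest"})

    # Each record arrives as a (key, value) pair; keep the values only.
    stream.map(lambda kv: kv[1]).pprint()

    ssc.start()
    ssc.awaitTermination()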
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/streaming/tests.py | 43
-rwxr-xr-x  python/run-tests                  | 19
2 files changed, 60 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 608f8e2647..9b4635e490 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -23,13 +23,16 @@ import unittest
 import tempfile
 import struct
 
+from py4j.java_collections import MapConverter
+
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.streaming.context import StreamingContext
+from pyspark.streaming.kafka import KafkaUtils
 
 
 class PySparkStreamingTestCase(unittest.TestCase):
 
-    timeout = 10  # seconds
+    timeout = 20  # seconds
     duration = 1
 
     def setUp(self):
@@ -556,5 +559,43 @@ class CheckpointTests(PySparkStreamingTestCase):
         check_output(3)
 
 
+class KafkaStreamTests(PySparkStreamingTestCase):
+
+    def setUp(self):
+        super(KafkaStreamTests, self).setUp()
+
+        kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+            .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
+        self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
+        self._kafkaTestUtils.setup()
+
+    def tearDown(self):
+        if self._kafkaTestUtils is not None:
+            self._kafkaTestUtils.teardown()
+            self._kafkaTestUtils = None
+
+        super(KafkaStreamTests, self).tearDown()
+
+    def test_kafka_stream(self):
+        """Test the Python Kafka stream API."""
+        topic = "topic1"
+        sendData = {"a": 3, "b": 5, "c": 10}
+        jSendData = MapConverter().convert(sendData,
+                                           self.ssc.sparkContext._gateway._gateway_client)
+
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, jSendData)
+
+        stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
+                                         "test-streaming-consumer", {topic: 1},
+                                         {"auto.offset.reset": "smallest"})
+
+        result = {}
+        for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
+                                                   sum(sendData.values()))):
+            result[i] = result.get(i, 0) + 1
+
+        self.assertEqual(sendData, result)
+
 if __name__ == "__main__":
     unittest.main()
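The jSendData conversion in test_kafka_stream relies on py4j's MapConverter, which materializes a Python dict as a java.util.Map on the JVM side so the Scala-side KafkaTestUtils.sendMessages helper can consume it. A small sketch of that conversion in isolation, assuming a running SparkContext sc (the helper name to_java_map is ours, not py4j's):

    from py4j.java_collections import MapConverter

    def to_java_map(py_dict, gateway_client):
        # Returns a java.util.Map mirroring py_dict on the JVM side.
        # In PySpark the client is reachable as sc._gateway._gateway_client.
        return MapConverter().convert(py_dict, gateway_client)

    # e.g. j_map = to_java_map({"a": 3, "b": 5}, sc._gateway._gateway_client)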
diff --git a/python/run-tests b/python/run-tests
index f569a56fb7..f3a07d8aba 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -21,6 +21,8 @@
 # Figure out where the Spark framework is installed
 FWDIR="$(cd "`dirname "$0"`"; cd ../; pwd)"
 
+. "$FWDIR"/bin/load-spark-env.sh
+
 # CD into the python directory to find things on the right path
 cd "$FWDIR/python"
@@ -57,7 +59,7 @@ function run_core_tests() {
     PYSPARK_DOC_TEST=1 run_test "pyspark/broadcast.py"
     PYSPARK_DOC_TEST=1 run_test "pyspark/accumulators.py"
     run_test "pyspark/serializers.py"
-    run_test "pyspark/profiler.py"
+    run_test "pyspark/profiler.py"
     run_test "pyspark/shuffle.py"
     run_test "pyspark/tests.py"
 }
@@ -97,6 +99,21 @@ function run_ml_tests() {
 
 function run_streaming_tests() {
     echo "Run streaming tests ..."
+
+    KAFKA_ASSEMBLY_DIR="$FWDIR"/external/kafka-assembly
+    JAR_PATH="${KAFKA_ASSEMBLY_DIR}/target/scala-${SPARK_SCALA_VERSION}"
+    for f in "${JAR_PATH}"/spark-streaming-kafka-assembly-*.jar; do
+        if [[ ! -e "$f" ]]; then
+            echo "Failed to find Spark Streaming Kafka assembly jar in $KAFKA_ASSEMBLY_DIR" 1>&2
+            echo "You need to build Spark with " \
+                "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or" \
+                "'build/mvn package' before running this program" 1>&2
+            exit 1
+        fi
+        KAFKA_ASSEMBLY_JAR="$f"
+    done
+
+    export PYSPARK_SUBMIT_ARGS="--jars ${KAFKA_ASSEMBLY_JAR} pyspark-shell"
     run_test "pyspark/streaming/util.py"
     run_test "pyspark/streaming/tests.py"
 }
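The exported PYSPARK_SUBMIT_ARGS is how the assembly jar reaches the tests: PySpark's gateway launcher reads that variable when the first SparkContext is created, so the Kafka classes end up on the driver and executor classpath. A minimal sketch of reproducing this outside run-tests, with a hypothetical jar path:

    import os

    # Hypothetical jar path for illustration; run-tests discovers the real one
    # under external/kafka-assembly/target/scala-${SPARK_SCALA_VERSION}.
    jar = "/path/to/spark-streaming-kafka-assembly-1.4.0-SNAPSHOT.jar"

    # Must be set before the first SparkContext is created, because the py4j
    # gateway is launched with these arguments.
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jar

    from pyspark import SparkContext
    sc = SparkContext(appName="kafka-test-run")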