author    Mariusz Strzelecki <mariusz.strzelecki@allegrogroup.com>  2016-08-09 09:44:43 -0700
committer Davies Liu <davies.liu@gmail.com>  2016-08-09 09:44:43 -0700
commit    29081b587f3423bf5a3e0066357884d0c26a04bf
tree      b86e35b297a8b3100a19dc0e85f828434524b96e
parent    182e11904bf2093c2faa57894a1c4bb11d872596
[SPARK-16950] [PYSPARK] fromOffsets parameter support in KafkaUtils.createDirectStream for python3
## What changes were proposed in this pull request?

Allow KafkaUtils.createDirectStream to be used with starting offsets on Python 3 by using java.lang.Number instead of Long during parameter mapping in the Scala helper. This lets py4j pass either Integer or Long into the map and resolves the ClassCastException.

## How was this patch tested?

Unit tests.

jerryshao - could you please look at this PR?

Author: Mariusz Strzelecki <mariusz.strzelecki@allegrogroup.com>

Closes #14540 from szczeles/kafka_pyspark.
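For context, a minimal sketch of the call this patch fixes (the broker address, topic name, and offsets are placeholders; a running Kafka 0.8 broker plus the spark-streaming-kafka assembly are assumed):

    # Python 3 sketch: pass explicit starting offsets to a direct stream.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

    sc = SparkContext(appName="kafka-offsets-demo")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    # On Python 3 every integer is a plain int; py4j may box it as
    # java.lang.Integer, which the old JLong-typed helper rejected.
    fromOffsets = {TopicAndPartition("topic1", 0): 0}
    stream = KafkaUtils.createDirectStream(
        ssc, ["topic1"], {"metadata.broker.list": "localhost:9092"}, fromOffsets)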
 external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala |  8 ++++----
 python/pyspark/streaming/kafka.py                                                   |  3 +++
 python/pyspark/streaming/tests.py                                                   | 12 +++---------
 3 files changed, 10 insertions(+), 13 deletions(-)
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index edaafb912c..b17e198077 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.kafka
 
 import java.io.OutputStream
-import java.lang.{Integer => JInt, Long => JLong}
+import java.lang.{Integer => JInt, Long => JLong, Number => JNumber}
 import java.nio.charset.StandardCharsets
 import java.util.{List => JList, Map => JMap, Set => JSet}
@@ -682,7 +682,7 @@ private[kafka] class KafkaUtilsPythonHelper {
       jssc: JavaStreamingContext,
       kafkaParams: JMap[String, String],
       topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
+      fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[(Array[Byte], Array[Byte])] = {
     val messageHandler =
       (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
     new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler))
@@ -692,7 +692,7 @@ private[kafka] class KafkaUtilsPythonHelper {
       jssc: JavaStreamingContext,
       kafkaParams: JMap[String, String],
       topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = {
+      fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[Array[Byte]] = {
     val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
       new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
     val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler).
@@ -704,7 +704,7 @@ private[kafka] class KafkaUtilsPythonHelper {
       jssc: JavaStreamingContext,
       kafkaParams: JMap[String, String],
       topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong],
+      fromOffsets: JMap[TopicAndPartition, JNumber],
       messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = {
 
     val currentFromOffsets = if (!fromOffsets.isEmpty) {
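The switch from JLong to JNumber matters because py4j chooses the JVM box by value: a Python int that fits in 32 bits arrives as java.lang.Integer, a larger one as java.lang.Long (on Python 2, long always mapped to Long, which is why the old signature worked there). Typing the map values as java.lang.Number accepts both boxes. A rough Python illustration of that cutoff (an assumption about py4j's default conversion, not Spark code):

    # Which JVM box a Python int gets, per py4j's 32-bit cutoff (assumed).
    def jvm_box(n):
        return "java.lang.Integer" if -2**31 <= n < 2**31 else "java.lang.Long"

    print(jvm_box(0))      # java.lang.Integer -- this broke the JLong map
    print(jvm_box(2**40))  # java.lang.Long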
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 2c1a667fc8..bf27d8047a 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -287,6 +287,9 @@ class TopicAndPartition(object):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    def __hash__(self):
+        return (self._topic, self._partition).__hash__()
+
 
 class Broker(object):
     """
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 360ba1e716..5ac007cd59 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -41,6 +41,9 @@ if sys.version_info[:2] <= (2, 6):
 else:
     import unittest
 
+if sys.version >= "3":
+    long = int
+
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.context import StreamingContext
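The long = int alias keeps test bodies that call long(0), long(6), and so on (for example the OffsetRange assertion further down) valid on Python 3, where the separate long type was folded into int:

    # Minimal sketch of the shim's effect, runnable on either major version.
    import sys
    if sys.version >= "3":
        long = int
    assert long(6) == 6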
@@ -1058,7 +1061,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_from_offset(self):
         """Test the Python direct Kafka stream API with start offset specified."""
         topic = self._randomTopic()
@@ -1072,7 +1074,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd(self):
         """Test the Python direct Kafka RDD API."""
         topic = self._randomTopic()
@@ -1085,7 +1086,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self._validateRddResult(sendData, rdd)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_with_leaders(self):
         """Test the Python direct Kafka RDD API with leaders."""
         topic = self._randomTopic()
@@ -1100,7 +1100,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_get_offsetRanges(self):
         """Test Python direct Kafka RDD get OffsetRanges."""
         topic = self._randomTopic()
@@ -1113,7 +1112,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self.assertEqual(offsetRanges, rdd.offsetRanges())
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_foreach_get_offsetRanges(self):
         """Test the Python direct Kafka stream foreachRDD get offsetRanges."""
         topic = self._randomTopic()
@@ -1138,7 +1136,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_transform_get_offsetRanges(self):
         """Test the Python direct Kafka stream transform get offsetRanges."""
         topic = self._randomTopic()
@@ -1176,7 +1173,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
         self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_transform_with_checkpoint(self):
         """Test the Python direct Kafka stream transform with checkpoint correctly recovered."""
         topic = self._randomTopic()
@@ -1225,7 +1221,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         finally:
             shutil.rmtree(tmpdir)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_message_handler(self):
         """Test Python direct Kafka RDD MessageHandler."""
         topic = self._randomTopic()
@@ -1242,7 +1237,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
                                    messageHandler=getKeyAndDoubleMessage)
         self._validateRddResult({"aa": 1, "bb": 1, "cc": 2}, rdd)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_message_handler(self):
         """Test the Python direct Kafka stream MessageHandler."""
         topic = self._randomTopic()