Diffstat (limited to 'python/pyspark/streaming/flume.py')
-rw-r--r--    python/pyspark/streaming/flume.py    40
1 file changed, 17 insertions(+), 23 deletions(-)
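
This patch consolidates the Py4J helper-loading boilerplate in the two hunks below into a single `_get_helper` staticmethod. As a standalone illustration of that pattern, here is a rough generic sketch; the `_load_jvm_helper` name, its docstring, and the printed message are illustrative and not part of this patch:

from py4j.protocol import Py4JJavaError


def _load_jvm_helper(sc, class_name):
    """Instantiate a JVM helper class via the current thread's context class loader.

    `sc` is a SparkContext and `class_name` a fully qualified Scala/Java class name.
    The original Py4JJavaError is re-raised if the class is not on the classpath.
    """
    try:
        loader = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
        helper_class = loader.loadClass(class_name)
        return helper_class.newInstance()
    except Py4JJavaError as e:
        if 'ClassNotFoundException' in str(e.java_exception):
            # Illustrative message; the real code calls FlumeUtils._printErrorMsg(sc).
            print("Helper class %s not found; is the streaming assembly jar on the classpath?"
                  % class_name)
        raise
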
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index b1fff0a5c7..edd5886a85 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -55,17 +55,8 @@ class FlumeUtils(object):
         :return: A DStream object
         """
         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
-
-        try:
-            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
-                .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
-            helper = helperClass.newInstance()
-            jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
-        except Py4JJavaError as e:
-            if 'ClassNotFoundException' in str(e.java_exception):
-                FlumeUtils._printErrorMsg(ssc.sparkContext)
-            raise e
-
+        helper = FlumeUtils._get_helper(ssc._sc)
+        jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
         return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
 
     @staticmethod
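
For context, a minimal usage sketch of the push-based `createStream` path touched above, assuming the spark-streaming-flume assembly is on the classpath (otherwise `_printErrorMsg` fires); the app name, host, port, and batch interval are illustrative:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc = SparkContext(appName="FlumePushExample")   # illustrative app name
ssc = StreamingContext(sc, batchDuration=2)

# Returns a DStream of (headers, body) pairs; the body is decoded with
# utf8_decoder unless another bodyDecoder is supplied.
stream = FlumeUtils.createStream(ssc, "localhost", 41414)
stream.map(lambda event: event[1]).pprint()

ssc.start()
ssc.awaitTermination()
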
@@ -95,18 +86,9 @@ class FlumeUtils(object):
         for (host, port) in addresses:
             hosts.append(host)
             ports.append(port)
-
-        try:
-            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-                .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
-            helper = helperClass.newInstance()
-            jstream = helper.createPollingStream(
-                ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
-        except Py4JJavaError as e:
-            if 'ClassNotFoundException' in str(e.java_exception):
-                FlumeUtils._printErrorMsg(ssc.sparkContext)
-            raise e
-
+        helper = FlumeUtils._get_helper(ssc._sc)
+        jstream = helper.createPollingStream(
+            ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
         return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
 
     @staticmethod
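
Similarly, a minimal sketch of the pull-based `createPollingStream` path, reusing `ssc` from the sketch above; the agent addresses, batch size, and parallelism are illustrative:

from pyspark.streaming.flume import FlumeUtils

# (host, port) pairs of Flume agents running the Spark sink (illustrative).
addresses = [("agent1.example.com", 9988), ("agent2.example.com", 9988)]

# The addresses are split into the parallel hosts/ports lists seen in the hunk
# above, and a DStream of (headers, body) pairs is returned.
polling = FlumeUtils.createPollingStream(
    ssc, addresses, maxBatchSize=500, parallelism=2)
polling.count().pprint()
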
@@ -127,6 +109,18 @@ class FlumeUtils(object):
         return stream.map(func)
 
     @staticmethod
+    def _get_helper(sc):
+        try:
+            helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
+            return helperClass.newInstance()
+        except Py4JJavaError as e:
+            # TODO: use --jar once it also works on driver
+            if 'ClassNotFoundException' in str(e.java_exception):
+                FlumeUtils._printErrorMsg(sc)
+            raise
+
+    @staticmethod
     def _printErrorMsg(sc):
         print("""
________________________________________________________________________________________________