diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-02-24 12:44:54 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-02-24 12:44:54 -0800 |
commit | a60f91284ceee64de13f04559ec19c13a820a133 (patch) | |
tree | 68d7d84620835d5e66cc3f94771a11655c4cbe2b /core/src | |
parent | f92f53faeea020d80638a06752d69ca7a949cdeb (diff) | |
download | spark-a60f91284ceee64de13f04559ec19c13a820a133.tar.gz spark-a60f91284ceee64de13f04559ec19c13a820a133.tar.bz2 spark-a60f91284ceee64de13f04559ec19c13a820a133.zip |
[SPARK-13467] [PYSPARK] abstract python function to simplify pyspark code
## What changes were proposed in this pull request?
When we pass a Python function to the JVM side, we also need to send its context, e.g. `envVars`, `pythonIncludes`, `pythonExec`, etc. However, it's annoying to pass around so many parameters in many places. This PR abstracts the Python function along with its context, to simplify some PySpark code and make the logic clearer.
## How was this patch tested?
by existing unit tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11342 from cloud-fan/python-clean.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 37 |
1 files changed, 22 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index f12e2dfafa..05d1c31a08 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -42,14 +42,8 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} private[spark] class PythonRDD( parent: RDD[_], - command: Array[Byte], - envVars: JMap[String, String], - pythonIncludes: JList[String], - preservePartitoning: Boolean, - pythonExec: String, - pythonVer: String, - broadcastVars: JList[Broadcast[PythonBroadcast]], - accumulator: Accumulator[JList[Array[Byte]]]) + func: PythonFunction, + preservePartitoning: Boolean) extends RDD[Array[Byte]](parent) { val bufferSize = conf.getInt("spark.buffer.size", 65536) @@ -64,29 +58,37 @@ private[spark] class PythonRDD( val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { - val runner = new PythonRunner( - command, envVars, pythonIncludes, pythonExec, pythonVer, broadcastVars, accumulator, - bufferSize, reuse_worker) + val runner = new PythonRunner(func, bufferSize, reuse_worker) runner.compute(firstParent.iterator(split, context), split.index, context) } } - /** - * A helper class to run Python UDFs in Spark. + * A wrapper for a Python function, contains all necessary context to run the function in Python + * runner. */ -private[spark] class PythonRunner( +private[spark] case class PythonFunction( command: Array[Byte], envVars: JMap[String, String], pythonIncludes: JList[String], pythonExec: String, pythonVer: String, broadcastVars: JList[Broadcast[PythonBroadcast]], - accumulator: Accumulator[JList[Array[Byte]]], + accumulator: Accumulator[JList[Array[Byte]]]) + +/** + * A helper class to run Python UDFs in Spark. 
+ */ +private[spark] class PythonRunner( + func: PythonFunction, bufferSize: Int, reuse_worker: Boolean) extends Logging { + private val envVars = func.envVars + private val pythonExec = func.pythonExec + private val accumulator = func.accumulator + def compute( inputIterator: Iterator[_], partitionIndex: Int, @@ -225,6 +227,11 @@ private[spark] class PythonRunner( @volatile private var _exception: Exception = null + private val pythonVer = func.pythonVer + private val pythonIncludes = func.pythonIncludes + private val broadcastVars = func.broadcastVars + private val command = func.command + setDaemon(true) /** Contains the exception thrown while writing the parent iterator to the Python process. */ |