author    Wenchen Fan <wenchen@databricks.com>    2016-02-24 12:44:54 -0800
committer Davies Liu <davies.liu@gmail.com>    2016-02-24 12:44:54 -0800
commit    a60f91284ceee64de13f04559ec19c13a820a133 (patch)
tree      68d7d84620835d5e66cc3f94771a11655c4cbe2b /core
parent    f92f53faeea020d80638a06752d69ca7a949cdeb (diff)
download  spark-a60f91284ceee64de13f04559ec19c13a820a133.tar.gz
          spark-a60f91284ceee64de13f04559ec19c13a820a133.tar.bz2
          spark-a60f91284ceee64de13f04559ec19c13a820a133.zip
[SPARK-13467] [PYSPARK] abstract python function to simplify pyspark code
## What changes were proposed in this pull request?

When we pass a Python function to the JVM side, we also need to send its context, e.g. `envVars`, `pythonIncludes`, `pythonExec`, etc. However, it is cumbersome to pass so many parameters around in many places. This PR abstracts the Python function together with its context into a single `PythonFunction` object, which simplifies some PySpark code and makes the logic clearer.

## How was this patch tested?

By existing unit tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11342 from cloud-fan/python-clean.
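To make the refactoring concrete, here is a minimal sketch, not the actual Spark call site, of how the new `PythonFunction` case class bundles the context that previously had to be threaded through `PythonRDD` and `PythonRunner` as separate constructor arguments. The values `pickledCommand`, `bcastVars`, `accum`, and `parent` are hypothetical placeholders for data that the PySpark side sends over Py4J.

```scala
// Minimal sketch under the assumptions stated above; not the actual call site.
import java.util.{ArrayList => JArrayList, HashMap => JHashMap}

val func = PythonFunction(
  command = pickledCommand,                   // serialized Python code (Array[Byte])
  envVars = new JHashMap[String, String](),   // environment for the Python worker
  pythonIncludes = new JArrayList[String](),  // files added via sc.addPyFile
  pythonExec = "python",                      // interpreter used to launch workers
  pythonVer = "2.7",
  broadcastVars = bcastVars,                  // JList[Broadcast[PythonBroadcast]]
  accumulator = accum)                        // Accumulator[JList[Array[Byte]]]

// Before this change, PythonRDD and PythonRunner each took these fields as
// individual constructor parameters; now a single value object is passed through.
val pythonRDD = new PythonRDD(parent, func, preservePartitoning = true)
val runner = new PythonRunner(func, bufferSize = 65536, reuse_worker = true)
```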
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 37
1 file changed, 22 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f12e2dfafa..05d1c31a08 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -42,14 +42,8 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
private[spark] class PythonRDD(
parent: RDD[_],
- command: Array[Byte],
- envVars: JMap[String, String],
- pythonIncludes: JList[String],
- preservePartitoning: Boolean,
- pythonExec: String,
- pythonVer: String,
- broadcastVars: JList[Broadcast[PythonBroadcast]],
- accumulator: Accumulator[JList[Array[Byte]]])
+ func: PythonFunction,
+ preservePartitoning: Boolean)
extends RDD[Array[Byte]](parent) {
val bufferSize = conf.getInt("spark.buffer.size", 65536)
@@ -64,29 +58,37 @@ private[spark] class PythonRDD(
val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
- val runner = new PythonRunner(
- command, envVars, pythonIncludes, pythonExec, pythonVer, broadcastVars, accumulator,
- bufferSize, reuse_worker)
+ val runner = new PythonRunner(func, bufferSize, reuse_worker)
runner.compute(firstParent.iterator(split, context), split.index, context)
}
}
-
/**
- * A helper class to run Python UDFs in Spark.
+ * A wrapper for a Python function, contains all necessary context to run the function in Python
+ * runner.
*/
-private[spark] class PythonRunner(
+private[spark] case class PythonFunction(
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
pythonExec: String,
pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
- accumulator: Accumulator[JList[Array[Byte]]],
+ accumulator: Accumulator[JList[Array[Byte]]])
+
+/**
+ * A helper class to run Python UDFs in Spark.
+ */
+private[spark] class PythonRunner(
+ func: PythonFunction,
bufferSize: Int,
reuse_worker: Boolean)
extends Logging {
+ private val envVars = func.envVars
+ private val pythonExec = func.pythonExec
+ private val accumulator = func.accumulator
+
def compute(
inputIterator: Iterator[_],
partitionIndex: Int,
@@ -225,6 +227,11 @@ private[spark] class PythonRunner(
@volatile private var _exception: Exception = null
+ private val pythonVer = func.pythonVer
+ private val pythonIncludes = func.pythonIncludes
+ private val broadcastVars = func.broadcastVars
+ private val command = func.command
+
setDaemon(true)
/** Contains the exception thrown while writing the parent iterator to the Python process. */