diff options
author | Davies Liu <davies@databricks.com> | 2015-05-18 12:55:13 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-05-18 12:55:13 -0700 |
commit | 32fbd297dd651ba3ce4ce52aeb0488233149cdf9 (patch) | |
tree | c18970db228448c856f8ce9538d708511e9e95fd /sql | |
parent | 9dadf019b93038e1e18336ccd06c5eecb4bae32f (diff) | |
download | spark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.tar.gz spark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.tar.bz2 spark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.zip |
[SPARK-6216] [PYSPARK] check python version of worker with driver
This PR revert #5404, change to pass the version of python in driver into JVM, check it in worker before deserializing closure, then it can works with different major version of Python.
Author: Davies Liu <davies@databricks.com>
Closes #6203 from davies/py_version and squashes the following commits:
b8fb76e [Davies Liu] fix test
6ce5096 [Davies Liu] use string for version
47c6278 [Davies Liu] check python version of worker with driver
Diffstat (limited to 'sql')
3 files changed, 7 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index dc3389c41b..3cc5c2441d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -46,6 +46,7 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging { envVars: JMap[String, String], pythonIncludes: JList[String], pythonExec: String, + pythonVer: String, broadcastVars: JList[Broadcast[PythonBroadcast]], accumulator: Accumulator[JList[Array[Byte]]], stringDataType: String): Unit = { @@ -70,6 +71,7 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging { envVars, pythonIncludes, pythonExec, + pythonVer, broadcastVars, accumulator, dataType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala index 505ab1301e..a02e202d2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala @@ -58,14 +58,15 @@ private[sql] case class UserDefinedPythonFunction( envVars: JMap[String, String], pythonIncludes: JList[String], pythonExec: String, + pythonVer: String, broadcastVars: JList[Broadcast[PythonBroadcast]], accumulator: Accumulator[JList[Array[Byte]]], dataType: DataType) { /** Returns a [[Column]] that will evaluate to calling this UDF with the given input. */ def apply(exprs: Column*): Column = { - val udf = PythonUDF(name, command, envVars, pythonIncludes, pythonExec, broadcastVars, - accumulator, dataType, exprs.map(_.expr)) + val udf = PythonUDF(name, command, envVars, pythonIncludes, pythonExec, pythonVer, + broadcastVars, accumulator, dataType, exprs.map(_.expr)) Column(udf) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 65dd7ba020..11b2897f76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -46,6 +46,7 @@ private[spark] case class PythonUDF( envVars: JMap[String, String], pythonIncludes: JList[String], pythonExec: String, + pythonVer: String, broadcastVars: JList[Broadcast[PythonBroadcast]], accumulator: Accumulator[JList[Array[Byte]]], dataType: DataType, @@ -251,6 +252,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: udf.pythonIncludes, false, udf.pythonExec, + udf.pythonVer, udf.broadcastVars, udf.accumulator ).mapPartitions { iter => |