author     Davies Liu <davies@databricks.com>    2015-05-18 12:55:13 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2015-05-18 12:55:13 -0700
commit  32fbd297dd651ba3ce4ce52aeb0488233149cdf9 (patch)
tree    c18970db228448c856f8ce9538d708511e9e95fd /sql
parent  9dadf019b93038e1e18336ccd06c5eecb4bae32f (diff)
[SPARK-6216] [PYSPARK] check python version of worker with driver
This PR reverts #5404; instead, the driver's Python version is passed into the JVM and checked in the worker before deserializing the closure, so it works across different major versions of Python.

Author: Davies Liu <davies@databricks.com>

Closes #6203 from davies/py_version and squashes the following commits:

b8fb76e [Davies Liu] fix test
6ce5096 [Davies Liu] use string for version
47c6278 [Davies Liu] check python version of worker with driver
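In spirit, the worker-side guard this change enables behaves like the minimal sketch below (the function name and error message are illustrative stand-ins, not Spark's exact code): the driver's major.minor version string travels with the serialized function, and the worker compares it against its own interpreter before unpickling anything.

import sys

def check_worker_python_version(driver_ver):
    # Compare the driver's advertised version against this interpreter,
    # failing before any closure bytes are deserialized.
    worker_ver = "%d.%d" % sys.version_info[:2]
    if driver_ver != worker_ver:
        raise RuntimeError(
            "Python in worker has different version %s than that in driver %s"
            % (worker_ver, driver_ver))

# Matching versions pass silently:
check_worker_python_version("%d.%d" % sys.version_info[:2])

# A mismatch fails fast with a clear error instead of a cryptic
# unpickling failure:
try:
    check_worker_python_version("2.7" if sys.version_info[0] == 3 else "3.4")
except RuntimeError as e:
    print(e)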
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala      | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala  | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala | 2
3 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index dc3389c41b..3cc5c2441d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -46,6 +46,7 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
envVars: JMap[String, String],
pythonIncludes: JList[String],
pythonExec: String,
+ pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]],
stringDataType: String): Unit = {
@@ -70,6 +71,7 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
envVars,
pythonIncludes,
pythonExec,
+ pythonVer,
broadcastVars,
accumulator,
dataType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
index 505ab1301e..a02e202d2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
@@ -58,14 +58,15 @@ private[sql] case class UserDefinedPythonFunction(
envVars: JMap[String, String],
pythonIncludes: JList[String],
pythonExec: String,
+ pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]],
dataType: DataType) {
/** Returns a [[Column]] that will evaluate to calling this UDF with the given input. */
def apply(exprs: Column*): Column = {
- val udf = PythonUDF(name, command, envVars, pythonIncludes, pythonExec, broadcastVars,
- accumulator, dataType, exprs.map(_.expr))
+ val udf = PythonUDF(name, command, envVars, pythonIncludes, pythonExec, pythonVer,
+ broadcastVars, accumulator, dataType, exprs.map(_.expr))
Column(udf)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 65dd7ba020..11b2897f76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -46,6 +46,7 @@ private[spark] case class PythonUDF(
envVars: JMap[String, String],
pythonIncludes: JList[String],
pythonExec: String,
+ pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]],
dataType: DataType,
@@ -251,6 +252,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
udf.pythonIncludes,
false,
udf.pythonExec,
+ udf.pythonVer,
udf.broadcastVars,
udf.accumulator
).mapPartitions { iter =>
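For context, here is a driver-side sketch of the path these Scala changes serve: registering a Python UDF from PySpark, which, after this patch, sends the driver's Python version along with the pickled function so each worker can run the check described above. The code follows the PySpark 1.x API of the era; the app name and sample data are illustrative.

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType

sc = SparkContext(appName="py-version-check-demo")  # illustrative app name
sqlContext = SQLContext(sc)

# Registering a Python UDF: with this patch, pythonVer accompanies the
# serialized function through UDFRegistration -> UserDefinedPythonFunction
# -> PythonUDF and out to the workers.
sqlContext.registerFunction("strlen", lambda s: len(s), IntegerType())

df = sqlContext.createDataFrame([("hello",), ("spark",)], ["word"])
df.registerTempTable("words")
sqlContext.sql("SELECT word, strlen(word) AS n FROM words").show()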