author     Davies Liu <davies@databricks.com>   2015-07-29 22:30:49 -0700
committer  Davies Liu <davies.liu@gmail.com>    2015-07-29 22:30:49 -0700
commit     e044705b4402f86d0557ecd146f3565388c7eeb4 (patch)
tree       f8db4937fe17a3c9fdb651f605df057aecf9d597 /sql
parent     f5dd11339fc9a6d11350f63beeca7c14aec169b1 (diff)
[SPARK-9116] [SQL] [PYSPARK] support Python only UDT in __main__
This also allows creating a Python UDT without a matching Scala one, which is important for Python users.

cc mengxr JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #7453 from davies/class_in_main and squashes the following commits:

4dfd5e1 [Davies Liu] add tests for Python and Scala UDT
793d9b2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into class_in_main
dc65f19 [Davies Liu] address comment
a9a3c40 [Davies Liu] Merge branch 'master' of github.com:apache/spark into class_in_main
a86e1fc [Davies Liu] fix serialization
ad528ba [Davies Liu] Merge branch 'master' of github.com:apache/spark into class_in_main
63f52ef [Davies Liu] fix pylint check
655b8a9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into class_in_main
316a394 [Davies Liu] support Python UDT with UTF
0bcb3ef [Davies Liu] fix bug in mllib
de986d6 [Davies Liu] fix test
83d65ac [Davies Liu] fix bug in StructType
55bb86e [Davies Liu] support Python UDT in __main__ (without Scala one)
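A note on what this enables: a UDT defined only in Python now serializes its schema to JSON that the JVM can parse back without any companion Scala class. A minimal sketch (the Python class path and the pickled-class blob below are hypothetical placeholders):

    import org.apache.spark.sql.types.DataType

    // "pyClass" is the module-qualified Python class name; "serializedClass"
    // carries the pickled Python UDT class (placeholder value here).
    val json = """{"type": "udt",
                  |  "pyClass": "__main__.ExamplePointUDT",
                  |  "serializedClass": "<base64-pickled-class>",
                  |  "sqlType": {"type": "struct", "fields": [
                  |    {"name": "x", "type": "double", "nullable": false, "metadata": {}},
                  |    {"name": "y", "type": "double", "nullable": false, "metadata": {}}]}}""".stripMargin

    // With this patch, parsing succeeds and yields a PythonUserDefinedType
    // wrapping the struct, instead of failing to load a JVM class.
    val dt = DataType.fromJson(json)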
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala         |  9
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala  | 29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala       |  1
3 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 591fb26e67..f4428c2e8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -142,12 +142,21 @@ object DataType {
       ("type", JString("struct"))) =>
       StructType(fields.map(parseStructField))
 
+    // Scala/Java UDT
     case JSortedObject(
       ("class", JString(udtClass)),
       ("pyClass", _),
       ("sqlType", _),
       ("type", JString("udt"))) =>
       Utils.classForName(udtClass).newInstance().asInstanceOf[UserDefinedType[_]]
+
+    // Python UDT
+    case JSortedObject(
+      ("pyClass", JString(pyClass)),
+      ("serializedClass", JString(serialized)),
+      ("sqlType", v: JValue),
+      ("type", JString("udt"))) =>
+      new PythonUserDefinedType(parseDataType(v), pyClass, serialized)
   }
 
   private def parseStructField(json: JValue): StructField = json match {
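An aside on how the parser keeps the two udt cases apart: JSortedObject yields fields in alphabetical key order, so each branch effectively matches on an exact key set: a "class" key selects the Scala/Java branch, while "serializedClass" (and no "class") selects the Python branch. A hedged sketch of that dispatch logic (whichUdt is illustrative, not part of the patch; json4s is assumed on the classpath, as it is in Spark):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    def whichUdt(json: String): String = parse(json) match {
      case JObject(fields) if fields.exists(_._1 == "class")           => "scala/java UDT"
      case JObject(fields) if fields.exists(_._1 == "serializedClass") => "python-only UDT"
      case _                                                           => "not a UDT object"
    }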
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
index e47cfb4833..4305903616 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
@@ -45,6 +45,9 @@ abstract class UserDefinedType[UserType] extends DataType with Serializable {
   /** Paired Python UDT class, if exists. */
   def pyUDT: String = null
 
+  /** Serialized Python UDT class, if exists. */
+  def serializedPyClass: String = null
+
   /**
    * Convert the user type to a SQL datum
    *
@@ -82,3 +85,29 @@ abstract class UserDefinedType[UserType] extends DataType with Serializable {
   override private[sql] def acceptsType(dataType: DataType) =
     this.getClass == dataType.getClass
 }
+
+/**
+ * ::DeveloperApi::
+ * The user defined type in Python.
+ *
+ * Note: This can only be accessed via Python UDF, or accessed as serialized object.
+ */
+private[sql] class PythonUserDefinedType(
+    val sqlType: DataType,
+    override val pyUDT: String,
+    override val serializedPyClass: String) extends UserDefinedType[Any] {
+
+  /* The serialization is handled by UDT class in Python */
+  override def serialize(obj: Any): Any = obj
+  override def deserialize(datam: Any): Any = datam
+
+  /* There is no Java class for Python UDT */
+  override def userClass: java.lang.Class[Any] = null
+
+  override private[sql] def jsonValue: JValue = {
+    ("type" -> "udt") ~
+      ("pyClass" -> pyUDT) ~
+      ("serializedClass" -> serializedPyClass) ~
+      ("sqlType" -> sqlType.jsonValue)
+  }
+}
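A minimal behavior sketch for the new class, assuming the calling code can see private[sql] members (e.g. a test under org.apache.spark.sql); both class-name strings are placeholders:

    import org.apache.spark.sql.types._

    val pointUdt = new PythonUserDefinedType(
      sqlType = StructType(Seq(
        StructField("x", DoubleType, nullable = false),
        StructField("y", DoubleType, nullable = false))),
      pyUDT = "__main__.ExamplePointUDT",
      serializedPyClass = "<base64-pickled-class>")

    // serialize/deserialize are pass-throughs; the Python UDT does the real work.
    assert(pointUdt.serialize(Seq(1.0, 2.0)) == Seq(1.0, 2.0))
    assert(pointUdt.deserialize(Seq(1.0, 2.0)) == Seq(1.0, 2.0))

    // jsonValue mirrors the new parser case in DataType, so the type survives
    // a JSON round trip (DataType.json is the public accessor for jsonValue).
    assert(DataType.fromJson(pointUdt.json).isInstanceOf[PythonUserDefinedType])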
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala
index ec084a2996..3c38916fd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala
@@ -267,7 +267,6 @@ object EvaluatePython {
         pickler.save(row.values(i))
         i += 1
       }
-      row.values.foreach(pickler.save)
       out.write(Opcodes.TUPLE)
       out.write(Opcodes.REDUCE)
     }
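The removed line was a double-serialization bug: the indexed loop above it already pickles every row value once, and the foreach then wrote each value a second time, so the TUPLE opcode rebuilt rows with doubled arity on the Python side. A small round-trip sketch with Pyrolite (net.razorvine.pickle, the library EvaluatePython builds on) illustrating the once-per-value invariant; it exercises the library's public dumps/loads rather than Spark's internal row pickler:

    import net.razorvine.pickle.{Pickler, Unpickler}

    // Each element is written exactly once, so the unpickled result keeps
    // the original arity (2 here), not double.
    val values: Array[AnyRef] = Array(Int.box(1), "a")
    val bytes = new Pickler().dumps(values)
    val restored = new Unpickler().loads(bytes) // a 2-element list after the round trip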