aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala29
1 files changed, 10 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index da28ec4f53..3b05e29e52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -35,26 +35,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, Generic
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
-/**
- * Evaluates a [[PythonUDF]], appending the result to the end of the input tuple.
- */
-case class EvaluatePython(
- udf: PythonUDF,
- child: LogicalPlan,
- resultAttribute: AttributeReference)
- extends logical.UnaryNode {
-
- def output: Seq[Attribute] = child.output :+ resultAttribute
-
- // References should not include the produced attribute.
- override def references: AttributeSet = udf.references
-}
-
-
object EvaluatePython {
- def apply(udf: PythonUDF, child: LogicalPlan): EvaluatePython =
- new EvaluatePython(udf, child, AttributeReference("pythonUDF", udf.dataType)())
-
def takeAndServe(df: DataFrame, n: Int): Int = {
registerPicklers()
df.withNewExecutionId {
@@ -66,6 +47,16 @@ object EvaluatePython {
}
}
+ def needConversionInPython(dt: DataType): Boolean = dt match {
+ case DateType | TimestampType => true
+ case _: StructType => true
+ case _: UserDefinedType[_] => true
+ case ArrayType(elementType, _) => needConversionInPython(elementType)
+ case MapType(keyType, valueType, _) =>
+ needConversionInPython(keyType) || needConversionInPython(valueType)
+ case _ => false
+ }
+
/**
* Helper for converting from Catalyst type to java type suitable for Pyrolite.
*/