diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala | 29 |
1 files changed, 10 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala index da28ec4f53..3b05e29e52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala @@ -35,26 +35,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, Generic import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -/** - * Evaluates a [[PythonUDF]], appending the result to the end of the input tuple. - */ -case class EvaluatePython( - udf: PythonUDF, - child: LogicalPlan, - resultAttribute: AttributeReference) - extends logical.UnaryNode { - - def output: Seq[Attribute] = child.output :+ resultAttribute - - // References should not include the produced attribute. - override def references: AttributeSet = udf.references -} - - object EvaluatePython { - def apply(udf: PythonUDF, child: LogicalPlan): EvaluatePython = - new EvaluatePython(udf, child, AttributeReference("pythonUDF", udf.dataType)()) - def takeAndServe(df: DataFrame, n: Int): Int = { registerPicklers() df.withNewExecutionId { @@ -66,6 +47,16 @@ object EvaluatePython { } } + def needConversionInPython(dt: DataType): Boolean = dt match { + case DateType | TimestampType => true + case _: StructType => true + case _: UserDefinedType[_] => true + case ArrayType(elementType, _) => needConversionInPython(elementType) + case MapType(keyType, valueType, _) => + needConversionInPython(keyType) || needConversionInPython(valueType) + case _ => false + } + /** * Helper for converting from Catalyst type to java type suitable for Pyrolite. */ |