commit    9952217749118ae78fe794ca11e1c4a87a4ae8ba
tree      cf71cc84eb34acdeade45cc8be3642db4faa8d54 /sql
parent    067afb4e9bb227f159bcbc2aafafce9693303ea9
author    Reynold Xin <rxin@databricks.com>  2015-09-23 16:43:21 -0700
committer Reynold Xin <rxin@databricks.com>  2015-09-23 16:43:21 -0700
[SPARK-10731] [SQL] Delegate to Scala's DataFrame.take implementation in Python DataFrame.
Python DataFrame.head/take currently requires scanning all the partitions. This pull request changes them to delegate the actual implementation to the Scala DataFrame (by calling DataFrame.take).
This is something of a hack to fix the issue for 1.5.1. A proper fix would be to change executeCollect and executeTake to return InternalRow rather than Row, eliminating the extra round-trip conversion.
Author: Reynold Xin <rxin@databricks.com>
Closes #8876 from rxin/SPARK-10731.
Diffstat (limited to 'sql')
 sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala) | 14 +++++++++++++-
 sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala | 16 +++++++---------
 2 files changed, 20 insertions(+), 10 deletions(-)
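The diffstat is limited to 'sql', so the companion change on the Python side is not shown here. As a rough sketch only (not the patch itself), PySpark's DataFrame.take can delegate through the py4j gateway to the new EvaluatePython.takeAndServe helper, which runs DataFrame.take on the JVM, pickles the rows, and serves them on a local socket, returning the port. The socket and serializer helpers below (_load_from_socket, BatchedSerializer, PickleSerializer) are assumed from PySpark's existing RDD collection path.

# Sketch of the Python-side delegation; assumes PySpark's existing helpers.
from pyspark.rdd import _load_from_socket
from pyspark.serializers import BatchedSerializer, PickleSerializer

def take(self, num):
    """Return the first `num` rows by delegating to Scala's DataFrame.take."""
    # takeAndServe runs DataFrame.take(num) on the JVM, pickles the result
    # rows, and serves them over a local socket, returning the port number.
    port = self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(
        self._jdf, num)
    return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))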
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index c35c726bfc..d6aaf424a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -28,7 +28,8 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.{PythonRunner, PythonBroadcast, PythonRDD, SerDeUtil}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -118,6 +119,17 @@ object EvaluatePython {
   def apply(udf: PythonUDF, child: LogicalPlan): EvaluatePython =
     new EvaluatePython(udf, child, AttributeReference("pythonUDF", udf.dataType)())

+  def takeAndServe(df: DataFrame, n: Int): Int = {
+    registerPicklers()
+    // This is an annoying hack - we should refactor the code so executeCollect and executeTake
+    // returns InternalRow rather than Row.
+    val converter = CatalystTypeConverters.createToCatalystConverter(df.schema)
+    val iter = new SerDeUtil.AutoBatchedPickler(df.take(n).iterator.map { row =>
+      EvaluatePython.toJava(converter(row).asInstanceOf[InternalRow], df.schema)
+    })
+    PythonRDD.serveIterator(iter, s"serve-DataFrame")
+  }
+
   /**
    * Helper for converting from Catalyst type to java type suitable for Pyrolite.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
index 2fdd798b44..963e6030c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
@@ -39,22 +39,20 @@ private[sql] class ExamplePointUDT extends UserDefinedType[ExamplePoint] {

   override def pyUDT: String = "pyspark.sql.tests.ExamplePointUDT"

-  override def serialize(obj: Any): Seq[Double] = {
+  override def serialize(obj: Any): GenericArrayData = {
     obj match {
       case p: ExamplePoint =>
-        Seq(p.x, p.y)
+        val output = new Array[Any](2)
+        output(0) = p.x
+        output(1) = p.y
+        new GenericArrayData(output)
     }
   }

   override def deserialize(datum: Any): ExamplePoint = {
     datum match {
-      case values: Seq[_] =>
-        val xy = values.asInstanceOf[Seq[Double]]
-        assert(xy.length == 2)
-        new ExamplePoint(xy(0), xy(1))
-      case values: util.ArrayList[_] =>
-        val xy = values.asInstanceOf[util.ArrayList[Double]].asScala
-        new ExamplePoint(xy(0), xy(1))
+      case values: ArrayData =>
+        new ExamplePoint(values.getDouble(0), values.getDouble(1))
     }
   }
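For context, a quick usage sketch: after this change, both take and head fetch rows through the JVM-side DataFrame.take, which scans only as many partitions as needed rather than collecting the whole DataFrame. The session below is illustrative (Spark 1.5-era API):

# Illustrative PySpark usage exercising the new path.
df = sqlContext.range(0, 1000)  # a simple 1000-row DataFrame
rows = df.take(5)   # served via EvaluatePython.takeAndServe
first = df.head()   # head is built on take, so it benefits as well

Serving pickled rows over a local socket, rather than returning them through the py4j gateway, mirrors how PySpark already collects RDDs and keeps the batched bytes out of py4j.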