path: root/sql
author     Reynold Xin <rxin@databricks.com>  2015-09-23 16:43:21 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-09-23 16:43:21 -0700
commit     9952217749118ae78fe794ca11e1c4a87a4ae8ba (patch)
tree       cf71cc84eb34acdeade45cc8be3642db4faa8d54 /sql
parent     067afb4e9bb227f159bcbc2aafafce9693303ea9 (diff)
download   spark-9952217749118ae78fe794ca11e1c4a87a4ae8ba.tar.gz
           spark-9952217749118ae78fe794ca11e1c4a87a4ae8ba.tar.bz2
           spark-9952217749118ae78fe794ca11e1c4a87a4ae8ba.zip
[SPARK-10731] [SQL] Delegate to Scala's DataFrame.take implementation in Python DataFrame.
Python DataFrame.head/take currently requires scanning all the partitions. This pull request changes them to delegate the actual implementation to Scala DataFrame (by calling DataFrame.take).

This is more of a hack for fixing this issue in 1.5.1. A more proper fix is to change executeCollect and executeTake to return InternalRow rather than Row, and thus eliminate the extra round-trip conversion.

Author: Reynold Xin <rxin@databricks.com>

Closes #8876 from rxin/SPARK-10731.
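For context, the PySpark half of the change (outside the 'sql' diffstat shown here) would look roughly like the sketch below: DataFrame.take delegates to the new EvaluatePython.takeAndServe helper on the JVM and reads the pickled rows back over a local socket. Helper names such as _load_from_socket, SCCallSiteSync, and the serializer classes are assumptions based on PySpark internals of this era, not part of this diff.

    # Rough sketch of a method on pyspark.sql.dataframe.DataFrame (hypothetical,
    # not part of the sql/ diff below).
    from pyspark.rdd import _load_from_socket                    # assumed helper
    from pyspark.serializers import BatchedSerializer, PickleSerializer
    from pyspark.traceback_utils import SCCallSiteSync

    def take(self, num):
        """Return the first `num` rows by delegating to Scala DataFrame.take."""
        with SCCallSiteSync(self._sc):
            # takeAndServe (added in this patch) runs the take on the JVM side and
            # returns the local port on which the pickled rows are served.
            port = self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(
                self._jdf, num)
        return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))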
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala)  14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala  16
2 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index c35c726bfc..d6aaf424a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -28,7 +28,8 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.{PythonRunner, PythonBroadcast, PythonRDD, SerDeUtil}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -118,6 +119,17 @@ object EvaluatePython {
  def apply(udf: PythonUDF, child: LogicalPlan): EvaluatePython =
    new EvaluatePython(udf, child, AttributeReference("pythonUDF", udf.dataType)())

+  def takeAndServe(df: DataFrame, n: Int): Int = {
+    registerPicklers()
+    // This is an annoying hack - we should refactor the code so executeCollect and executeTake
+    // returns InternalRow rather than Row.
+    val converter = CatalystTypeConverters.createToCatalystConverter(df.schema)
+    val iter = new SerDeUtil.AutoBatchedPickler(df.take(n).iterator.map { row =>
+      EvaluatePython.toJava(converter(row).asInstanceOf[InternalRow], df.schema)
+    })
+    PythonRDD.serveIterator(iter, s"serve-DataFrame")
+  }
+
  /**
   * Helper for converting from Catalyst type to java type suitable for Pyrolite.
   */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
index 2fdd798b44..963e6030c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
@@ -39,22 +39,20 @@ private[sql] class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
  override def pyUDT: String = "pyspark.sql.tests.ExamplePointUDT"

-  override def serialize(obj: Any): Seq[Double] = {
+  override def serialize(obj: Any): GenericArrayData = {
     obj match {
       case p: ExamplePoint =>
-        Seq(p.x, p.y)
+        val output = new Array[Any](2)
+        output(0) = p.x
+        output(1) = p.y
+        new GenericArrayData(output)
     }
   }

  override def deserialize(datum: Any): ExamplePoint = {
    datum match {
-      case values: Seq[_] =>
-        val xy = values.asInstanceOf[Seq[Double]]
-        assert(xy.length == 2)
-        new ExamplePoint(xy(0), xy(1))
-      case values: util.ArrayList[_] =>
-        val xy = values.asInstanceOf[util.ArrayList[Double]].asScala
-        new ExamplePoint(xy(0), xy(1))
+      case values: ArrayData =>
+        new ExamplePoint(values.getDouble(0), values.getDouble(1))
    }
  }