about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                         |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala | 13
2 files changed, 7 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3b3cb82078..9cfbdffd02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function._
-import org.apache.spark.api.python.PythonRDD
+import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
@@ -2567,8 +2567,12 @@ class Dataset[T] private[sql](
}
private[sql] def collectToPython(): Int = {
+ EvaluatePython.registerPicklers()
withNewExecutionId {
- PythonRDD.collectAndServe(javaToPython.rdd)
+ val toJava: (Any) => Any = EvaluatePython.toJava(_, schema)
+ val iter = new SerDeUtil.AutoBatchedPickler(
+ queryExecution.executedPlan.executeCollect().iterator.map(toJava))
+ PythonRDD.serveIterator(iter, "serve-DataFrame")
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index cf68ed4ec3..724025b464 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -24,9 +24,8 @@ import scala.collection.JavaConverters._
import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
-import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
+import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
@@ -34,16 +33,6 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
object EvaluatePython {
- def takeAndServe(df: DataFrame, n: Int): Int = {
- registerPicklers()
- df.withNewExecutionId {
- val iter = new SerDeUtil.AutoBatchedPickler(
- df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
- EvaluatePython.toJava(row, df.schema)
- })
- PythonRDD.serveIterator(iter, s"serve-DataFrame")
- }
- }
def needConversionInPython(dt: DataType): Boolean = dt match {
case DateType | TimestampType => true