author     Volodymyr Lyubinets <vlyubin@gmail.com>    2015-04-10 16:27:56 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-04-10 16:27:56 -0700
commit     67d06880e47e0324409cf7e5b21db1dcb0107b82 (patch)
tree       4e59b5974a5f9b51919fec87f60a9f71d6f02234 /sql/core
parent     23d5f8864f7d665a74b1d38118700139854dbb1c (diff)
download   spark-67d06880e47e0324409cf7e5b21db1dcb0107b82.tar.gz
           spark-67d06880e47e0324409cf7e5b21db1dcb0107b82.tar.bz2
           spark-67d06880e47e0324409cf7e5b21db1dcb0107b82.zip
[SQL] [SPARK-6620] Speed up toDF() and rdd() functions by constructing converters in ScalaReflection
cc marmbrus

Author: Volodymyr Lyubinets <vlyubin@gmail.com>

Closes #5279 from vlyubin/speedup and squashes the following commits:

e75a387 [Volodymyr Lyubinets] Changes to ScalaUDF
11a20ec [Volodymyr Lyubinets] Avoid creating a tuple
c327bc9 [Volodymyr Lyubinets] Moved the only remaining function from DataTypeConversions to DateUtils
dec6802 [Volodymyr Lyubinets] Addresed review feedback
74301fa [Volodymyr Lyubinets] Addressed review comments
afa3aa5 [Volodymyr Lyubinets] Minor refactoring, added license, removed debug output
881dc60 [Volodymyr Lyubinets] Moved to a separate module; addressed review comments; one extra place of usage; changed behaviour for Java
8cad6e2 [Volodymyr Lyubinets] Addressed review commments
41b2aa9 [Volodymyr Lyubinets] Creating converters for ScalaReflection stuff, and more
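
The commit description is terse, so here is a minimal, self-contained sketch of the core idea: instead of re-inspecting a value's data type on every conversion, build a converter function once per type and reuse it for every value. All names below are hypothetical stand-ins; the real implementation is CatalystTypeConverters in the catalyst module, which sits outside this sql/core diff.

object ConverterSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType

  // Old style: dispatch on the data type for every single value.
  def convertEach(value: Any, dt: DataType): Any = dt match {
    case IntType    => value.asInstanceOf[Int]
    case StringType => value.toString.toUpperCase   // stand-in "conversion"
  }

  // New style: pay for the dispatch once, get back a reusable function.
  def createConverter(dt: DataType): Any => Any = dt match {
    case IntType    => (v: Any) => v.asInstanceOf[Int]
    case StringType => (v: Any) => v.toString.toUpperCase
  }

  def main(args: Array[String]): Unit = {
    val values = Seq("a", "b", "c")
    val converter = createConverter(StringType)   // constructed once
    println(values.map(converter))                // reused per value
  }
}

The same trade shows up throughout the hunks below, whether the converter is built per schema, per field, or per partition.
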
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala     14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala  16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala       11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala               4
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java     10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala             3
9 files changed, 55 insertions, 32 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5c6016a4a2..9b9adf8550 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.sql.catalyst.{ScalaReflection, SqlParser}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, ResolvedStar}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
@@ -713,7 +713,7 @@ class DataFrame private[sql](
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributes = schema.toAttributes
val rowFunction =
- f.andThen(_.map(ScalaReflection.convertToCatalyst(_, schema).asInstanceOf[Row]))
+ f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema).asInstanceOf[Row]))
val generator = UserDefinedGenerator(attributes, rowFunction, input.map(_.expr))
Generate(generator, join = true, outer = false, None, logicalPlan)
@@ -734,7 +734,7 @@ class DataFrame private[sql](
val dataType = ScalaReflection.schemaFor[B].dataType
val attributes = AttributeReference(outputColumn, dataType)() :: Nil
def rowFunction(row: Row): TraversableOnce[Row] = {
- f(row(0).asInstanceOf[A]).map(o => Row(ScalaReflection.convertToCatalyst(o, dataType)))
+ f(row(0).asInstanceOf[A]).map(o => Row(CatalystTypeConverters.convertToCatalyst(o, dataType)))
}
val generator = UserDefinedGenerator(attributes, rowFunction, apply(inputColumn).expr :: Nil)
@@ -961,7 +961,10 @@ class DataFrame private[sql](
lazy val rdd: RDD[Row] = {
// use a local variable to make sure the map closure doesn't capture the whole DataFrame
val schema = this.schema
- queryExecution.executedPlan.execute().map(ScalaReflection.convertRowToScala(_, schema))
+ queryExecution.executedPlan.execute().mapPartitions { rows =>
+ val converter = CatalystTypeConverters.createToScalaConverter(schema)
+ rows.map(converter(_).asInstanceOf[Row])
+ }
}
/**
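
The DataFrame.rdd change above switches from map to mapPartitions so the schema converter is constructed once per partition rather than once per row. A rough sketch of that pattern against plain Scala iterators (no Spark dependency; createToScalaConverter is a hypothetical stand-in for the real factory):

object PerPartitionSketch {
  type Row = Seq[Any]

  // Hypothetical stand-in for CatalystTypeConverters.createToScalaConverter(schema);
  // imagine an expensive, schema-driven construction happening here.
  def createToScalaConverter(schema: Seq[String]): Any => Any = identity

  def mapPartition(schema: Seq[String], rows: Iterator[Row]): Iterator[Row] = {
    val converter = createToScalaConverter(schema)   // built once per partition
    rows.map(r => converter(r).asInstanceOf[Row])    // reused for every row
  }

  def main(args: Array[String]): Unit = {
    val partition = Iterator(Seq(1, "x"), Seq(2, "y"), Seq(3, "z"))
    mapPartition(Seq("a", "b"), partition).foreach(println)
  }
}
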
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 39dd14e796..c25ef58e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,9 +31,9 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.catalyst.{ScalaReflection, expressions}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions}
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json._
@@ -404,7 +404,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
// schema differs from the existing schema on any field data type.
val catalystRows = if (needsConversion) {
- rowRDD.map(ScalaReflection.convertToCatalyst(_, schema).asInstanceOf[Row])
+ val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+ rowRDD.map(converter(_).asInstanceOf[Row])
} else {
rowRDD
}
@@ -459,7 +460,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
iter.map { row =>
new GenericRow(
extractors.zip(attributeSeq).map { case (e, attr) =>
- DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
+ CatalystTypeConverters.convertToCatalyst(e.invoke(row), attr.dataType)
}.toArray[Any]
) : Row
}
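
In SQLContext.createDataFrame above, a single converter is created for the whole schema and applied only when needsConversion is true. A small sketch of that guard, with a hypothetical factory standing in for CatalystTypeConverters.createToCatalystConverter:

object GuardedConversionSketch {
  type Row = Seq[Any]

  // Hypothetical stand-in for CatalystTypeConverters.createToCatalystConverter(schema).
  def createToCatalystConverter(schema: Seq[String]): Any => Any = identity

  def toCatalystRows(schema: Seq[String], rows: Seq[Row], needsConversion: Boolean): Seq[Row] =
    if (needsConversion) {
      val converter = createToCatalystConverter(schema)  // one converter for the whole schema
      rows.map(r => converter(r).asInstanceOf[Row])
    } else {
      rows                                               // already internal rows: skip the work
    }

  def main(args: Array[String]): Unit =
    println(toCatalystRows(Seq("a", "b"), Seq(Seq(1, "x"), Seq(2, "y")), needsConversion = true))
}
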
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index d8955725e5..656bdd7212 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -20,14 +20,12 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.types.StructType
-import scala.collection.immutable
-
/**
* :: DeveloperApi ::
*/
@@ -39,13 +37,15 @@ object RDDConversions {
Iterator.empty
} else {
val bufferedIterator = iterator.buffered
- val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)
+ val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
val schemaFields = schema.fields.toArray
+ val converters = schemaFields.map {
+ f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
+ }
bufferedIterator.map { r =>
var i = 0
while (i < mutableRow.length) {
- mutableRow(i) =
- ScalaReflection.convertToCatalyst(r.productElement(i), schemaFields(i).dataType)
+ mutableRow(i) = converters(i)(r.productElement(i))
i += 1
}
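
RDDConversions above precomputes one converter per schema field and reuses them in a tight while loop over a mutable row. A self-contained sketch of that shape, using hypothetical string "data types" and a plain array in place of SpecificMutableRow (the real buffer is reused without copying because downstream operators copy when they need to):

object PerFieldConvertersSketch {
  // Hypothetical per-field converter factory; the real code calls
  // CatalystTypeConverters.createToCatalystConverter(field.dataType).
  def createFieldConverter(dataType: String): Any => Any = dataType match {
    case "int"    => (v: Any) => v.asInstanceOf[Int]
    case "string" => (v: Any) => v.toString
  }

  def convertPartition(fieldTypes: Array[String], rows: Iterator[Product]): Iterator[Array[Any]] = {
    val converters = fieldTypes.map(createFieldConverter)  // one converter per field, built once
    val mutableRow = new Array[Any](fieldTypes.length)     // reused buffer, like SpecificMutableRow
    rows.map { r =>
      var i = 0
      while (i < mutableRow.length) {
        mutableRow(i) = converters(i)(r.productElement(i))
        i += 1
      }
      mutableRow.clone()                                   // copy out because the buffer is reused
    }
  }

  def main(args: Array[String]): Unit =
    convertPartition(Array("int", "string"), Iterator((1, "x"), (2, "y")))
      .foreach(row => println(row.mkString(", ")))
}
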
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 5bd699a2fa..8a8c3a4043 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -32,9 +32,15 @@ case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) extends LeafNo
override def execute(): RDD[Row] = rdd
- override def executeCollect(): Array[Row] =
- rows.map(ScalaReflection.convertRowToScala(_, schema)).toArray
- override def executeTake(limit: Int): Array[Row] =
- rows.map(ScalaReflection.convertRowToScala(_, schema)).take(limit).toArray
+ override def executeCollect(): Array[Row] = {
+ val converter = CatalystTypeConverters.createToScalaConverter(schema)
+ rows.map(converter(_).asInstanceOf[Row]).toArray
+ }
+
+
+ override def executeTake(limit: Int): Array[Row] = {
+ val converter = CatalystTypeConverters.createToScalaConverter(schema)
+ rows.map(converter(_).asInstanceOf[Row]).take(limit).toArray
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index d239637cd4..fabcf6b4a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.{ScalaReflection, trees}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -80,8 +80,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/**
* Runs this query returning the result as an array.
*/
+
def executeCollect(): Array[Row] = {
- execute().map(ScalaReflection.convertRowToScala(_, schema)).collect()
+ execute().mapPartitions { iter =>
+ val converter = CatalystTypeConverters.createToScalaConverter(schema)
+ iter.map(converter(_).asInstanceOf[Row])
+ }.collect()
}
/**
@@ -125,7 +129,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
partsScanned += numPartsToTry
}
- buf.toArray.map(ScalaReflection.convertRowToScala(_, this.schema))
+ val converter = CatalystTypeConverters.createToScalaConverter(schema)
+ buf.toArray.map(converter(_).asInstanceOf[Row])
}
protected def newProjection(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 1f5251a203..6eec520abf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
@@ -139,9 +139,10 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
private def collectData(): Array[Row] = child.execute().map(_.copy()).takeOrdered(limit)(ord)
- // TODO: Is this copying for no reason?
- override def executeCollect(): Array[Row] =
- collectData().map(ScalaReflection.convertRowToScala(_, this.schema))
+ override def executeCollect(): Array[Row] = {
+ val converter = CatalystTypeConverters.createToScalaConverter(schema)
+ collectData().map(converter(_).asInstanceOf[Row])
+ }
// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 0b770f2251..b1e8521383 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -391,7 +391,7 @@ private[sql] object JsonRDD extends Logging {
value match {
// only support string as date
case value: java.lang.String =>
- DateUtils.millisToDays(DataTypeConversions.stringToTime(value).getTime)
+ DateUtils.millisToDays(DateUtils.stringToTime(value).getTime)
case value: java.sql.Date => DateUtils.fromJavaDate(value)
}
}
@@ -400,7 +400,7 @@ private[sql] object JsonRDD extends Logging {
value match {
case value: java.lang.Integer => new Timestamp(value.asInstanceOf[Int].toLong)
case value: java.lang.Long => new Timestamp(value)
- case value: java.lang.String => toTimestamp(DataTypeConversions.stringToTime(value).getTime)
+ case value: java.lang.String => toTimestamp(DateUtils.stringToTime(value).getTime)
}
}
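
The JsonRDD changes above only swap DataTypeConversions.stringToTime for DateUtils.stringToTime; the date arithmetic is unchanged: parse the string to epoch milliseconds, then divide by the milliseconds in a day. A rough sketch, assuming a plain "yyyy-MM-dd" input and ignoring the timezone handling the real DateUtils performs:

import java.text.SimpleDateFormat

object StringToDaysSketch {
  private val MillisPerDay = 86400000L

  // DateUtils.stringToTime also accepts full timestamps; a plain date pattern
  // is enough for this sketch.
  def stringToMillis(s: String): Long =
    new SimpleDateFormat("yyyy-MM-dd").parse(s).getTime

  def millisToDays(millis: Long): Int = (millis / MillisPerDay).toInt

  def main(args: Array[String]): Unit =
    println(millisToDays(stringToMillis("2015-04-10")))
}
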
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 1ff2d5a190..6d0fbe83c2 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -20,6 +20,8 @@ package test.org.apache.spark.sql;
import java.io.Serializable;
import java.util.Arrays;
+import scala.collection.Seq;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -127,6 +129,12 @@ public class JavaDataFrameSuite {
schema.apply("b"));
Row first = df.select("a", "b").first();
Assert.assertEquals(bean.getA(), first.getDouble(0), 0.0);
- Assert.assertArrayEquals(bean.getB(), first.<Integer[]>getAs(1));
+ // Now Java lists and maps are converted to Scala Seqs and Maps. Once we get a Seq below,
+ // verify that it has the expected length and contains the expected elements.
+ Seq<Integer> result = first.getAs(1);
+ Assert.assertEquals(bean.getB().length, result.length());
+ for (int i = 0; i < result.length(); i++) {
+ Assert.assertEquals(bean.getB()[i], result.apply(i));
+ }
}
}
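
The updated Java test reflects the behaviour change called out in the commit message: array- and list-typed bean fields now come back as Scala collections, so the test reads the column as a Seq and compares element by element. A sketch of that check in Scala, with JavaConverters standing in for the conversion Catalyst now performs:

import scala.collection.JavaConverters._

object JavaToScalaCollectionSketch {
  def main(args: Array[String]): Unit = {
    val javaList = java.util.Arrays.asList(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))
    val asSeq: Seq[Integer] = javaList.asScala.toSeq   // what callers of Row.getAs now observe
    assert(asSeq.length == javaList.size())
    asSeq.zipWithIndex.foreach { case (value, i) =>
      assert(value == javaList.get(i))                 // element-by-element check, like the test
    }
    println(asSeq)
  }
}
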
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 1fe0b76c00..fd0e2746dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -895,8 +895,7 @@ class JsonSuite extends QueryTest {
)
}
- test("SPARK-4228 DataFrame to JSON")
- {
+ test("SPARK-4228 DataFrame to JSON") {
val schema1 = StructType(
StructField("f1", IntegerType, false) ::
StructField("f2", StringType, false) ::