author     Lianhui Wang <lianhuiwang09@gmail.com>    2015-06-17 22:52:47 -0700
committer  Josh Rosen <joshrosen@databricks.com>     2015-06-17 22:52:47 -0700
commit     9db73ec12412f6809030546cf69dcb32d2c8e0fe (patch)
tree       be6fca2f42818b0e7c17de03a7d5590ec9875432 /sql
parent     3b6107704fb946e9fcb8c1c9bc4ded1b88c571af (diff)
[SPARK-8381][SQL] Reuse typeConvert when converting Seq[Row] to Catalyst type
Reuse the type converter when converting a Seq[Row] to the Catalyst type.

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #6831 from lianhuiwang/reuse-typeConvert and squashes the following commits:

1fec395 [Lianhui Wang] remove CatalystTypeConverters.convertToCatalyst
714462d [Lianhui Wang] add package[sql]
9d1fbf3 [Lianhui Wang] address JoshRosen's comments
768956f [Lianhui Wang] update scala style
4498c62 [Lianhui Wang] reuse typeConvert
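The patch replaces the per-call CatalystTypeConverters.convertToCatalyst(value, dataType), which resolved a converter for the data type on every invocation, with a converter built once via createToCatalystConverter and then reused for every element. A minimal sketch of the pattern, assuming a StructType named schema and a Seq[Row] named rows (neither appears in the diff), with imports matching the catalyst source tree touched below:

import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}

// Before this patch: a converter for `schema` was looked up on every call.
// rows.map(r => CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[InternalRow])

// After this patch: build the converter once, then apply it to each row.
val convert = CatalystTypeConverters.createToCatalystConverter(schema)
val converted = rows.map(r => convert(r).asInstanceOf[InternalRow])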
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala  10
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala     4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                             8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                            8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala                    4
5 files changed, 12 insertions(+), 22 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 6175456c58..620e8de83a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -336,16 +336,6 @@ object CatalystTypeConverters {
}
/**
- * Converts Scala objects to catalyst rows / types. This method is slow, and for batch
- * conversion you should be using converter produced by createToCatalystConverter.
- * Note: This is always called after schemaFor has been called.
- * This ordering is important for UDT registration.
- */
- def convertToCatalyst(scalaValue: Any, dataType: DataType): Any = {
- getConverterForType(dataType).toCatalyst(scalaValue)
- }
-
- /**
* Creates a converter function that will convert Scala objects to the specified Catalyst type.
* Typical use case would be converting a collection of rows that have the same schema. You will
* call this function once to get a converter, and apply it to every row.
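The retained doc comment describes the intended usage: call createToCatalystConverter once to obtain a converter, then apply it to every row. The test updates in the next file rely on the converter being an ordinary function (Any => Any in this version of the code), so it can also be applied inline; a hedged sketch of the equivalent two-step form, reusing the dataType/data/convertedData names from the test:

val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType)
assert(toCatalyst(data) === convertedData)  // same as createToCatalystConverter(dataType)(data)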
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index c2d739b529..b4b00f5584 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -258,7 +258,7 @@ class ScalaReflectionSuite extends SparkFunSuite {
val data = PrimitiveData(1, 1, 1, 1, 1, 1, true)
val convertedData = InternalRow(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true)
val dataType = schemaFor[PrimitiveData].dataType
- assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
+ assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData)
}
test("convert Option[Product] to catalyst") {
@@ -268,7 +268,7 @@ class ScalaReflectionSuite extends SparkFunSuite {
val dataType = schemaFor[OptionalData].dataType
val convertedData = InternalRow(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
InternalRow(1, 1, 1, 1, 1, 1, true))
- assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
+ assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData)
}
test("infer schema from case class with multiple constructors") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 444916bbad..466258e76f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1029,10 +1029,10 @@ class DataFrame private[sql](
val elementTypes = schema.toAttributes.map { attr => (attr.dataType, attr.nullable) }
val names = schema.toAttributes.map(_.name)
+ val convert = CatalystTypeConverters.createToCatalystConverter(schema)
val rowFunction =
- f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema)
- .asInstanceOf[InternalRow]))
+ f.andThen(_.map(convert(_).asInstanceOf[InternalRow]))
val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr))
Generate(generator, join = true, outer = false,
@@ -1059,8 +1059,8 @@ class DataFrame private[sql](
val names = attributes.map(_.name)
def rowFunction(row: Row): TraversableOnce[InternalRow] = {
- f(row(0).asInstanceOf[A]).map(o =>
- InternalRow(CatalystTypeConverters.convertToCatalyst(o, dataType)))
+ val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
+ f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o)))
}
val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil)
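Both explode paths in DataFrame.scala now build the converter before mapping over the user function's output instead of converting each element with convertToCatalyst. For context, the single-column path in the second hunk backs calls of this shape (a hypothetical DataFrame df with a string column "words"; not taken from the diff):

df.explode("words", "word") { (s: String) => s.split(" ").toSeq }
// Each String produced by the function is passed through one converter
// created for the output element type, then wrapped in an InternalRow.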
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9d1f89d6d7..6b605f7130 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -536,12 +536,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
Class.forName(className, true, Utils.getContextOrSparkClassLoader))
val extractors =
localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
-
+ val methodsToConverts = extractors.zip(attributeSeq).map { case (e, attr) =>
+ (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+ }
iter.map { row =>
new GenericRow(
- extractors.zip(attributeSeq).map { case (e, attr) =>
- CatalystTypeConverters.convertToCatalyst(e.invoke(row), attr.dataType)
- }.toArray[Any]
+ methodsToConverts.map { case (e, convert) => convert(e.invoke(row)) }.toArray[Any]
) : InternalRow
}
}
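The SQLContext change follows the same idea but must pair each JavaBean getter with the converter for its attribute, so the pairing is hoisted out of the per-row loop and each converter is created once per iterator of rows rather than once per field per row. A distilled, self-contained sketch of that "zip once, apply per row" structure, using hypothetical extractors and a stand-in for CatalystTypeConverters.createToCatalystConverter:

// Plain-Scala illustration only; buildConverter stands in for
// CatalystTypeConverters.createToCatalystConverter(attr.dataType).
val extractors: Seq[String => Any] = Seq(_.length, _.toUpperCase)
val dataTypes = Seq("int", "string")
def buildConverter(dataType: String): Any => Any = identity
// Built once, outside the row loop.
val methodsToConverts = extractors.zip(dataTypes).map { case (e, dt) => (e, buildConverter(dt)) }
val rows = Seq("spark", "sql")
val converted = rows.map(r => methodsToConverts.map { case (e, convert) => convert(e(r)) })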
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 653792ea2e..c9dfcea5d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -65,8 +65,8 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
protected override def doExecute(): RDD[InternalRow] = {
- val converted = sideEffectResult.map(r =>
- CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[InternalRow])
+ val convert = CatalystTypeConverters.createToCatalystConverter(schema)
+ val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow])
sqlContext.sparkContext.parallelize(converted, 1)
}
}