author    Cheng Lian <lian@databricks.com>  2015-08-13 16:16:50 +0800
committer Cheng Lian <lian@databricks.com>  2015-08-13 16:16:50 +0800
commit 69930310115501f0de094fe6f5c6c60dade342bd (patch)
tree   88f24cb0135e0b8ef201f268acf79b2815f914db /sql/catalyst
parent 84a27916a62980c8fcb0977c3a7fdb73c0bd5812 (diff)
[SPARK-9757] [SQL] Fixes persistence of Parquet relation with decimal column
PR #7967 enables us to save data source relations to metastore in Hive compatible format when possible. But it fails to persist Parquet relations with decimal column(s) to Hive metastore of versions lower than 1.2.0. This is because `ParquetHiveSerDe` in Hive versions prior to 1.2.0 doesn't support decimal. This PR checks for this case and falls back to Spark SQL specific metastore table format.

Author: Yin Huai <yhuai@databricks.com>
Author: Cheng Lian <lian@databricks.com>

Closes #8130 from liancheng/spark-9757/old-hive-parquet-decimal.
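The fallback hinges on the `existsRecursively` helper introduced below: walk the schema's type tree and check whether any node is a `DecimalType`. A minimal sketch of that check in Scala, where the `mustFallBackToSparkSqlFormat` helper and its `hiveSupportsDecimalInParquet` flag are hypothetical stand-ins (the real check lives in HiveMetastoreCatalog, outside this diff, and `existsRecursively` is `private[spark]`, so this only compiles inside Spark's own packages):

    import org.apache.spark.sql.types._

    // Hypothetical helper, not part of this patch: decide whether a schema
    // must be persisted in Spark SQL's own metastore format instead of the
    // Hive-compatible one.
    def mustFallBackToSparkSqlFormat(
        schema: StructType,
        hiveSupportsDecimalInParquet: Boolean): Boolean = {
      // existsRecursively visits every node of the type tree, so decimals
      // nested inside arrays, maps, and structs are caught as well.
      val hasDecimal = schema.existsRecursively(_.isInstanceOf[DecimalType])
      hasDecimal && !hiveSupportsDecimalInParquet
    }

For example, a schema such as `StructType(StructField("price", DecimalType(10, 2)) :: Nil)` would force the fallback against a pre-1.2.0 Hive metastore, while an all-`LongType`/`StringType` schema would still be saved in Hive-compatible format.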
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala      |  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala       |  5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala        |  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala     |  8
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala  | 24
5 files changed, 45 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
index 5094058164..5770f59b53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
@@ -75,6 +75,10 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
override def simpleString: String = s"array<${elementType.simpleString}>"
- private[spark] override def asNullable: ArrayType =
+ override private[spark] def asNullable: ArrayType =
ArrayType(elementType.asNullable, containsNull = true)
+
+ override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
+ f(this) || elementType.existsRecursively(f)
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index f4428c2e8b..7bcd623b3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -77,6 +77,11 @@ abstract class DataType extends AbstractDataType {
*/
private[spark] def asNullable: DataType
+ /**
+ * Returns true if any `DataType` in this DataType tree satisfies the given function `f`.
+ */
+ private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = f(this)
+
override private[sql] def defaultConcreteType: DataType = this
override private[sql] def acceptsType(other: DataType): Boolean = sameType(other)
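The base class supplies the leaf-type behavior: `existsRecursively(f)` is simply `f(this)`, and the container types in the following hunks override it to recurse into their children. A quick illustration of the resulting semantics (plain Scala; again, since the method is `private[spark]`, this would only compile inside Spark's own packages):

    import org.apache.spark.sql.types._

    // Leaf type: only the default f(this) check applies.
    IntegerType.existsRecursively(_.isInstanceOf[IntegerType])  // true
    IntegerType.existsRecursively(_.isInstanceOf[StringType])   // false

    // Container type: ArrayType's override also visits the element type.
    ArrayType(DecimalType(10, 2)).existsRecursively(_.isInstanceOf[DecimalType])  // true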
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
index ac34b64282..00461e529c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
@@ -62,8 +62,12 @@ case class MapType(
override def simpleString: String = s"map<${keyType.simpleString},${valueType.simpleString}>"
- private[spark] override def asNullable: MapType =
+ override private[spark] def asNullable: MapType =
MapType(keyType.asNullable, valueType.asNullable, valueContainsNull = true)
+
+ override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
+ f(this) || keyType.existsRecursively(f) || valueType.existsRecursively(f)
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 9cbc207538..d8968ef806 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -24,7 +24,7 @@ import org.json4s.JsonDSL._
import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, AttributeReference, Attribute, InterpretedOrdering$}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
/**
@@ -292,7 +292,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
private[sql] def merge(that: StructType): StructType =
StructType.merge(this, that).asInstanceOf[StructType]
- private[spark] override def asNullable: StructType = {
+ override private[spark] def asNullable: StructType = {
val newFields = fields.map {
case StructField(name, dataType, nullable, metadata) =>
StructField(name, dataType.asNullable, nullable = true, metadata)
@@ -301,6 +301,10 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
StructType(newFields)
}
+ override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
+ f(this) || fields.exists(field => field.dataType.existsRecursively(f))
+ }
+
private[sql] val interpretedOrdering = InterpretedOrdering.forSchema(this.fields.map(_.dataType))
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 88b221cd81..706ecd29d1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -170,6 +170,30 @@ class DataTypeSuite extends SparkFunSuite {
}
}
+ test("existsRecursively") {
+ val struct = StructType(
+ StructField("a", LongType) ::
+ StructField("b", FloatType) :: Nil)
+ assert(struct.existsRecursively(_.isInstanceOf[LongType]))
+ assert(struct.existsRecursively(_.isInstanceOf[StructType]))
+ assert(!struct.existsRecursively(_.isInstanceOf[IntegerType]))
+
+ val mapType = MapType(struct, StringType)
+ assert(mapType.existsRecursively(_.isInstanceOf[LongType]))
+ assert(mapType.existsRecursively(_.isInstanceOf[StructType]))
+ assert(mapType.existsRecursively(_.isInstanceOf[StringType]))
+ assert(mapType.existsRecursively(_.isInstanceOf[MapType]))
+ assert(!mapType.existsRecursively(_.isInstanceOf[IntegerType]))
+
+ val arrayType = ArrayType(mapType)
+ assert(arrayType.existsRecursively(_.isInstanceOf[LongType]))
+ assert(arrayType.existsRecursively(_.isInstanceOf[StructType]))
+ assert(arrayType.existsRecursively(_.isInstanceOf[StringType]))
+ assert(arrayType.existsRecursively(_.isInstanceOf[MapType]))
+ assert(arrayType.existsRecursively(_.isInstanceOf[ArrayType]))
+ assert(!arrayType.existsRecursively(_.isInstanceOf[IntegerType]))
+ }
+
def checkDataTypeJsonRepr(dataType: DataType): Unit = {
test(s"JSON - $dataType") {
assert(DataType.fromJson(dataType.json) === dataType)