diff options
author | Xiangrui Meng <meng@databricks.com> | 2014-11-01 14:37:00 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-11-01 14:37:00 -0700 |
commit | 1d4f3552037cb667971bea2e5078d8b3ce6c2eae (patch) | |
tree | b4318e8bddec8a5fceaf41ce5a5fd1c3fdab2f41 /sql/core | |
parent | 59e626c701227634336110e1bc23afd94c535ede (diff) | |
download | spark-1d4f3552037cb667971bea2e5078d8b3ce6c2eae.tar.gz spark-1d4f3552037cb667971bea2e5078d8b3ce6c2eae.tar.bz2 spark-1d4f3552037cb667971bea2e5078d8b3ce6c2eae.zip |
[SPARK-3569][SQL] Add metadata field to StructField
Add `metadata: Metadata` to `StructField` to store extra information of columns. `Metadata` is a simple wrapper over `Map[String, Any]` with value types restricted to Boolean, Long, Double, String, Metadata, and arrays of those types. SerDe is via JSON.
Metadata is preserved through simple operations like `SELECT`.
marmbrus liancheng
Author: Xiangrui Meng <meng@databricks.com>
Author: Michael Armbrust <michael@databricks.com>
Closes #2701 from mengxr/structfield-metadata and squashes the following commits:
dedda56 [Xiangrui Meng] merge remote
5ef930a [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
c35203f [Xiangrui Meng] Merge pull request #1 from marmbrus/pr/2701
886b85c [Michael Armbrust] Expose Metadata and MetadataBuilder through the public scala and java packages.
589f314 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
1e2abcf [Xiangrui Meng] change default value of metadata to None in python
611d3c2 [Xiangrui Meng] move metadata from Expr to NamedExpr
ddfcfad [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
a438440 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
4266f4d [Xiangrui Meng] add StructField.toString back for backward compatibility
3f49aab [Xiangrui Meng] remove StructField.toString
24a9f80 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
473a7c5 [Xiangrui Meng] merge master
c9d7301 [Xiangrui Meng] organize imports
1fcbf13 [Xiangrui Meng] change metadata type in StructField for Scala/Java
60cc131 [Xiangrui Meng] add doc and header
60614c7 [Xiangrui Meng] add metadata
e42c452 [Xiangrui Meng] merge master
93518fb [Xiangrui Meng] support metadata in python
905bb89 [Xiangrui Meng] java conversions
618e349 [Xiangrui Meng] make tests work in scala
61b8e0f [Xiangrui Meng] merge master
7e5a322 [Xiangrui Meng] do not output metadata in StructField.toString
c41a664 [Xiangrui Meng] merge master
d8af0ed [Xiangrui Meng] move tests to SQLQuerySuite
67fdebb [Xiangrui Meng] add test on join
d65072e [Xiangrui Meng] remove Map.empty
367d237 [Xiangrui Meng] add test
c194d5e [Xiangrui Meng] add metadata field to StructField and Attribute
Diffstat (limited to 'sql/core')
12 files changed, 180 insertions, 36 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java index 37e88d72b9..0c85cdc0aa 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java @@ -17,9 +17,7 @@ package org.apache.spark.sql.api.java; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; /** * The base type of all Spark SQL data types. @@ -151,15 +149,31 @@ public abstract class DataType { * Creates a StructField by specifying the name ({@code name}), data type ({@code dataType}) and * whether values of this field can be null values ({@code nullable}). */ - public static StructField createStructField(String name, DataType dataType, boolean nullable) { + public static StructField createStructField( + String name, + DataType dataType, + boolean nullable, + Metadata metadata) { if (name == null) { throw new IllegalArgumentException("name should not be null."); } if (dataType == null) { throw new IllegalArgumentException("dataType should not be null."); } + if (metadata == null) { + throw new IllegalArgumentException("metadata should not be null."); + } + + return new StructField(name, dataType, nullable, metadata); + } - return new StructField(name, dataType, nullable); + /** + * Creates a StructField with empty metadata. + * + * @see #createStructField(String, DataType, boolean, Metadata) + */ + public static StructField createStructField(String name, DataType dataType, boolean nullable) { + return createStructField(name, dataType, nullable, (new MetadataBuilder()).build()); } /** @@ -191,5 +205,4 @@ public abstract class DataType { return new StructType(fields); } - } diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/Metadata.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/Metadata.java new file mode 100644 index 0000000000..0f819fb01a --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/Metadata.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.api.java; + +/** + * Metadata is a wrapper over Map[String, Any] that limits the value type to simple ones: Boolean, + * Long, Double, String, Metadata, Array[Boolean], Array[Long], Array[Double], Array[String], and + * Array[Metadata]. JSON is used for serialization. + * + * The default constructor is private. User should use [[MetadataBuilder]]. + */ +class Metadata extends org.apache.spark.sql.catalyst.util.Metadata { + Metadata(scala.collection.immutable.Map<String, Object> map) { + super(map); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/MetadataBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/MetadataBuilder.java new file mode 100644 index 0000000000..6e6b12f072 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/MetadataBuilder.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.api.java; + +/** + * Builder for [[Metadata]]. If there is a key collision, the latter will overwrite the former. + */ +public class MetadataBuilder extends org.apache.spark.sql.catalyst.util.MetadataBuilder { + @Override + public Metadata build() { + return new Metadata(getMap()); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java index b48e2a2c5f..7c60d492bc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java @@ -17,6 +17,8 @@ package org.apache.spark.sql.api.java; +import java.util.Map; + /** * A StructField object represents a field in a StructType object. * A StructField object comprises three fields, {@code String name}, {@code DataType dataType}, @@ -24,20 +26,27 @@ package org.apache.spark.sql.api.java; * The field of {@code dataType} specifies the data type of a StructField. * The field of {@code nullable} specifies if values of a StructField can contain {@code null} * values. + * The field of {@code metadata} provides extra information of the StructField. * * To create a {@link StructField}, - * {@link DataType#createStructField(String, DataType, boolean)} + * {@link DataType#createStructField(String, DataType, boolean, Metadata)} * should be used. */ public class StructField { private String name; private DataType dataType; private boolean nullable; + private Metadata metadata; - protected StructField(String name, DataType dataType, boolean nullable) { + protected StructField( + String name, + DataType dataType, + boolean nullable, + Metadata metadata) { this.name = name; this.dataType = dataType; this.nullable = nullable; + this.metadata = metadata; } public String getName() { @@ -52,6 +61,10 @@ public class StructField { return nullable; } + public Metadata getMetadata() { + return metadata; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -62,6 +75,7 @@ public class StructField { if (nullable != that.nullable) return false; if (!dataType.equals(that.dataType)) return false; if (!name.equals(that.name)) return false; + if (!metadata.equals(that.metadata)) return false; return true; } @@ -71,6 +85,7 @@ public class StructField { int result = name.hashCode(); result = 31 * result + dataType.hashCode(); result = 31 * result + (nullable ? 1 : 0); + result = 31 * result + metadata.hashCode(); return result; } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a41a500c9a..4953f8399a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.{Optimizer, DefaultOptimizer} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.catalyst.types.DataType import org.apache.spark.sql.execution.{SparkStrategies, _} import org.apache.spark.sql.json._ import org.apache.spark.sql.parquet.ParquetRelation diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index 047dc85df6..eabe312f92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -117,10 +117,7 @@ private[sql] object JsonRDD extends Logging { } }.flatMap(field => field).toSeq - StructType( - (topLevelFields ++ structFields).sortBy { - case StructField(name, _, _) => name - }) + StructType((topLevelFields ++ structFields).sortBy(_.name)) } makeStruct(resolved.keySet.toSeq, Nil) @@ -128,7 +125,7 @@ private[sql] object JsonRDD extends Logging { private[sql] def nullTypeToStringType(struct: StructType): StructType = { val fields = struct.fields.map { - case StructField(fieldName, dataType, nullable) => { + case StructField(fieldName, dataType, nullable, _) => { val newType = dataType match { case NullType => StringType case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull) @@ -163,9 +160,7 @@ private[sql] object JsonRDD extends Logging { StructField(name, dataType, true) } } - StructType(newFields.toSeq.sortBy { - case StructField(name, _, _) => name - }) + StructType(newFields.toSeq.sortBy(_.name)) } case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) => ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2) @@ -413,7 +408,7 @@ private[sql] object JsonRDD extends Logging { // TODO: Reuse the row instead of creating a new one for every record. val row = new GenericMutableRow(schema.fields.length) schema.fields.zipWithIndex.foreach { - case (StructField(name, dataType, _), i) => + case (StructField(name, dataType, _, _), i) => row.update(i, json.get(name).flatMap(v => Option(v)).map( enforceCorrectType(_, dataType)).orNull) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index e98d151286..f0e57e2a74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -125,6 +125,9 @@ package object sql { @DeveloperApi type DataType = catalyst.types.DataType + @DeveloperApi + val DataType = catalyst.types.DataType + /** * :: DeveloperApi :: * @@ -414,4 +417,24 @@ package object sql { */ @DeveloperApi val StructField = catalyst.types.StructField + + /** + * :: DeveloperApi :: + * + * Metadata is a wrapper over Map[String, Any] that limits the value type to simple ones: Boolean, + * Long, Double, String, Metadata, Array[Boolean], Array[Long], Array[Double], Array[String], and + * Array[Metadata]. JSON is used for serialization. + * + * The default constructor is private. User should use either [[MetadataBuilder]] or + * [[Metadata$#fromJson]] to create Metadata instances. + * + * @param map an immutable map that stores the data + */ + @DeveloperApi + type Metadata = catalyst.util.Metadata + + /** + * Builder for [[Metadata]]. If there is a key collision, the latter will overwrite the former. + */ + type MetadataBuilder = catalyst.util.MetadataBuilder } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala index 609f7db562..142598c904 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.types.util import org.apache.spark.sql._ -import org.apache.spark.sql.api.java.{DataType => JDataType, StructField => JStructField} +import org.apache.spark.sql.api.java.{DataType => JDataType, StructField => JStructField, MetadataBuilder => JMetaDataBuilder} import scala.collection.JavaConverters._ @@ -31,7 +31,8 @@ protected[sql] object DataTypeConversions { JDataType.createStructField( scalaStructField.name, asJavaDataType(scalaStructField.dataType), - scalaStructField.nullable) + scalaStructField.nullable, + (new JMetaDataBuilder).withMetadata(scalaStructField.metadata).build()) } /** @@ -68,7 +69,8 @@ protected[sql] object DataTypeConversions { StructField( javaStructField.getName, asScalaDataType(javaStructField.getDataType), - javaStructField.isNullable) + javaStructField.isNullable, + javaStructField.getMetadata) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala index 100ecb45e9..6c9db639c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql import org.scalatest.FunSuite -import org.apache.spark.sql.catalyst.types.DataType - class DataTypeSuite extends FunSuite { test("construct an ArrayType") { @@ -79,8 +77,12 @@ class DataTypeSuite extends FunSuite { checkDataTypeJsonRepr(ArrayType(StringType, false)) checkDataTypeJsonRepr(MapType(IntegerType, StringType, true)) checkDataTypeJsonRepr(MapType(IntegerType, ArrayType(DoubleType), false)) + val metadata = new MetadataBuilder() + .putString("name", "age") + .build() checkDataTypeJsonRepr( StructType(Seq( StructField("a", IntegerType, nullable = true), - StructField("b", ArrayType(DoubleType), nullable = false)))) + StructField("b", ArrayType(DoubleType), nullable = false), + StructField("c", DoubleType, nullable = false, metadata)))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4acd92d33d..6befe1b755 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -17,17 +17,16 @@ package org.apache.spark.sql +import java.util.TimeZone + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.joins.BroadcastHashJoin -import org.apache.spark.sql.test._ -import org.scalatest.BeforeAndAfterAll -import java.util.TimeZone -/* Implicits */ -import TestSQLContext._ -import TestData._ +import org.apache.spark.sql.test.TestSQLContext._ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { // Make sure the tables are loaded. @@ -697,6 +696,30 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { ("true", "false") :: Nil) } + test("metadata is propagated correctly") { + val person = sql("SELECT * FROM person") + val schema = person.schema + val docKey = "doc" + val docValue = "first name" + val metadata = new MetadataBuilder() + .putString(docKey, docValue) + .build() + val schemaWithMeta = new StructType(Seq( + schema("id"), schema("name").copy(metadata = metadata), schema("age"))) + val personWithMeta = applySchema(person, schemaWithMeta) + def validateMetadata(rdd: SchemaRDD): Unit = { + assert(rdd.schema("name").metadata.getString(docKey) == docValue) + } + personWithMeta.registerTempTable("personWithMeta") + validateMetadata(personWithMeta.select('name)) + validateMetadata(personWithMeta.select("name".attr)) + validateMetadata(personWithMeta.select('id, 'name)) + validateMetadata(sql("SELECT * FROM personWithMeta")) + validateMetadata(sql("SELECT id, name FROM personWithMeta")) + validateMetadata(sql("SELECT * FROM personWithMeta JOIN salary ON id = personId")) + validateMetadata(sql("SELECT name, salary FROM personWithMeta JOIN salary ON id = personId")) + } + test("SPARK-3371 Renaming a function expression with group by gives error") { registerFunction("len", (s: String) => s.length) checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index c4dd3e860f..836dd17fcc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -166,4 +166,15 @@ object TestData { // An RDD with 4 elements and 8 partitions val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8) withEmptyParts.registerTempTable("withEmptyParts") + + case class Person(id: Int, name: String, age: Int) + case class Salary(personId: Int, salary: Double) + val person = TestSQLContext.sparkContext.parallelize( + Person(0, "mike", 30) :: + Person(1, "jim", 20) :: Nil) + person.registerTempTable("person") + val salary = TestSQLContext.sparkContext.parallelize( + Salary(0, 2000.0) :: + Salary(1, 1000.0) :: Nil) + salary.registerTempTable("salary") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala index 8415af41be..e0e0ff9cb3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala @@ -17,12 +17,10 @@ package org.apache.spark.sql.api.java -import org.apache.spark.sql.types.util.DataTypeConversions import org.scalatest.FunSuite -import org.apache.spark.sql.{DataType => SDataType, StructField => SStructField} -import org.apache.spark.sql.{StructType => SStructType} -import DataTypeConversions._ +import org.apache.spark.sql.{DataType => SDataType, StructField => SStructField, StructType => SStructType} +import org.apache.spark.sql.types.util.DataTypeConversions._ class ScalaSideDataTypeConversionSuite extends FunSuite { @@ -67,11 +65,15 @@ class ScalaSideDataTypeConversionSuite extends FunSuite { checkDataType(simpleScalaStructType) // Complex StructType. + val metadata = new MetadataBuilder() + .putString("name", "age") + .build() val complexScalaStructType = SStructType( SStructField("simpleArray", simpleScalaArrayType, true) :: SStructField("simpleMap", simpleScalaMapType, true) :: SStructField("simpleStruct", simpleScalaStructType, true) :: - SStructField("boolean", org.apache.spark.sql.BooleanType, false) :: Nil) + SStructField("boolean", org.apache.spark.sql.BooleanType, false) :: + SStructField("withMeta", org.apache.spark.sql.DoubleType, false, metadata) :: Nil) checkDataType(complexScalaStructType) // Complex ArrayType. |