| author | Andrew Or <andrew@databricks.com> | 2016-04-26 21:29:25 -0700 |
|---|---|---|
| committer | Reynold Xin <rxin@databricks.com> | 2016-04-26 21:29:25 -0700 |
| commit | d8a83a564ff3fd0281007adbf8aa3757da8a2c2b | |
| tree | 9cf2a91e9924485a6425ede02ffb0fa3f63714b1 /sql/catalyst/src | |
| parent | d93976d8660b68eeef646d1fe687cfce01f50f9d | |
| download | spark-d8a83a564ff3fd0281007adbf8aa3757da8a2c2b.tar.gz, .tar.bz2, .zip | |
# [SPARK-13477][SQL] Expose new user-facing Catalog interface
## What changes were proposed in this pull request?
PR #12625 exposed a new user-facing conf interface in `SparkSession`. This patch adds the analogous user-facing catalog interface, illustrated below.
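For context, the new interface surfaces on `SparkSession` and can be exercised roughly as follows (a usage sketch against the API this patch introduces; the database and table names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

// Counterpart to the conf interface from #12625: metadata operations hang
// off spark.catalog and return Datasets of Database/Table/Function.
spark.catalog.currentDatabase               // e.g. "default"
spark.catalog.listDatabases().show()
spark.catalog.listTables("default").show()
spark.catalog.listFunctions().show()
spark.catalog.setCurrentDatabase("default")
```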
## How was this patch tested?
See `CatalogSuite`.
Author: Andrew Or <andrew@databricks.com>
Closes #12713 from andrewor14/user-facing-catalog.
Diffstat (limited to 'sql/catalyst/src')
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala | 26 |
| -rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 36 |
| -rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 8 |
| -rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala | 5 |
| -rw-r--r-- | sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala) | 11 |
| -rw-r--r-- | sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala | 2 |
| -rw-r--r-- | sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 2 |
7 files changed, 69 insertions(+), 21 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index bd723135b5..be67605c45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -22,7 +22,15 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.Utils
+
+
+/**
+ * A helper trait to create [[org.apache.spark.sql.catalyst.encoders.ExpressionEncoder]]s
+ * for classes whose fields are entirely defined by constructor params but should not be
+ * case classes.
+ */
+private[sql] trait DefinedByConstructorParams
+
 
 /**
  * A default version of ScalaReflection that uses the runtime universe.
@@ -333,7 +341,7 @@ object ScalaReflection extends ScalaReflection {
           "toScalaMap",
           keyData :: valueData :: Nil)
 
-      case t if t <:< localTypeOf[Product] =>
+      case t if definedByConstructorParams(t) =>
         val params = getConstructorParameters(t)
         val cls = getClassFromType(tpe)
@@ -401,7 +409,7 @@ object ScalaReflection extends ScalaReflection {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "${clsName}"""" :: Nil
     serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if tpe <:< localTypeOf[Product] => s
+      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
       case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
     }
   }
@@ -491,7 +499,7 @@ object ScalaReflection extends ScalaReflection {
             serializerFor(unwrapped, optType, newPath))
         }
 
-      case t if t <:< localTypeOf[Product] =>
+      case t if definedByConstructorParams(t) =>
         val params = getConstructorParameters(t)
         val nonNullOutput = CreateNamedStruct(params.flatMap { case (fieldName, fieldType) =>
           val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType))
@@ -680,7 +688,7 @@ object ScalaReflection extends ScalaReflection {
       val Schema(valueDataType, valueNullable) = schemaFor(valueType)
       Schema(MapType(schemaFor(keyType).dataType,
         valueDataType, valueContainsNull = valueNullable), nullable = true)
-    case t if t <:< localTypeOf[Product] =>
+    case t if definedByConstructorParams(t) =>
       val params = getConstructorParameters(t)
       Schema(StructType(
         params.map { case (fieldName, fieldType) =>
@@ -712,6 +720,14 @@ object ScalaReflection extends ScalaReflection {
         throw new UnsupportedOperationException(s"Schema for type $other is not supported")
     }
   }
+
+  /**
+   * Whether the fields of the given type is defined entirely by its constructor parameters.
+   */
+  private[sql] def definedByConstructorParams(tpe: Type): Boolean = {
+    tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams]
+  }
+
 }
 
 /**
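The point of the new trait: plain (non-case) classes can now opt in to encoder derivation as long as every field comes from a constructor parameter. A minimal sketch follows; the `Database` shape mirrors the user-facing catalog classes added in the `sql/core` half of this patch, but the field list here is illustrative, and since the trait is `private[sql]` the class must live under `org.apache.spark.sql`:

```scala
package org.apache.spark.sql.catalog

import org.apache.spark.sql.catalyst.DefinedByConstructorParams

// Not a case class, yet definedByConstructorParams(typeOf[Database]) holds,
// so ScalaReflection derives a struct schema (name, description, locationUri)
// just as it would for a Product.
class Database(
    val name: String,
    val description: String,
    val locationUri: String)
  extends DefinedByConstructorParams
```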
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 402aacfc1f..91d35de790 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -613,6 +613,25 @@ class SessionCatalog(
   }
 
   /**
+   * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
+   */
+  private[spark] def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
+    // TODO: just make function registry take in FunctionIdentifier instead of duplicating this
+    val qualifiedName = name.copy(database = name.database.orElse(Some(currentDb)))
+    functionRegistry.lookupFunction(name.funcName)
+      .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
+      .getOrElse {
+        val db = qualifiedName.database.get
+        if (externalCatalog.functionExists(db, name.funcName)) {
+          val metadata = externalCatalog.getFunction(db, name.funcName)
+          new ExpressionInfo(metadata.className, qualifiedName.unquotedString)
+        } else {
+          failFunctionLookup(name.funcName)
+        }
+      }
+  }
+
+  /**
    * Return an [[Expression]] that represents the specified function, assuming it exists.
    *
    * For a temporary function or a permanent function that has been loaded,
@@ -646,6 +665,7 @@ class SessionCatalog(
     // The function has not been loaded to the function registry, which means
     // that the function is a permanent function (if it actually has been registered
     // in the metastore). We need to first put the function in the FunctionRegistry.
+    // TODO: why not just check whether the function exists first?
     val catalogFunction = try {
       externalCatalog.getFunction(currentDb, name.funcName)
     } catch {
@@ -662,7 +682,7 @@ class SessionCatalog(
     val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
     createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
     // Now, we need to create the Expression.
-    return functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
+    functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
   }
 
   /**
@@ -687,8 +707,8 @@ class SessionCatalog(
   // -----------------
 
   /**
-   * Drop all existing databases (except "default") along with all associated tables,
-   * partitions and functions, and set the current database to "default".
+   * Drop all existing databases (except "default"), tables, partitions and functions,
+   * and set the current database to "default".
    *
    * This is mainly used for tests.
   */
@@ -697,6 +717,16 @@ class SessionCatalog(
     listDatabases().filter(_ != default).foreach { db =>
       dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
+    listTables(default).foreach { table =>
+      dropTable(table, ignoreIfNotExists = false)
+    }
+    listFunctions(default).foreach { func =>
+      if (func.database.isDefined) {
+        dropFunction(func, ignoreIfNotExists = false)
+      } else {
+        dropTempFunction(func.funcName, ignoreIfNotExists = false)
+      }
+    }
     tempTables.clear()
     functionRegistry.clear()
     // restore built-in functions
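Resolution in `lookupFunctionInfo` falls through three layers: the registry by bare name (temporary and built-in functions), the registry by fully qualified name (permanent functions already loaded), then the external catalog. A sketch of a call, with hypothetical names throughout; since the method is `private[spark]`, this only compiles inside the `org.apache.spark` namespace:

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier

// Assumes a SessionCatalog instance `sessionCatalog` and a permanent
// function `mydb.myUpper` registered in the external catalog.
val info = sessionCatalog.lookupFunctionInfo(
  FunctionIdentifier("myUpper", database = Some("mydb")))
// Checked in order:
//   1. functionRegistry.lookupFunction("myUpper")       -- temp or built-in
//   2. functionRegistry.lookupFunction("mydb.myUpper")  -- already loaded
//   3. externalCatalog.getFunction("mydb", "myUpper")   -- metastore fallback
println(info.getClassName)  // e.g. "com.example.MyUpper"
```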
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 9e90987731..d1e2b3f664 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -299,10 +299,10 @@ case class CatalogTable(
 case class CatalogTableType private(name: String)
 
 object CatalogTableType {
-  val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE")
-  val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE")
-  val INDEX_TABLE = new CatalogTableType("INDEX_TABLE")
-  val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW")
+  val EXTERNAL = new CatalogTableType("EXTERNAL")
+  val MANAGED = new CatalogTableType("MANAGED")
+  val INDEX = new CatalogTableType("INDEX")
+  val VIEW = new CatalogTableType("VIEW")
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 56d29cfbe1..5d294485af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -47,8 +47,9 @@ object ExpressionEncoder {
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
     // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = typeTag[T].mirror
-    val cls = mirror.runtimeClass(typeTag[T].tpe)
-    val flat = !classOf[Product].isAssignableFrom(cls)
+    val tpe = typeTag[T].tpe
+    val cls = mirror.runtimeClass(tpe)
+    val flat = !ScalaReflection.definedByConstructorParams(tpe)
 
     val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = false)
     val serializer = ScalaReflection.serializerFor[T](inputObject)
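The `flat` flag decides whether an encoder produces a single `value` column or one column per constructor parameter; with this change the decision is driven by `definedByConstructorParams` rather than a raw `Product` class check. A small sketch of the two cases, against the API as of this patch:

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Point(x: Int, y: Int)

// Int is not defined by constructor params: flat encoder, single "value" column.
val intEncoder = ExpressionEncoder[Int]()      // intEncoder.flat == true

// Point is a Product (and thus definedByConstructorParams): one column per
// constructor parameter, here "x" and "y".
val pointEncoder = ExpressionEncoder[Point]()  // pointEncoder.flat == false
```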
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index f961fe3292..d739b17743 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.Utils
  *
  * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
  */
-abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
+abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
   protected val utils: CatalogTestUtils
   import utils._
 
@@ -152,10 +152,10 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   test("the table type of an external table should be EXTERNAL_TABLE") {
     val catalog = newBasicCatalog()
     val table =
-      newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL_TABLE)
+      newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL)
     catalog.createTable("db2", table, ignoreIfExists = false)
     val actual = catalog.getTable("db2", "external_table1")
-    assert(actual.tableType === CatalogTableType.EXTERNAL_TABLE)
+    assert(actual.tableType === CatalogTableType.EXTERNAL)
   }
 
   test("drop table") {
@@ -551,14 +551,15 @@ abstract class CatalogTestUtils {
   def newTable(name: String, database: Option[String] = None): CatalogTable = {
     CatalogTable(
       identifier = TableIdentifier(name, database),
-      tableType = CatalogTableType.EXTERNAL_TABLE,
+      tableType = CatalogTableType.EXTERNAL,
       storage = storageFormat,
       schema = Seq(
         CatalogColumn("col1", "int"),
         CatalogColumn("col2", "string"),
         CatalogColumn("a", "int"),
         CatalogColumn("b", "string")),
-      partitionColumnNames = Seq("a", "b"))
+      partitionColumnNames = Seq("a", "b"),
+      bucketColumnNames = Seq("col1"))
   }
 
   def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
index 63a7b2c661..0605daa3f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 /** Test suite for the [[InMemoryCatalog]]. */
-class InMemoryCatalogSuite extends CatalogTestCases {
+class InMemoryCatalogSuite extends ExternalCatalogSuite {
 
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
     override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat"
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 1933be50b2..ba5d8ce0f4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
 /**
  * Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented.
  *
- * Note: many of the methods here are very similar to the ones in [[CatalogTestCases]].
+ * Note: many of the methods here are very similar to the ones in [[ExternalCatalogSuite]].
  * This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method
  * signatures but do not extend a common parent. This is largely by design but
  * unfortunately leads to very similar test code in two places.
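With `CatalogTestCases` renamed to `ExternalCatalogSuite`, the pattern in `InMemoryCatalogSuite` generalizes: any `ExternalCatalog` implementation can pick up the shared contract tests by extending the abstract suite. A hypothetical example, where `MyCatalog` and the format class names are placeholders:

```scala
class MyCatalogSuite extends ExternalCatalogSuite {

  protected override val utils: CatalogTestUtils = new CatalogTestUtils {
    override val tableInputFormat: String = "com.example.MyInputFormat"
    override val tableOutputFormat: String = "com.example.MyOutputFormat"
    override def newEmptyCatalog(): ExternalCatalog = new MyCatalog
  }
}
```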