path: root/sql/catalyst/src
author     Andrew Or <andrew@databricks.com>    2016-04-26 21:29:25 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-26 21:29:25 -0700
commit     d8a83a564ff3fd0281007adbf8aa3757da8a2c2b (patch)
tree       9cf2a91e9924485a6425ede02ffb0fa3f63714b1 /sql/catalyst/src
parent     d93976d8660b68eeef646d1fe687cfce01f50f9d (diff)
[SPARK-13477][SQL] Expose new user-facing Catalog interface
## What changes were proposed in this pull request?

#12625 exposed a new user-facing conf interface in `SparkSession`. This patch adds a catalog interface.

## How was this patch tested?

See `CatalogSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #12713 from andrewor14/user-facing-catalog.
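The user-facing `Catalog` class itself lives in `sql/core`, so it does not appear in this `sql/catalyst` diffstat. As a hedged sketch only (the method names below follow the Spark 2.x `spark.catalog` API and are assumptions relative to this patch), the new interface is meant to be reached from a `SparkSession` roughly like this:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: these Catalog methods are assumptions based on the
// Spark 2.x user-facing API, not part of the sql/catalyst changes shown here.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("catalog-demo")
  .getOrCreate()

spark.catalog.listDatabases().show()        // databases known to the session
spark.catalog.listTables("default").show()  // tables and temp views in "default"
spark.catalog.setCurrentDatabase("default") // switch the current database
spark.catalog.listFunctions().show()        // registered functions
```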
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala   26
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala   36
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala   8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala   5
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala)   11
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala   2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala   2
7 files changed, 69 insertions, 21 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index bd723135b5..be67605c45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -22,7 +22,15 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.Utils
+
+
+/**
+ * A helper trait to create [[org.apache.spark.sql.catalyst.encoders.ExpressionEncoder]]s
+ * for classes whose fields are entirely defined by constructor params but should not be
+ * case classes.
+ */
+private[sql] trait DefinedByConstructorParams
+
/**
* A default version of ScalaReflection that uses the runtime universe.
@@ -333,7 +341,7 @@ object ScalaReflection extends ScalaReflection {
"toScalaMap",
keyData :: valueData :: Nil)
- case t if t <:< localTypeOf[Product] =>
+ case t if definedByConstructorParams(t) =>
val params = getConstructorParameters(t)
val cls = getClassFromType(tpe)
@@ -401,7 +409,7 @@ object ScalaReflection extends ScalaReflection {
val clsName = getClassNameFromType(tpe)
val walkedTypePath = s"""- root class: "${clsName}"""" :: Nil
serializerFor(inputObject, tpe, walkedTypePath) match {
- case expressions.If(_, _, s: CreateNamedStruct) if tpe <:< localTypeOf[Product] => s
+ case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
}
}
@@ -491,7 +499,7 @@ object ScalaReflection extends ScalaReflection {
serializerFor(unwrapped, optType, newPath))
}
- case t if t <:< localTypeOf[Product] =>
+ case t if definedByConstructorParams(t) =>
val params = getConstructorParameters(t)
val nonNullOutput = CreateNamedStruct(params.flatMap { case (fieldName, fieldType) =>
val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType))
@@ -680,7 +688,7 @@ object ScalaReflection extends ScalaReflection {
val Schema(valueDataType, valueNullable) = schemaFor(valueType)
Schema(MapType(schemaFor(keyType).dataType,
valueDataType, valueContainsNull = valueNullable), nullable = true)
- case t if t <:< localTypeOf[Product] =>
+ case t if definedByConstructorParams(t) =>
val params = getConstructorParameters(t)
Schema(StructType(
params.map { case (fieldName, fieldType) =>
@@ -712,6 +720,14 @@ object ScalaReflection extends ScalaReflection {
throw new UnsupportedOperationException(s"Schema for type $other is not supported")
}
}
+
+ /**
+ * Whether the fields of the given type are defined entirely by its constructor parameters.
+ */
+ private[sql] def definedByConstructorParams(tpe: Type): Boolean = {
+ tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams]
+ }
+
}
/**
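The new `DefinedByConstructorParams` trait is what lets plain (non-case) classes, such as the user-facing catalog result classes, be encoded the same way as `Product`s. A minimal sketch, assuming code that lives inside the `org.apache.spark.sql` package since the trait is `private[sql]`; the `Database` class here is hypothetical:

```scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, ScalaReflection}

// A plain (non-case) class whose fields are fully determined by its constructor
// parameters. Mixing in the trait makes definedByConstructorParams(tpe) true,
// so schema and serializer derivation treat it like a Product.
class Database(val name: String, val description: String, val locationUri: String)
  extends DefinedByConstructorParams

object DefinedByConstructorParamsDemo {
  def main(args: Array[String]): Unit = {
    // Yields a StructType with one field per constructor parameter:
    // name: string, description: string, locationUri: string
    println(ScalaReflection.schemaFor[Database].dataType)
  }
}
```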
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 402aacfc1f..91d35de790 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -613,6 +613,25 @@ class SessionCatalog(
}
/**
+ * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
+ */
+ private[spark] def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
+ // TODO: just make function registry take in FunctionIdentifier instead of duplicating this
+ val qualifiedName = name.copy(database = name.database.orElse(Some(currentDb)))
+ functionRegistry.lookupFunction(name.funcName)
+ .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
+ .getOrElse {
+ val db = qualifiedName.database.get
+ if (externalCatalog.functionExists(db, name.funcName)) {
+ val metadata = externalCatalog.getFunction(db, name.funcName)
+ new ExpressionInfo(metadata.className, qualifiedName.unquotedString)
+ } else {
+ failFunctionLookup(name.funcName)
+ }
+ }
+ }
+
+ /**
* Return an [[Expression]] that represents the specified function, assuming it exists.
*
* For a temporary function or a permanent function that has been loaded,
@@ -646,6 +665,7 @@ class SessionCatalog(
// The function has not been loaded to the function registry, which means
// that the function is a permanent function (if it actually has been registered
// in the metastore). We need to first put the function in the FunctionRegistry.
+ // TODO: why not just check whether the function exists first?
val catalogFunction = try {
externalCatalog.getFunction(currentDb, name.funcName)
} catch {
@@ -662,7 +682,7 @@ class SessionCatalog(
val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
// Now, we need to create the Expression.
- return functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
+ functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
}
/**
@@ -687,8 +707,8 @@ class SessionCatalog(
// -----------------
/**
- * Drop all existing databases (except "default") along with all associated tables,
- * partitions and functions, and set the current database to "default".
+ * Drop all existing databases (except "default"), tables, partitions and functions,
+ * and set the current database to "default".
*
* This is mainly used for tests.
*/
@@ -697,6 +717,16 @@ class SessionCatalog(
listDatabases().filter(_ != default).foreach { db =>
dropDatabase(db, ignoreIfNotExists = false, cascade = true)
}
+ listTables(default).foreach { table =>
+ dropTable(table, ignoreIfNotExists = false)
+ }
+ listFunctions(default).foreach { func =>
+ if (func.database.isDefined) {
+ dropFunction(func, ignoreIfNotExists = false)
+ } else {
+ dropTempFunction(func.funcName, ignoreIfNotExists = false)
+ }
+ }
tempTables.clear()
functionRegistry.clear()
// restore built-in functions
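A test-style, hedged sketch of the strengthened `reset()`: after this patch it also drops the tables and functions registered in "default", not just the non-default databases. `reset()` is internal, so the sketch assumes code in the `org.apache.spark.sql` package; the single-argument `SessionCatalog` constructor and the `CatalogTable` fields follow the test utilities in this diff and may not match the real signatures exactly.

```scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._

object ResetDemo {
  def main(args: Array[String]): Unit = {
    val catalog = new SessionCatalog(new InMemoryCatalog)

    // Make sure "default" exists, then register a table in it.
    catalog.createDatabase(
      CatalogDatabase("default", "", "file:/tmp/default.db", Map.empty),
      ignoreIfExists = true)
    catalog.createTable(
      CatalogTable(
        identifier = TableIdentifier("t", Some("default")),
        tableType = CatalogTableType.MANAGED,
        storage = CatalogStorageFormat(None, None, None, None, Map.empty),
        schema = Seq(CatalogColumn("col1", "int"))),
      ignoreIfExists = false)
    assert(catalog.listTables("default").nonEmpty)

    catalog.reset()

    // After this patch, reset() clears "default" too, not just other databases.
    assert(catalog.listTables("default").isEmpty)
    assert(catalog.getCurrentDatabase == "default")
  }
}
```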
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 9e90987731..d1e2b3f664 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -299,10 +299,10 @@ case class CatalogTable(
case class CatalogTableType private(name: String)
object CatalogTableType {
- val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE")
- val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE")
- val INDEX_TABLE = new CatalogTableType("INDEX_TABLE")
- val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW")
+ val EXTERNAL = new CatalogTableType("EXTERNAL")
+ val MANAGED = new CatalogTableType("MANAGED")
+ val INDEX = new CatalogTableType("INDEX")
+ val VIEW = new CatalogTableType("VIEW")
}
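The constants drop the redundant `_TABLE`/`VIRTUAL_` parts so they read better when surfaced through the new catalog interface. A small hedged sketch of consuming the renamed values (the `tableTypeLabel` helper is hypothetical):

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}

// Hypothetical helper: map the renamed constants to a human-readable label,
// e.g. when describing a table through the user-facing catalog.
def tableTypeLabel(table: CatalogTable): String = table.tableType match {
  case CatalogTableType.EXTERNAL => "external table"
  case CatalogTableType.MANAGED  => "managed table"
  case CatalogTableType.INDEX    => "index table"
  case CatalogTableType.VIEW     => "view"
  case other                     => other.name
}
```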
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 56d29cfbe1..5d294485af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -47,8 +47,9 @@ object ExpressionEncoder {
def apply[T : TypeTag](): ExpressionEncoder[T] = {
// We convert the not-serializable TypeTag into StructType and ClassTag.
val mirror = typeTag[T].mirror
- val cls = mirror.runtimeClass(typeTag[T].tpe)
- val flat = !classOf[Product].isAssignableFrom(cls)
+ val tpe = typeTag[T].tpe
+ val cls = mirror.runtimeClass(tpe)
+ val flat = !ScalaReflection.definedByConstructorParams(tpe)
val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = false)
val serializer = ScalaReflection.serializerFor[T](inputObject)
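With `flat` now derived from `definedByConstructorParams` instead of a runtime `Product` class check, classes defined by their constructor parameters get struct encoders just like case classes. A minimal hedged sketch of the `flat` distinction, using the internal `ExpressionEncoder` API directly:

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Point(x: Double, y: Double)

// A case class is encoded as a struct with one column per field...
val pointEncoder = ExpressionEncoder[Point]()
assert(!pointEncoder.flat)

// ...while a primitive stays "flat": a single implicit "value" column.
val intEncoder = ExpressionEncoder[Int]()
assert(intEncoder.flat)
```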
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index f961fe3292..d739b17743 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.Utils
*
* Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
*/
-abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
+abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
protected val utils: CatalogTestUtils
import utils._
@@ -152,10 +152,10 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
test("the table type of an external table should be EXTERNAL_TABLE") {
val catalog = newBasicCatalog()
val table =
- newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL_TABLE)
+ newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL)
catalog.createTable("db2", table, ignoreIfExists = false)
val actual = catalog.getTable("db2", "external_table1")
- assert(actual.tableType === CatalogTableType.EXTERNAL_TABLE)
+ assert(actual.tableType === CatalogTableType.EXTERNAL)
}
test("drop table") {
@@ -551,14 +551,15 @@ abstract class CatalogTestUtils {
def newTable(name: String, database: Option[String] = None): CatalogTable = {
CatalogTable(
identifier = TableIdentifier(name, database),
- tableType = CatalogTableType.EXTERNAL_TABLE,
+ tableType = CatalogTableType.EXTERNAL,
storage = storageFormat,
schema = Seq(
CatalogColumn("col1", "int"),
CatalogColumn("col2", "string"),
CatalogColumn("a", "int"),
CatalogColumn("b", "string")),
- partitionColumnNames = Seq("a", "b"))
+ partitionColumnNames = Seq("a", "b"),
+ bucketColumnNames = Seq("col1"))
}
def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
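After the rename from `CatalogTestCases`, a new `ExternalCatalog` implementation reuses the shared tests the same way `InMemoryCatalogSuite` does below. A hedged sketch (the `com.example` format class names are hypothetical, and the `CatalogTestUtils` members are assumed from the context shown here):

```scala
import org.apache.spark.sql.catalyst.catalog._

// Sketch: a suite for a custom ExternalCatalog implementation would override
// newEmptyCatalog() to return that implementation; InMemoryCatalog is used
// here only so the sketch compiles as written.
class MyExternalCatalogSuite extends ExternalCatalogSuite {
  protected override val utils: CatalogTestUtils = new CatalogTestUtils {
    override val tableInputFormat: String = "com.example.MyInputFormat"   // hypothetical
    override val tableOutputFormat: String = "com.example.MyOutputFormat" // hypothetical
    override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
  }
}
```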
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
index 63a7b2c661..0605daa3f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
/** Test suite for the [[InMemoryCatalog]]. */
-class InMemoryCatalogSuite extends CatalogTestCases {
+class InMemoryCatalogSuite extends ExternalCatalogSuite {
protected override val utils: CatalogTestUtils = new CatalogTestUtils {
override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat"
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 1933be50b2..ba5d8ce0f4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
/**
* Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented.
*
- * Note: many of the methods here are very similar to the ones in [[CatalogTestCases]].
+ * Note: many of the methods here are very similar to the ones in [[ExternalCatalogSuite]].
* This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method
* signatures but do not extend a common parent. This is largely by design but
* unfortunately leads to very similar test code in two places.