-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java |  2
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/package.scala       |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala        |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala      | 42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala   |  1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala  | 22
6 files changed, 62 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
index e8d999dd00..462ca3f6f6 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
/**
* Base interface for a function used in Dataset's filter function.
*
- * If the function returns true, the element is discarded in the returned Dataset.
+ * If the function returns true, the element is included in the returned Dataset.
*/
public interface FilterFunction<T> extends Serializable {
boolean call(T value) throws Exception;
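For reference, a minimal sketch of the corrected semantics in Scala (assuming an active SparkSession named `spark`; the variable names are illustrative, not part of the change): elements for which the predicate returns true are kept.

    import org.apache.spark.api.java.function.FilterFunction

    val evens = spark.range(10).filter(new FilterFunction[java.lang.Long] {
      // returning true now means the element is *included* in the result
      override def call(value: java.lang.Long): Boolean = value % 2 == 0
    })
    evens.show()  // 0, 2, 4, 6, 8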
diff --git a/core/src/main/java/org/apache/spark/api/java/function/package.scala b/core/src/main/java/org/apache/spark/api/java/function/package.scala
index 0f9bac7164..e19f12fdac 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/package.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/package.scala
@@ -22,4 +22,4 @@ package org.apache.spark.api.java
* these interfaces to pass functions to various Java API methods for Spark. Please visit Spark's
* Java programming guide for more details.
*/
-package object function
+package object function
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index a99bc3bff6..6ddb1a7a1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.types.StructType
/**
* Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 2.0.0
*/
abstract class Catalog {
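A hedged usage sketch of this interface (the session setup and app name are illustrative; only `SparkSession.catalog` and the existing `list*` methods are assumed from the API):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("catalog-demo").getOrCreate()
    spark.catalog.listDatabases().show()  // Dataset[Database]
    spark.catalog.listTables().show()     // Dataset[Table] in the current database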
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
index 0f7feb8eee..33032f07f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
@@ -25,6 +25,14 @@ import org.apache.spark.sql.catalyst.DefinedByConstructorParams
// Note: all classes here are expected to be wrapped in Datasets and so must extend
// DefinedByConstructorParams for the catalog to be able to create encoders for them.
+/**
+ * A database in Spark, as returned by the `listDatabases` method defined in [[Catalog]].
+ *
+ * @param name name of the database.
+ * @param description description of the database.
+ * @param locationUri path (in the form of a URI) to data files.
+ * @since 2.0.0
+ */
class Database(
val name: String,
@Nullable val description: String,
@@ -41,6 +49,16 @@ class Database(
}
+/**
+ * A table in Spark, as returned by the `listTables` method in [[Catalog]].
+ *
+ * @param name name of the table.
+ * @param database name of the database the table belongs to.
+ * @param description description of the table.
+ * @param tableType type of the table (e.g. view, table).
+ * @param isTemporary whether the table is a temporary table.
+ * @since 2.0.0
+ */
class Table(
val name: String,
@Nullable val database: String,
@@ -61,6 +79,17 @@ class Table(
}
+/**
+ * A column in Spark, as returned by the `listColumns` method in [[Catalog]].
+ *
+ * @param name name of the column.
+ * @param description description of the column.
+ * @param dataType data type of the column.
+ * @param nullable whether the column is nullable.
+ * @param isPartition whether the column is a partition column.
+ * @param isBucket whether the column is a bucket column.
+ * @since 2.0.0
+ */
class Column(
val name: String,
@Nullable val description: String,
@@ -83,9 +112,19 @@ class Column(
}
-// TODO(andrew): should we include the database here?
+/**
+ * A user-defined function in Spark, as returned by the `listFunctions` method in [[Catalog]].
+ *
+ * @param name name of the function.
+ * @param database name of the database the function belongs to.
+ * @param description description of the function; may be null.
+ * @param className the fully qualified class name of the function.
+ * @param isTemporary whether the function is a temporary function.
+ * @since 2.0.0
+ */
class Function(
val name: String,
+ @Nullable val database: String,
@Nullable val description: String,
val className: String,
val isTemporary: Boolean)
@@ -94,6 +133,7 @@ class Function(
override def toString: String = {
"Function[" +
s"name='$name', " +
+ Option(database).map { d => s"database='$d', " }.getOrElse("") +
Option(description).map { d => s"description='$d', " }.getOrElse("") +
s"className='$className', " +
s"isTemporary='$isTemporary']"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index ceb6862275..70e17b10ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -125,6 +125,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
new Function(
name = funcIdent.identifier,
+ database = funcIdent.database.orNull,
description = null, // for now, this is always undefined
className = metadata.getClassName,
isTemporary = funcIdent.database.isEmpty)
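The convention applied here: a FunctionIdentifier with no database qualifier denotes a temporary function, so `database` maps to null and `isTemporary` to true. A minimal illustration (the identifier names are made up):

    import org.apache.spark.sql.catalyst.FunctionIdentifier

    val temp = FunctionIdentifier("my_temp_func")              // database = None
    val perm = FunctionIdentifier("my_func1", Some("my_db1"))  // database = Some("my_db1")
    assert(temp.database.isEmpty)            // -> database = null, isTemporary = true
    assert(perm.database.orNull == "my_db1") // -> database = "my_db1", isTemporary = false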
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index cd434f7887..aec0312c40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.internal
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.{Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, ScalaReflection, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
@@ -207,6 +207,14 @@ class CatalogSuite
assert(!funcNames2.contains("my_func1"))
assert(funcNames2.contains("my_func2"))
assert(funcNames2.contains("my_temp_func"))
+
+ // Make sure the database field is set properly.
+ assert(
+ spark.catalog.listFunctions("my_db1").collect().map(_.database).toSet == Set("my_db1", null))
+ assert(
+ spark.catalog.listFunctions("my_db2").collect().map(_.database).toSet == Set("my_db2", null))
+
+ // Remove the functions and make sure they no longer appear.
dropFunction("my_func1", Some("my_db1"))
dropTempFunction("my_temp_func")
val funcNames1b = spark.catalog.listFunctions("my_db1").collect().map(_.name).toSet
@@ -248,9 +256,11 @@ class CatalogSuite
}
test("Function.toString") {
- assert(new Function("nama", "commenta", "classNameAh", isTemporary = true).toString ==
- "Function[name='nama', description='commenta', className='classNameAh', isTemporary='true']")
- assert(new Function("nama", null, "classNameAh", isTemporary = false).toString ==
+ assert(
+ new Function("nama", "databasa", "commenta", "classNameAh", isTemporary = true).toString ==
+ "Function[name='nama', database='databasa', description='commenta', " +
+ "className='classNameAh', isTemporary='true']")
+ assert(new Function("nama", null, null, "classNameAh", isTemporary = false).toString ==
"Function[name='nama', className='classNameAh', isTemporary='false']")
}
@@ -268,7 +278,7 @@ class CatalogSuite
test("catalog classes format in Dataset.show") {
val db = new Database("nama", "descripta", "locata")
val table = new Table("nama", "databasa", "descripta", "typa", isTemporary = false)
- val function = new Function("nama", "descripta", "classa", isTemporary = false)
+ val function = new Function("nama", "databasa", "descripta", "classa", isTemporary = false)
val column = new Column(
"nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true)
val dbFields = ScalaReflection.getConstructorParameterValues(db)
@@ -277,7 +287,7 @@ class CatalogSuite
val columnFields = ScalaReflection.getConstructorParameterValues(column)
assert(dbFields == Seq("nama", "descripta", "locata"))
assert(tableFields == Seq("nama", "databasa", "descripta", "typa", false))
- assert(functionFields == Seq("nama", "descripta", "classa", false))
+ assert(functionFields == Seq("nama", "databasa", "descripta", "classa", false))
assert(columnFields == Seq("nama", "descripta", "typa", false, true, true))
val dbString = CatalogImpl.makeDataset(Seq(db), spark).showString(10)
val tableString = CatalogImpl.makeDataset(Seq(table), spark).showString(10)