author     Sandeep Singh <sandeep@techaddict.me>    2016-05-10 14:21:47 -0700
committer  Andrew Or <andrew@databricks.com>        2016-05-10 14:22:03 -0700
commit     da02d006bbb5c4fe62abd5542b9fff7d1c58603c (patch)
tree       5c29ef4941f739b8ba78ba8b74422933b255bb22 /sql
parent     9533f5390a3ad7ab96a7bea01cdb6aed89503a51 (diff)
[SPARK-15249][SQL] Use FunctionResource instead of (String, String) in CreateFunction and CatalogFunction for resource
Use FunctionResource instead of (String, String) in CreateFunction and CatalogFunction for resource. See the TODOs here:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L36
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala#L42

Tested with existing tests.

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #13024 from techaddict/SPARK-15249.
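For context, a minimal sketch of the typed resource model this patch moves to, reconstructed from the hunks below. The FunctionResource case class itself is not shown in this diff, so its exact shape here is an assumption inferred from call sites such as resource.resourceType.resourceType and resource.uri in the HiveClientImpl hunk:

    // Sketch only; the real definitions live in catalog/functionResources.scala.
    abstract class FunctionResourceType(val resourceType: String)

    object JarResource extends FunctionResourceType("jar")
    object FileResource extends FunctionResourceType("file")
    object ArchiveResource extends FunctionResourceType("archive")

    // Shape inferred from the HiveClientImpl hunk below (assumption).
    case class FunctionResource(resourceType: FunctionResourceType, uri: String)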
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala | 8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala | 9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 7
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 4
10 files changed, 27 insertions, 27 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 7505e2c236..f53311c5c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -687,12 +687,8 @@ class SessionCatalog(
* Loads resources such as JARs and Files for a function. Every resource is represented
* by a tuple (resource type, resource uri).
*/
- def loadFunctionResources(resources: Seq[(String, String)]): Unit = {
- resources.foreach { case (resourceType, uri) =>
- val functionResource =
- FunctionResource(FunctionResourceType.fromString(resourceType.toLowerCase), uri)
- functionResourceLoader.loadResource(functionResource)
- }
+ def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
+ resources.foreach(functionResourceLoader.loadResource)
}
/**
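With resources carried as typed values, a call site no longer needs to round-trip through strings. A hypothetical call, using the sketch above:

    // Hypothetical usage: each resource arrives already typed, so the
    // catalog simply hands it to the loader.
    catalog.loadFunctionResources(Seq(
      FunctionResource(JarResource, "/path/to/udf.jar"),
      FunctionResource(FileResource, "/path/to/lookup.txt")))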
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
index 5adcc892cf..7da1fe93c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
@@ -20,16 +20,16 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.AnalysisException
/** An trait that represents the type of a resourced needed by a function. */
-sealed trait FunctionResourceType
+abstract class FunctionResourceType(val resourceType: String)
-object JarResource extends FunctionResourceType
+object JarResource extends FunctionResourceType("jar")
-object FileResource extends FunctionResourceType
+object FileResource extends FunctionResourceType("file")
// We do not allow users to specify a archive because it is YARN specific.
// When loading resources, we will throw an exception and ask users to
// use --archive with spark submit.
-object ArchiveResource extends FunctionResourceType
+object ArchiveResource extends FunctionResourceType("archive")
object FunctionResourceType {
def fromString(resourceType: String): FunctionResourceType = {
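The hunk cuts off inside fromString, which this patch leaves unchanged. Its body plausibly just maps the lower-cased name onto the singleton objects, along these lines (AnalysisException is already imported in this file):

    // Plausible body of FunctionResourceType.fromString (not shown in this diff).
    def fromString(resourceType: String): FunctionResourceType = {
      resourceType.toLowerCase match {
        case "jar" => JarResource
        case "file" => FileResource
        case "archive" => ArchiveResource
        case other =>
          throw new AnalysisException(s"Resource type '$other' is not supported.")
      }
    }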
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2c6e9f53b2..fc2068cac5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -33,11 +33,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
* @param resources resource types and Uris used by the function
*/
-// TODO: Use FunctionResource instead of (String, String) as the element type of resources.
case class CatalogFunction(
identifier: FunctionIdentifier,
className: String,
- resources: Seq[(String, String)])
+ resources: Seq[FunctionResource])
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 651be26485..ae190c0da6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -466,7 +466,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
assert(catalog.getFunction("db2", "func1") ==
CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
- Seq.empty[(String, String)]))
+ Seq.empty[FunctionResource]))
intercept[AnalysisException] {
catalog.getFunction("db2", "does_not_exist")
}
@@ -679,7 +679,7 @@ abstract class CatalogTestUtils {
}
def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
- CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)])
+ CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[FunctionResource])
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index f2d2e99a3c..80422c20a6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -798,7 +798,7 @@ class SessionCatalogSuite extends SparkFunSuite {
val catalog = new SessionCatalog(newBasicCatalog())
val expected =
CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
- Seq.empty[(String, String)])
+ Seq.empty[FunctionResource])
assert(catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) == expected)
// Get function without explicitly specifying database
catalog.setCurrentDatabase("db2")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 086282d07c..87e6f9094d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -25,7 +25,7 @@ import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
@@ -430,7 +430,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val resourceType = resource.identifier.getText.toLowerCase
resourceType match {
case "jar" | "file" | "archive" =>
- resourceType -> string(resource.STRING)
+ FunctionResource(FunctionResourceType.fromString(resourceType), string(resource.STRING))
case other =>
throw operationNotAllowed(s"CREATE FUNCTION with resource type '$resourceType'", ctx)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index a9aa8d797a..1ea9bc5299 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException}
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -39,12 +39,11 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
* AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
* }}}
*/
-// TODO: Use Seq[FunctionResource] instead of Seq[(String, String)] for resources.
case class CreateFunction(
databaseName: Option[String],
functionName: String,
className: String,
- resources: Seq[(String, String)],
+ resources: Seq[FunctionResource],
isTemp: Boolean)
extends RunnableCommand {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index bd428a06f5..a728ac3c8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.command
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -156,13 +157,17 @@ class DDLCommandSuite extends PlanTest {
None,
"helloworld",
"com.matthewrathbone.example.SimpleUDFExample",
- Seq(("jar", "/path/to/jar1"), ("jar", "/path/to/jar2")),
+ Seq(
+ FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
+ FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
isTemp = true)
val expected2 = CreateFunction(
Some("hello"),
"world",
"com.matthewrathbone.example.SimpleUDFExample",
- Seq(("archive", "/path/to/archive"), ("file", "/path/to/file")),
+ Seq(
+ FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
+ FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
isTemp = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
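The parsed1/parsed2 inputs sit outside this hunk, but from expected1/expected2 they would be CREATE FUNCTION statements along these lines (hypothetical reconstruction):

    // Hypothetical SQL matching expected1 (temporary, two jar resources).
    val sql1 =
      """CREATE TEMPORARY FUNCTION helloworld AS
        |'com.matthewrathbone.example.SimpleUDFExample'
        |USING JAR '/path/to/jar1', JAR '/path/to/jar2'""".stripMargin

    // Hypothetical SQL matching expected2 (permanent, archive + file resources).
    val sql2 =
      """CREATE FUNCTION hello.world AS
        |'com.matthewrathbone.example.SimpleUDFExample'
        |USING ARCHIVE '/path/to/archive', FILE '/path/to/file'""".stripMargin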
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index cddc0b6e34..bb32459202 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -677,8 +677,9 @@ private[hive] class HiveClientImpl(
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
- val resourceUris = f.resources.map { case (resourceType, resourcePath) =>
- new ResourceUri(ResourceType.valueOf(resourceType.toUpperCase), resourcePath)
+ val resourceUris = f.resources.map { resource =>
+ new ResourceUri(
+ ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri)
}
new HiveFunction(
f.identifier.funcName,
@@ -700,7 +701,7 @@ private[hive] class HiveClientImpl(
case ResourceType.JAR => "jar"
case r => throw new AnalysisException(s"Unknown resource type: $r")
}
- (resourceType, uri.getUri())
+ FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
}
new CatalogFunction(name, hf.getClassName, resources)
}
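The Hive boundary now converts in both directions: toHiveFunction upper-cases the type name into Hive's ResourceType enum, and fromHiveFunction maps it back through FunctionResourceType.fromString. A rough sketch of the round trip, with hypothetical values:

    import org.apache.hadoop.hive.metastore.api.{ResourceType, ResourceUri}

    // Hypothetical round trip across the metastore boundary.
    val res = FunctionResource(JarResource, "hdfs:///udfs/max.jar")
    val hive = new ResourceUri(
      ResourceType.valueOf(res.resourceType.resourceType.toUpperCase), res.uri)
    val back = FunctionResource(
      FunctionResourceType.fromString(hive.getResourceType.name.toLowerCase), hive.getUri)
    assert(back == res) // case-class equality; JarResource is a singleton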
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index bfe559f0b2..d05a3623ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
@@ -425,7 +425,7 @@ object PermanentHiveUDFTest2 extends Logging {
val function = CatalogFunction(
FunctionIdentifier("example_max"),
"org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
- ("JAR" -> jar) :: Nil)
+ FunctionResource(JarResource, jar) :: Nil)
hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false)
val source =
hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")