author    Michael Armbrust <michael@databricks.com>  2014-08-02 16:33:48 -0700
committer Michael Armbrust <michael@databricks.com>  2014-08-02 16:33:48 -0700
commit    158ad0bba9382fd494b4789b5628a9cec00cfa19 (patch)
tree      761acc1c8694a167043fc8f45bfa49447d6c1f2d
parent    4c477117bb1ffef463776c86f925d35036f96b7a (diff)
[SPARK-2097][SQL] UDF Support
This patch adds the ability to register lambda functions written in Python, Java or Scala as UDFs for use in SQL or HiveQL. Scala: ```scala registerFunction("strLenScala", (_: String).length) sql("SELECT strLenScala('test')") ``` Python: ```python sqlCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType()) sqlCtx.sql("SELECT strLenPython('test')") ``` Java: ```java sqlContext.registerFunction("stringLengthJava", new UDF1<String, Integer>() { Override public Integer call(String str) throws Exception { return str.length(); } }, DataType.IntegerType); sqlContext.sql("SELECT stringLengthJava('test')"); ``` Author: Michael Armbrust <michael@databricks.com> Closes #1063 from marmbrus/udfs and squashes the following commits: 9eda0fe [Michael Armbrust] newline 747c05e [Michael Armbrust] Add some scala UDF tests. d92727d [Michael Armbrust] Merge remote-tracking branch 'apache/master' into udfs 005d684 [Michael Armbrust] Fix naming and formatting. d14dac8 [Michael Armbrust] Fix last line of autogened java files. 8135c48 [Michael Armbrust] Move UDF unit tests to pyspark. 40b0ffd [Michael Armbrust] Merge remote-tracking branch 'apache/master' into udfs 6a36890 [Michael Armbrust] Switch logging so that SQLContext can be serializable. 7a83101 [Michael Armbrust] Drop toString 795fd15 [Michael Armbrust] Try to avoid capturing SQLContext. e54fb45 [Michael Armbrust] Docs and tests. 437cbe3 [Michael Armbrust] Update use of dataTypes, fix some python tests, address review comments. 01517d6 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into udfs 8e6c932 [Michael Armbrust] WIP 3f96a52 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into udfs 6237c8d [Michael Armbrust] WIP 2766f0b [Michael Armbrust] Move udfs support to SQL from hive. Add support for Java UDFs. 0f7d50c [Michael Armbrust] Draft of native Spark SQL UDFs for Scala and Python.
-rw-r--r--python/pyspark/sql.py39
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala32
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala307
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF1.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF10.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF11.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF12.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF13.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF14.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF15.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF16.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF17.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF18.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF19.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF2.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF20.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF21.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF22.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF3.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF4.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF5.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF6.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF7.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF8.java32
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/api/java/UDF9.java32
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala196
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala252
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala177
-rw-r--r--sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java90
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala36
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala13
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala4
38 files changed, 1861 insertions, 19 deletions
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index f840475ffa..e7c35ac1ff 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -28,9 +28,13 @@ from array import array
from operator import itemgetter
from pyspark.rdd import RDD, PipelinedRDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer
+
+from itertools import chain, ifilter, imap
from py4j.protocol import Py4JError
+from py4j.java_collections import ListConverter, MapConverter
+
__all__ = [
"StringType", "BinaryType", "BooleanType", "TimestampType", "DecimalType",
@@ -932,6 +936,39 @@ class SQLContext:
self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
return self._scala_SQLContext
+ def registerFunction(self, name, f, returnType=StringType()):
+ """Registers a lambda function as a UDF so it can be used in SQL statements.
+
+ In addition to a name and the function itself, the return type can be optionally specified.
+ When the return type is not given, it defaults to a string and conversion will be done
+ automatically. For any other return type, the produced object must match the specified type.
+
+ >>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
+ >>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
+ [Row(c0=u'4')]
+ >>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
+ >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
+ [Row(c0=4)]
+ >>> sqlCtx.registerFunction("twoArgs", lambda x, y: len(x) + y, IntegerType())
+ >>> sqlCtx.sql("SELECT twoArgs('test', 1)").collect()
+ [Row(c0=5)]
+ """
+ func = lambda _, it: imap(lambda x: f(*x), it)
+ command = (func,
+ BatchedSerializer(PickleSerializer(), 1024),
+ BatchedSerializer(PickleSerializer(), 1024))
+ env = MapConverter().convert(self._sc.environment,
+ self._sc._gateway._gateway_client)
+ includes = ListConverter().convert(self._sc._python_includes,
+ self._sc._gateway._gateway_client)
+ self._ssql_ctx.registerPython(name,
+ bytearray(CloudPickleSerializer().dumps(command)),
+ env,
+ includes,
+ self._sc.pythonExec,
+ self._sc._javaAccumulator,
+ str(returnType))
+
def inferSchema(self, rdd):
"""Infer and apply a schema to an RDD of L{Row}s.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index c0255701b7..760c49fbca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -18,17 +18,49 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.Expression
+import scala.collection.mutable
/** A catalog for looking up user defined functions, used by an [[Analyzer]]. */
trait FunctionRegistry {
+ type FunctionBuilder = Seq[Expression] => Expression
+
+ def registerFunction(name: String, builder: FunctionBuilder): Unit
+
def lookupFunction(name: String, children: Seq[Expression]): Expression
}
+trait OverrideFunctionRegistry extends FunctionRegistry {
+
+ val functionBuilders = new mutable.HashMap[String, FunctionBuilder]()
+
+ def registerFunction(name: String, builder: FunctionBuilder) = {
+ functionBuilders.put(name, builder)
+ }
+
+ abstract override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+ functionBuilders.get(name).map(_(children)).getOrElse(super.lookupFunction(name, children))
+ }
+}
+
+class SimpleFunctionRegistry extends FunctionRegistry {
+ val functionBuilders = new mutable.HashMap[String, FunctionBuilder]()
+
+ def registerFunction(name: String, builder: FunctionBuilder) = {
+ functionBuilders.put(name, builder)
+ }
+
+ override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+ functionBuilders(name)(children)
+ }
+}
+
/**
* A trivial catalog that returns an error when a function is requested. Used for testing when all
* functions are already filled in and the analyser needs only to resolve attribute references.
*/
object EmptyFunctionRegistry extends FunctionRegistry {
+ def registerFunction(name: String, builder: FunctionBuilder) = ???
+
def lookupFunction(name: String, children: Seq[Expression]): Expression = {
throw new UnsupportedOperationException
}
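The registry contract introduced here is small: a `FunctionBuilder` maps the already-parsed argument expressions to a concrete `Expression`. A minimal sketch (not part of the patch) of exercising `SimpleFunctionRegistry` directly; `Literal` is used only as a convenient concrete `Expression`:

```scala
import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

val registry = new SimpleFunctionRegistry
// The builder receives the parsed argument expressions and returns the
// expression that will stand in for the function call in the plan.
registry.registerFunction("answer", (args: Seq[Expression]) => Literal(42))
val expr = registry.lookupFunction("answer", Nil)  // Literal(42)
```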
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index acddf5e9c7..95633dd0c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -27,6 +27,22 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
def references = children.flatMap(_.references).toSet
def nullable = true
+ /** This method has been generated by this script
+
+ (1 to 22).map { x =>
+ val anys = (1 to x).map(x => "Any").reduce(_ + ", " + _)
+ val evals = (0 to x - 1).map(x => s"children($x).eval(input)").reduce(_ + ",\n " + _)
+
+ s"""
+ case $x =>
+ function.asInstanceOf[($anys) => Any](
+ $evals)
+ """
+ }
+
+ */
+
+ // scalastyle:off
override def eval(input: Row): Any = {
children.size match {
case 0 => function.asInstanceOf[() => Any]()
@@ -35,6 +51,297 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
function.asInstanceOf[(Any, Any) => Any](
children(0).eval(input),
children(1).eval(input))
+ case 3 =>
+ function.asInstanceOf[(Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input))
+ case 4 =>
+ function.asInstanceOf[(Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input))
+ case 5 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input))
+ case 6 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input))
+ case 7 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input))
+ case 8 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input))
+ case 9 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input))
+ case 10 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input))
+ case 11 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input))
+ case 12 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input))
+ case 13 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input),
+ children(12).eval(input))
+ case 14 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input),
+ children(12).eval(input),
+ children(13).eval(input))
+ case 15 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input),
+ children(12).eval(input),
+ children(13).eval(input),
+ children(14).eval(input))
+ case 16 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input),
+ children(12).eval(input),
+ children(13).eval(input),
+ children(14).eval(input),
+ children(15).eval(input))
+ case 17 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input),
+ children(12).eval(input),
+ children(13).eval(input),
+ children(14).eval(input),
+ children(15).eval(input),
+ children(16).eval(input))
+ case 18 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input),
+ children(12).eval(input),
+ children(13).eval(input),
+ children(14).eval(input),
+ children(15).eval(input),
+ children(16).eval(input),
+ children(17).eval(input))
+ case 19 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input),
+ children(12).eval(input),
+ children(13).eval(input),
+ children(14).eval(input),
+ children(15).eval(input),
+ children(16).eval(input),
+ children(17).eval(input),
+ children(18).eval(input))
+ case 20 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input),
+ children(12).eval(input),
+ children(13).eval(input),
+ children(14).eval(input),
+ children(15).eval(input),
+ children(16).eval(input),
+ children(17).eval(input),
+ children(18).eval(input),
+ children(19).eval(input))
+ case 21 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input),
+ children(12).eval(input),
+ children(13).eval(input),
+ children(14).eval(input),
+ children(15).eval(input),
+ children(16).eval(input),
+ children(17).eval(input),
+ children(18).eval(input),
+ children(19).eval(input),
+ children(20).eval(input))
+ case 22 =>
+ function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+ children(0).eval(input),
+ children(1).eval(input),
+ children(2).eval(input),
+ children(3).eval(input),
+ children(4).eval(input),
+ children(5).eval(input),
+ children(6).eval(input),
+ children(7).eval(input),
+ children(8).eval(input),
+ children(9).eval(input),
+ children(10).eval(input),
+ children(11).eval(input),
+ children(12).eval(input),
+ children(13).eval(input),
+ children(14).eval(input),
+ children(15).eval(input),
+ children(16).eval(input),
+ children(17).eval(input),
+ children(18).eval(input),
+ children(19).eval(input),
+ children(20).eval(input),
+ children(21).eval(input))
}
+ // scalastyle:on
}
}
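Every arm of the match above relies on the same erasure trick: the registered function is held untyped as `AnyRef` and invoked through a cast to a `FunctionN` over `Any`. A standalone sketch of that pattern (illustrative, not from the patch):

```scala
// Erased invocation, as in the match arms above: the typed lambda is
// stored as AnyRef and called through an (Any, ...) => Any cast.
val f: AnyRef = (s: String) => s.length
val result = f.asInstanceOf[Any => Any]("test")  // 4
// This is safe at runtime because erasure gives Function1[String, Int]
// the same generic apply(Object): Object entry point as Function1[Any, Any].
```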
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF1.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF1.java
new file mode 100644
index 0000000000..ef959e35e1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF1.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 1 argument.
+ */
+public interface UDF1<T1, R> extends Serializable {
+ public R call(T1 t1) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF10.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF10.java
new file mode 100644
index 0000000000..96ab3a96c3
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF10.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 10 arguments.
+ */
+public interface UDF10<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF11.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF11.java
new file mode 100644
index 0000000000..58ae8edd6d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF11.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 11 arguments.
+ */
+public interface UDF11<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF12.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF12.java
new file mode 100644
index 0000000000..d9da0f6edd
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF12.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 12 arguments.
+ */
+public interface UDF12<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF13.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF13.java
new file mode 100644
index 0000000000..095fc1a807
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF13.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 13 arguments.
+ */
+public interface UDF13<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF14.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF14.java
new file mode 100644
index 0000000000..eb27eaa180
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF14.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 14 arguments.
+ */
+public interface UDF14<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF15.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF15.java
new file mode 100644
index 0000000000..1fbcff5633
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF15.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 15 arguments.
+ */
+public interface UDF15<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF16.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF16.java
new file mode 100644
index 0000000000..1133561787
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF16.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 16 arguments.
+ */
+public interface UDF16<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF17.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF17.java
new file mode 100644
index 0000000000..dfae7922c9
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF17.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 17 arguments.
+ */
+public interface UDF17<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF18.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF18.java
new file mode 100644
index 0000000000..e9d1c6d52d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF18.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 18 arguments.
+ */
+public interface UDF18<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF19.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF19.java
new file mode 100644
index 0000000000..46b9d2d3c9
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF19.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 19 arguments.
+ */
+public interface UDF19<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18, T19 t19) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF2.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF2.java
new file mode 100644
index 0000000000..cd3fde8da4
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF2.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 2 arguments.
+ */
+public interface UDF2<T1, T2, R> extends Serializable {
+ public R call(T1 t1, T2 t2) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF20.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF20.java
new file mode 100644
index 0000000000..113d3d26be
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF20.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 20 arguments.
+ */
+public interface UDF20<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18, T19 t19, T20 t20) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF21.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF21.java
new file mode 100644
index 0000000000..74118f2cf8
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF21.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 21 arguments.
+ */
+public interface UDF21<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18, T19 t19, T20 t20, T21 t21) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF22.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF22.java
new file mode 100644
index 0000000000..0e7cc40be4
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF22.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 22 arguments.
+ */
+public interface UDF22<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18, T19 t19, T20 t20, T21 t21, T22 t22) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF3.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF3.java
new file mode 100644
index 0000000000..6a880f16be
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF3.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 3 arguments.
+ */
+public interface UDF3<T1, T2, T3, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF4.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF4.java
new file mode 100644
index 0000000000..fcad2febb1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF4.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 4 arguments.
+ */
+public interface UDF4<T1, T2, T3, T4, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF5.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF5.java
new file mode 100644
index 0000000000..ce0cef43a2
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF5.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 5 arguments.
+ */
+public interface UDF5<T1, T2, T3, T4, T5, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF6.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF6.java
new file mode 100644
index 0000000000..f56b806684
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF6.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 6 arguments.
+ */
+public interface UDF6<T1, T2, T3, T4, T5, T6, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF7.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF7.java
new file mode 100644
index 0000000000..25bd6d3241
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF7.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 7 arguments.
+ */
+public interface UDF7<T1, T2, T3, T4, T5, T6, T7, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF8.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF8.java
new file mode 100644
index 0000000000..a3b7ac5f94
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF8.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 8 arguments.
+ */
+public interface UDF8<T1, T2, T3, T4, T5, T6, T7, T8, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF9.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF9.java
new file mode 100644
index 0000000000..205e72a152
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF9.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 9 arguments.
+ */
+public interface UDF9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R> extends Serializable {
+ public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9) throws Exception;
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 00dd34aabc..33931e5d99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -48,18 +48,23 @@ import org.apache.spark.{Logging, SparkContext}
*/
@AlphaComponent
class SQLContext(@transient val sparkContext: SparkContext)
- extends Logging
+ extends org.apache.spark.Logging
with SQLConf
with ExpressionConversions
+ with UDFRegistration
with Serializable {
self =>
@transient
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
+
+ @transient
+ protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry
+
@transient
protected[sql] lazy val analyzer: Analyzer =
- new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true)
+ new Analyzer(catalog, functionRegistry, caseSensitive = true)
@transient
protected[sql] val optimizer = Optimizer
@transient
@@ -379,7 +384,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected abstract class QueryExecution {
def logical: LogicalPlan
- lazy val analyzed = analyzer(logical)
+ lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
lazy val sparkPlan = {
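Because name resolution now goes through the new functionRegistry during analysis, a UDF must be registered before any query referencing it is analyzed. A minimal sketch of that ordering, assuming a local context (`trimLen` is a hypothetical name):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local", "udf-ordering-example")
val sqlContext = new SQLContext(sc)

// Lookup happens in the Analyzer, so registration must precede sql().
sqlContext.registerFunction("trimLen", (s: String) => s.trim.length)
sqlContext.sql("SELECT trimLen('  padded  ')").collect()
```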
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala
new file mode 100644
index 0000000000..0b48e9e659
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.{List => JList, Map => JMap}
+
+import org.apache.spark.Accumulator
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUdf}
+import org.apache.spark.sql.execution.PythonUDF
+
+import scala.reflect.runtime.universe.{TypeTag, typeTag}
+
+/**
+ * Functions for registering Scala lambda functions as UDFs in a SQLContext.
+ */
+protected[sql] trait UDFRegistration {
+ self: SQLContext =>
+
+ private[spark] def registerPython(
+ name: String,
+ command: Array[Byte],
+ envVars: JMap[String, String],
+ pythonIncludes: JList[String],
+ pythonExec: String,
+ accumulator: Accumulator[JList[Array[Byte]]],
+ stringDataType: String): Unit = {
+ log.debug(
+ s"""
+ | Registering new PythonUDF:
+ | name: $name
+ | command: ${command.toSeq}
+ | envVars: $envVars
+ | pythonIncludes: $pythonIncludes
+ | pythonExec: $pythonExec
+ | dataType: $stringDataType
+ """.stripMargin)
+
+ val dataType = parseDataType(stringDataType)
+
+ def builder(e: Seq[Expression]) =
+ PythonUDF(
+ name,
+ command,
+ envVars,
+ pythonIncludes,
+ pythonExec,
+ accumulator,
+ dataType,
+ e)
+
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ /** The registerFunction methods for arities 1 to 22 were generated by this script:
+
+ (1 to 22).map { x =>
+ val types = (1 to x).map(x => "_").reduce(_ + ", " + _)
+ s"""
+ def registerFunction[T: TypeTag](name: String, func: Function$x[$types, T]): Unit = {
+ def builder(e: Seq[Expression]) =
+ ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+ """
+ }
+ */
+
+ // scalastyle:off
+ def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function2[_, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function3[_, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function4[_, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function5[_, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function6[_, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function7[_, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function8[_, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function9[_, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function10[_, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function11[_, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function12[_, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function13[_, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+
+ def registerFunction[T: TypeTag](name: String, func: Function22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+ def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+ functionRegistry.registerFunction(name, builder)
+ }
+ // scalastyle:on
+}
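Each overload above carries a TypeTag for the return type, so the result DataType is derived through ScalaReflection rather than supplied by the caller. A sketch of what that buys on the Scala side, reusing a SQLContext built as in the earlier sketch (`ratio` is a hypothetical name):

```scala
// No explicit DataType argument: the Double return type is inferred via
// ScalaReflection.schemaFor, as in the builders above.
sqlContext.registerFunction("ratio", (a: Double, b: Double) => a / b)
sqlContext.sql("SELECT ratio(1.0, 4.0)").collect()
```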
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 809dd038f9..ae45193ed1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -28,14 +28,13 @@ import org.apache.spark.sql.{SQLContext, StructType => SStructType}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
-import org.apache.spark.sql.types.util.DataTypeConversions
-import DataTypeConversions.asScalaDataType;
+import org.apache.spark.sql.types.util.DataTypeConversions.asScalaDataType
import org.apache.spark.util.Utils
/**
* The entry point for executing Spark SQL queries from a Java program.
*/
-class JavaSQLContext(val sqlContext: SQLContext) {
+class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
def this(sparkContext: JavaSparkContext) = this(new SQLContext(sparkContext.sc))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala
new file mode 100644
index 0000000000..158f26e3d4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala
@@ -0,0 +1,252 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.api.java
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUdf}
+import org.apache.spark.sql.types.util.DataTypeConversions._
+
+/**
+ * A collection of functions that allow Java users to register UDFs. In order to handle functions
+ * of varying arities with minimal boilerplate for our users, we generate classes and functions
+ * for each arity up to 22. The code for this generation can be found in comments in this trait.
+ */
+private[java] trait UDFRegistration {
+ self: JavaSQLContext =>
+
+ /* The following functions and required interfaces are generated with these code fragments:
+
+ (1 to 22).foreach { i =>
+ val extTypeArgs = (1 to i).map(_ => "_").mkString(", ")
+ val anyTypeArgs = (1 to i).map(_ => "Any").mkString(", ")
+ val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs, Any]]"
+ val anyParams = (1 to i).map(_ => "_: Any").mkString(", ")
+ println(s"""
+ |def registerFunction(
+ | name: String, f: UDF$i[$extTypeArgs, _], dataType: DataType) = {
+ | val scalaType = asScalaDataType(dataType)
+ | sqlContext.functionRegistry.registerFunction(
+ | name,
+ | (e: Seq[Expression]) => ScalaUdf(f$anyCast.call($anyParams), scalaType, e))
+ |}
+ """.stripMargin)
+ }
+
+ import java.io.File
+ import org.apache.spark.sql.catalyst.util.stringToFile
+ val directory = new File("sql/core/src/main/java/org/apache/spark/sql/api/java/")
+ (1 to 22).foreach { i =>
+ val typeArgs = (1 to i).map(i => s"T$i").mkString(", ")
+ val args = (1 to i).map(i => s"T$i t$i").mkString(", ")
+
+ val contents =
+ s"""/*
+ | * Licensed to the Apache Software Foundation (ASF) under one or more
+ | * contributor license agreements. See the NOTICE file distributed with
+ | * this work for additional information regarding copyright ownership.
+ | * The ASF licenses this file to You under the Apache License, Version 2.0
+ | * (the "License"); you may not use this file except in compliance with
+ | * the License. You may obtain a copy of the License at
+ | *
+ | * http://www.apache.org/licenses/LICENSE-2.0
+ | *
+ | * Unless required by applicable law or agreed to in writing, software
+ | * distributed under the License is distributed on an "AS IS" BASIS,
+ | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ | * See the License for the specific language governing permissions and
+ | * limitations under the License.
+ | */
+ |
+ |package org.apache.spark.sql.api.java;
+ |
+ |import java.io.Serializable;
+ |
+ |// **************************************************
+ |// THIS FILE IS AUTOGENERATED BY CODE IN
+ |// org.apache.spark.sql.api.java.UDFRegistration
+ |// **************************************************
+ |
+ |/**
+ | * A Spark SQL UDF that has $i arguments.
+ | */
+ |public interface UDF$i<$typeArgs, R> extends Serializable {
+ | public R call($args) throws Exception;
+ |}
+ |""".stripMargin
+
+ stringToFile(new File(directory, s"UDF$i.java"), contents)
+ }
+
+ */
+
+ // scalastyle:off
+ def registerFunction(name: String, f: UDF1[_, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF1[Any, Any]].call(_: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF2[_, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF3[_, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF4[_, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF5[_, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF6[_, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF7[_, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF8[_, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ def registerFunction(name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+ val scalaType = asScalaDataType(dataType)
+ sqlContext.functionRegistry.registerFunction(
+ name,
+ (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+ }
+
+ // scalastyle:on
+}
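The Java path has no TypeTag, so each overload takes the return DataType explicitly and casts the UDF to its Any-typed form before wrapping it in a ScalaUdf. Calling the Java API from Scala makes the difference visible (a sketch; `toUpper` is a hypothetical name):

```scala
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.api.java.{DataType, JavaSQLContext, UDF1}

val jsc = new JavaSparkContext("local", "java-udf-example")
val javaSqlContext = new JavaSQLContext(jsc)

// With no TypeTag available, the return DataType is supplied by hand.
javaSqlContext.registerFunction("toUpper", new UDF1[String, String] {
  override def call(s: String): String = s.toUpperCase
}, DataType.StringType)
javaSqlContext.sql("SELECT toUpper('test')")
```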
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8bec015c7b..f0c958fdb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -286,6 +286,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.ExistingRdd(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
+ case e @ EvaluatePython(udf, child) =>
+ BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
case SparkLogicalPlan(existingPlan) => existingPlan :: Nil
case _ => Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
new file mode 100644
index 0000000000..b92091b560
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -0,0 +1,177 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution
+
+import java.util.{List => JList, Map => JMap}
+
+import net.razorvine.pickle.{Pickler, Unpickler}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.python.PythonRDD
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.{Accumulator, Logging => SparkLogging}
+
+import scala.collection.JavaConversions._
+
+/**
+ * A serialized version of a Python lambda function. Suitable for use in a [[PythonRDD]].
+ */
+private[spark] case class PythonUDF(
+ name: String,
+ command: Array[Byte],
+ envVars: JMap[String, String],
+ pythonIncludes: JList[String],
+ pythonExec: String,
+ accumulator: Accumulator[JList[Array[Byte]]],
+ dataType: DataType,
+ children: Seq[Expression]) extends Expression with SparkLogging {
+
+ override def toString = s"PythonUDF#$name(${children.mkString(",")})"
+
+ def nullable: Boolean = true
+ def references: Set[Attribute] = children.flatMap(_.references).toSet
+
+ override def eval(input: Row) = sys.error("PythonUDFs can not be directly evaluated.")
+}
+
+/**
+ * Extracts PythonUDFs from operators, rewriting the query plan so that the UDF can be evaluated
+ * alone in a batch.
+ *
+ * This has the limitation that the input to the Python UDF is not allowed to include attributes from
+ * multiple child operators.
+ */
+private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan) = plan transform {
+ // Skip EvaluatePython nodes.
+ case p: EvaluatePython => p
+
+ case l: LogicalPlan =>
+ // Extract any PythonUDFs from the current operator.
+ val udfs = l.expressions.flatMap(_.collect { case udf: PythonUDF => udf})
+ if (udfs.isEmpty) {
+ // If there aren't any, we are done.
+ l
+ } else {
+ // Pick the UDF we are going to evaluate (TODO: Support evaluating multiple UDFs at a time)
+ // If there is more than one, we will add another evaluation operator in a subsequent pass.
+ val udf = udfs.head
+
+ var evaluation: EvaluatePython = null
+
+ // Rewrite the child that has the input required for the UDF
+ val newChildren = l.children.map { child =>
+ // Check to make sure that the UDF can be evaluated with only the input of this child.
+ // Other cases are disallowed as they are ambiguous or would require a Cartesian product.
+ if (udf.references.subsetOf(child.outputSet)) {
+ evaluation = EvaluatePython(udf, child)
+ evaluation
+ } else if (udf.references.intersect(child.outputSet).nonEmpty) {
+ sys.error(s"Invalid PythonUDF $udf, requires attributes from more than one child.")
+ } else {
+ child
+ }
+ }
+
+ assert(evaluation != null, "Unable to evaluate PythonUDF. Missing input attributes.")
+
+ // Trim away the new UDF value if it was only used for filtering or something.
+ logical.Project(
+ l.output,
+ l.transformExpressions {
+ case p: PythonUDF if p.id == udf.id => evaluation.resultAttribute
+ }.withNewChildren(newChildren))
+ }
+ }
+}
+
+/**
+ * :: DeveloperApi ::
+ * Evaluates a [[PythonUDF]], appending the result to the end of the input tuple.
+ */
+@DeveloperApi
+case class EvaluatePython(udf: PythonUDF, child: LogicalPlan) extends logical.UnaryNode {
+ val resultAttribute = AttributeReference("pythonUDF", udf.dataType, nullable=true)()
+
+ def references = Set.empty
+ def output = child.output :+ resultAttribute
+}
+
+/**
+ * :: DeveloperApi ::
+ * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time. The input
+ * data is cached and zipped with the result of the udf evaluation.
+ */
+@DeveloperApi
+case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: SparkPlan)
+ extends SparkPlan {
+ def children = child :: Nil
+
+ def execute() = {
+ // TODO: Clean up after ourselves?
+ val childResults = child.execute().map(_.copy()).cache()
+
+ val parent = childResults.mapPartitions { iter =>
+ val pickle = new Pickler
+ val currentRow = newMutableProjection(udf.children, child.output)()
+ iter.grouped(1000).map { inputRows =>
+ val toBePickled = inputRows.map(currentRow(_).toArray).toArray
+ pickle.dumps(toBePickled)
+ }
+ }
+
+ val pyRDD = new PythonRDD(
+ parent,
+ udf.command,
+ udf.envVars,
+ udf.pythonIncludes,
+ false,
+ udf.pythonExec,
+ Seq[Broadcast[Array[Byte]]](),
+ udf.accumulator
+ ).mapPartitions { iter =>
+ val pickle = new Unpickler
+ iter.flatMap { pickledResult =>
+ val unpickledBatch = pickle.loads(pickledResult)
+ unpickledBatch.asInstanceOf[java.util.ArrayList[Any]]
+ }
+ }.mapPartitions { iter =>
+ val row = new GenericMutableRow(1)
+ iter.map { result =>
+ row(0) = udf.dataType match {
+ case StringType => result.toString
+ case other => result
+ }
+ row: Row
+ }
+ }
+
+ childResults.zip(pyRDD).mapPartitions { iter =>
+ val joinedRow = new JoinedRow()
+ iter.map {
+ case (row, udfResult) =>
+ joinedRow(row, udfResult)
+ }
+ }
+ }
+}
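BatchPythonEvaluation hinges on caching the child's rows so they can be traversed twice: once to feed the pickled batches to the PythonRDD, and once to be zipped back with the results row-for-row. The shape of that trick, stripped of the SQL and pickling machinery (an illustrative analogue, not code from the patch):

```scala
import org.apache.spark.SparkContext

val sc = new SparkContext("local", "zip-example")

// Cache the input so the second traversal sees the same rows in the same
// order, then zip it with the externally computed results.
val childResults = sc.parallelize(1 to 10, 2).cache()
val udfResults = childResults.map(_ * 2) // stands in for the Python round trip
val joined = childResults.zip(udfResults).collect()
```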
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
new file mode 100644
index 0000000000..a9a11285de
--- /dev/null
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+import org.apache.spark.sql.api.java.UDF1;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Suite;
+import org.junit.runner.RunWith;
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+ private transient JavaSparkContext sc;
+ private transient JavaSQLContext sqlContext;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaAPISuite");
+ sqlContext = new JavaSQLContext(sc);
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void udf1Test() {
+ // With Java 8 lambdas:
+ // sqlContext.registerFunction(
+ // "stringLengthTest", (String str) -> str.length(), DataType.IntegerType);
+
+ sqlContext.registerFunction("stringLengthTest", new UDF1<String, Integer>() {
+ @Override
+ public Integer call(String str) throws Exception {
+ return str.length();
+ }
+ }, DataType.IntegerType);
+
+ // TODO: Why do we need this cast?
+ Row result = (Row) sqlContext.sql("SELECT stringLengthTest('test')").first();
+ assert(result.getInt(0) == 4);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void udf2Test() {
+ // With Java 8 lambdas:
+ // sqlContext.registerFunction(
+ // "stringLengthTest",
+ // (String str1, String str2) -> str1.length() + str2.length(),
+ // DataType.IntegerType);
+
+ sqlContext.registerFunction("stringLengthTest", new UDF2<String, String, Integer>() {
+ @Override
+ public Integer call(String str1, String str2) throws Exception {
+ return str1.length() + str2.length();
+ }
+ }, DataType.IntegerType);
+
+ // TODO: Why do we need this cast?
+ Row result = (Row) sqlContext.sql("SELECT stringLengthTest('test', 'test2')").first();
+ assert(result.getInt(0) == 9);
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
index 4f0b85f262..23a711d08c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import java.io.File
+import _root_.java.io.File
/* Implicits */
import org.apache.spark.sql.test.TestSQLContext._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
new file mode 100644
index 0000000000..76aa9b0081
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test._
+
+/* Implicits */
+import TestSQLContext._
+
+class UDFSuite extends QueryTest {
+
+ test("Simple UDF") {
+ registerFunction("strLenScala", (_: String).length)
+ assert(sql("SELECT strLenScala('test')").first().getInt(0) === 4)
+ }
+
+ test("TwoArgument UDF") {
+ registerFunction("strLenScala", (_: String).length + (_:Int))
+ assert(sql("SELECT strLenScala('test', 1)").first().getInt(0) === 5)
+ }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 2c7270d9f8..3c70b3f092 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -23,7 +23,7 @@ import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConversions._
import scala.language.implicitConversions
-import scala.reflect.runtime.universe.TypeTag
+import scala.reflect.runtime.universe.{TypeTag, typeTag}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
@@ -35,8 +35,9 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.{OverrideFunctionRegistry, Analyzer, OverrideCatalog}
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.ExtractPythonUdfs
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.{Command => PhysicalCommand}
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
@@ -155,10 +156,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}
}
+ // Note that HiveUDFs will be overridden by functions registered in this context.
+ override protected[sql] lazy val functionRegistry =
+ new HiveFunctionRegistry with OverrideFunctionRegistry
+
/* An analyzer that uses the Hive metastore. */
@transient
override protected[sql] lazy val analyzer =
- new Analyzer(catalog, HiveFunctionRegistry, caseSensitive = false)
+ new Analyzer(catalog, functionRegistry, caseSensitive = false)
/**
* Runs the specified SQL query using Hive.
@@ -250,7 +255,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
protected[sql] abstract class QueryExecution extends super.QueryExecution {
// TODO: Create mixin for the analyzer instead of overriding things here.
override lazy val optimizedPlan =
- optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))
+ optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))
override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
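With OverrideFunctionRegistry mixed in, a name registered on the context is consulted before Hive's own registry, so local registrations shadow Hive functions of the same name. A sketch of that behavior, assuming an existing HiveContext named hiveContext (`upper` is a Hive builtin):

```scala
// Shadows Hive's builtin upper(); the local registration wins on lookup.
hiveContext.registerFunction("upper", (s: String) => s.toUpperCase + "!")
hiveContext.hql("SELECT upper('shadowed')")
```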
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 728452a25a..c605e8adcf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -297,8 +297,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
def reset() {
try {
// HACK: Hive is too noisy by default.
- org.apache.log4j.LogManager.getCurrentLoggers.foreach { logger =>
- logger.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN)
+ org.apache.log4j.LogManager.getCurrentLoggers.foreach { log =>
+ log.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN)
}
// It is important that we RESET first as broken hooks that might have been set could break
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index d181921269..179aac5cbd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -34,7 +34,8 @@ import org.apache.spark.util.Utils.getContextOrSparkClassLoader
/* Implicit conversions */
import scala.collection.JavaConversions._
-private[hive] object HiveFunctionRegistry extends analysis.FunctionRegistry with HiveInspectors {
+private[hive] abstract class HiveFunctionRegistry
+ extends analysis.FunctionRegistry with HiveInspectors {
def getFunctionInfo(name: String) = FunctionRegistry.getFunctionInfo(name)
@@ -92,9 +93,8 @@ private[hive] abstract class HiveUdf extends Expression with Logging with HiveFu
}
private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[Expression])
- extends HiveUdf {
+ extends HiveUdf with HiveInspectors {
- import org.apache.spark.sql.hive.HiveFunctionRegistry._
type UDFType = UDF
@transient
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 11d8b1f0a3..95921c3d7a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -51,9 +51,9 @@ class QueryTest extends FunSuite {
fail(
s"""
|Exception thrown while executing query:
- |${rdd.logicalPlan}
+ |${rdd.queryExecution}
|== Exception ==
- |$e
+ |${stackTraceToString(e)}
""".stripMargin)
}