From 89f73f674126bbc1cc101f0f5731b5750f1c90c8 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 10 May 2016 12:32:56 -0700 Subject: [SPARK-14642][SQL] import org.apache.spark.sql.expressions._ breaks udf under functions ## What changes were proposed in this pull request? This PR fixes an import issue that breaks `udf` under `functions`. The following code snippet throws an error: ``` scala> import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._ scala> import org.apache.spark.sql.expressions._ import org.apache.spark.sql.expressions._ scala> udf((v: String) => v.stripSuffix("-abc")) :30: error: No TypeTag available for String udf((v: String) => v.stripSuffix("-abc")) ``` This PR resolves the issue by renaming the packages `org.apache.spark.sql.expressions.scala` and `org.apache.spark.sql.expressions.java` to `scalalang` and `javalang`, so that the wildcard import no longer brings a package named `scala` into scope, where it would shadow the root `scala` package. ## How was this patch tested? The patch was tested with unit tests. Author: Subhobrata Dey Closes #12458 from sbcd90/udfFuncBreak. --- .../spark/sql/Java8DatasetAggregatorSuite.java | 2 +- .../apache/spark/sql/expressions/java/typed.java | 75 ------------------ .../spark/sql/expressions/javalang/typed.java | 75 ++++++++++++++++++ .../apache/spark/sql/expressions/scala/typed.scala | 89 ---------------------- .../spark/sql/expressions/scalalang/typed.scala | 89 ++++++++++++++++++++++ .../sql/sources/JavaDatasetAggregatorSuite.java | 2 +- .../apache/spark/sql/DatasetAggregatorSuite.scala | 2 +- .../org/apache/spark/sql/DatasetBenchmark.scala | 2 +- .../sql/execution/WholeStageCodegenSuite.scala | 2 +- .../sql/streaming/StreamingAggregationSuite.scala | 2 +- 10 files changed, 170 insertions(+), 170 deletions(-) delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/expressions/scala/typed.scala create mode 100644 
sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala diff --git a/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java b/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java index 23abfa3970..1a2aea67d9 100644 --- a/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java +++ b/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java @@ -25,7 +25,7 @@ import scala.Tuple2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.KeyValueGroupedDataset; -import org.apache.spark.sql.expressions.java.typed; +import org.apache.spark.sql.expressions.javalang.typed; /** * Suite that replicates tests in JavaDatasetAggregatorSuite using lambda syntax. diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java b/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java deleted file mode 100644 index c7c6e3868f..0000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.expressions.java; - -import org.apache.spark.annotation.Experimental; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.TypedColumn; -import org.apache.spark.sql.execution.aggregate.TypedAverage; -import org.apache.spark.sql.execution.aggregate.TypedCount; -import org.apache.spark.sql.execution.aggregate.TypedSumDouble; -import org.apache.spark.sql.execution.aggregate.TypedSumLong; - -/** - * :: Experimental :: - * Type-safe functions available for {@link org.apache.spark.sql.Dataset} operations in Java. - * - * Scala users should use {@link org.apache.spark.sql.expressions.scala.typed}. - * - * @since 2.0.0 - */ -@Experimental -public class typed { - // Note: make sure to keep in sync with typed.scala - - /** - * Average aggregate function. - * - * @since 2.0.0 - */ - public static TypedColumn avg(MapFunction f) { - return new TypedAverage(f).toColumnJava(); - } - - /** - * Count aggregate function. - * - * @since 2.0.0 - */ - public static TypedColumn count(MapFunction f) { - return new TypedCount(f).toColumnJava(); - } - - /** - * Sum aggregate function for floating point (double) type. - * - * @since 2.0.0 - */ - public static TypedColumn sum(MapFunction f) { - return new TypedSumDouble(f).toColumnJava(); - } - - /** - * Sum aggregate function for integral (long, i.e. 64 bit integer) type. - * - * @since 2.0.0 - */ - public static TypedColumn sumLong(MapFunction f) { - return new TypedSumLong(f).toColumnJava(); - } -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java new file mode 100644 index 0000000000..247e94b86c --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.expressions.javalang; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.TypedColumn; +import org.apache.spark.sql.execution.aggregate.TypedAverage; +import org.apache.spark.sql.execution.aggregate.TypedCount; +import org.apache.spark.sql.execution.aggregate.TypedSumDouble; +import org.apache.spark.sql.execution.aggregate.TypedSumLong; + +/** + * :: Experimental :: + * Type-safe functions available for {@link org.apache.spark.sql.Dataset} operations in Java. + * + * Scala users should use {@link org.apache.spark.sql.expressions.scalalang.typed}. + * + * @since 2.0.0 + */ +@Experimental +public class typed { + // Note: make sure to keep in sync with typed.scala + + /** + * Average aggregate function. + * + * @since 2.0.0 + */ + public static TypedColumn avg(MapFunction f) { + return new TypedAverage(f).toColumnJava(); + } + + /** + * Count aggregate function. + * + * @since 2.0.0 + */ + public static TypedColumn count(MapFunction f) { + return new TypedCount(f).toColumnJava(); + } + + /** + * Sum aggregate function for floating point (double) type. 
+ * + * @since 2.0.0 + */ + public static TypedColumn sum(MapFunction f) { + return new TypedSumDouble(f).toColumnJava(); + } + + /** + * Sum aggregate function for integral (long, i.e. 64 bit integer) type. + * + * @since 2.0.0 + */ + public static TypedColumn sumLong(MapFunction f) { + return new TypedSumLong(f).toColumnJava(); + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scala/typed.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/scala/typed.scala deleted file mode 100644 index d0eb190afd..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scala/typed.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.expressions.scala - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql._ -import org.apache.spark.sql.execution.aggregate._ - -/** - * :: Experimental :: - * Type-safe functions available for [[Dataset]] operations in Scala. - * - * Java users should use [[org.apache.spark.sql.expressions.java.typed]]. 
- * - * @since 2.0.0 - */ -@Experimental -// scalastyle:off -object typed { - // scalastyle:on - - // Note: whenever we update this file, we should update the corresponding Java version too. - // The reason we have separate files for Java and Scala is because in the Scala version, we can - // use tighter types (primitive types) for return types, whereas in the Java version we can only - // use boxed primitive types. - // For example, avg in the Scala veresion returns Scala primitive Double, whose bytecode - // signature is just a java.lang.Object; avg in the Java version returns java.lang.Double. - - // TODO: This is pretty hacky. Maybe we should have an object for implicit encoders. - private val implicits = new SQLImplicits { - override protected def _sqlContext: SQLContext = null - } - - import implicits._ - - /** - * Average aggregate function. - * - * @since 2.0.0 - */ - def avg[IN](f: IN => Double): TypedColumn[IN, Double] = new TypedAverage(f).toColumn - - /** - * Count aggregate function. - * - * @since 2.0.0 - */ - def count[IN](f: IN => Any): TypedColumn[IN, Long] = new TypedCount(f).toColumn - - /** - * Sum aggregate function for floating point (double) type. - * - * @since 2.0.0 - */ - def sum[IN](f: IN => Double): TypedColumn[IN, Double] = new TypedSumDouble[IN](f).toColumn - - /** - * Sum aggregate function for integral (long, i.e. 64 bit integer) type. 
- * - * @since 2.0.0 - */ - def sumLong[IN](f: IN => Long): TypedColumn[IN, Long] = new TypedSumLong[IN](f).toColumn - - // TODO: - // stddevOf: Double - // varianceOf: Double - // approxCountDistinct: Long - - // minOf: T - // maxOf: T - - // firstOf: T - // lastOf: T -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala new file mode 100644 index 0000000000..f46a4a7879 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.expressions.scalalang + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.aggregate._ + +/** + * :: Experimental :: + * Type-safe functions available for [[Dataset]] operations in Scala. + * + * Java users should use [[org.apache.spark.sql.expressions.javalang.typed]]. + * + * @since 2.0.0 + */ +@Experimental +// scalastyle:off +object typed { + // scalastyle:on + + // Note: whenever we update this file, we should update the corresponding Java version too. 
+ // The reason we have separate files for Java and Scala is because in the Scala version, we can + // use tighter types (primitive types) for return types, whereas in the Java version we can only + // use boxed primitive types. + // For example, avg in the Scala veresion returns Scala primitive Double, whose bytecode + // signature is just a java.lang.Object; avg in the Java version returns java.lang.Double. + + // TODO: This is pretty hacky. Maybe we should have an object for implicit encoders. + private val implicits = new SQLImplicits { + override protected def _sqlContext: SQLContext = null + } + + import implicits._ + + /** + * Average aggregate function. + * + * @since 2.0.0 + */ + def avg[IN](f: IN => Double): TypedColumn[IN, Double] = new TypedAverage(f).toColumn + + /** + * Count aggregate function. + * + * @since 2.0.0 + */ + def count[IN](f: IN => Any): TypedColumn[IN, Long] = new TypedCount(f).toColumn + + /** + * Sum aggregate function for floating point (double) type. + * + * @since 2.0.0 + */ + def sum[IN](f: IN => Double): TypedColumn[IN, Double] = new TypedSumDouble[IN](f).toColumn + + /** + * Sum aggregate function for integral (long, i.e. 64 bit integer) type. 
+ * + * @since 2.0.0 + */ + def sumLong[IN](f: IN => Long): TypedColumn[IN, Long] = new TypedSumLong[IN](f).toColumn + + // TODO: + // stddevOf: Double + // varianceOf: Double + // approxCountDistinct: Long + + // minOf: T + // maxOf: T + + // firstOf: T + // lastOf: T +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java index 0e49f871de..f9842e130b 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java @@ -30,7 +30,7 @@ import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.KeyValueGroupedDataset; import org.apache.spark.sql.expressions.Aggregator; -import org.apache.spark.sql.expressions.java.typed; +import org.apache.spark.sql.expressions.javalang.typed; /** * Suite for testing the aggregate functionality of Datasets in Java. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala index b2a0f3d67e..f1585ca3ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala @@ -21,7 +21,7 @@ import scala.language.postfixOps import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.expressions.Aggregator -import org.apache.spark.sql.expressions.scala.typed +import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala index d8e241c62f..4101e5c75b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.expressions.Aggregator -import org.apache.spark.sql.expressions.scala.typed +import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StringType import org.apache.spark.util.Benchmark diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index ada60f6919..f86955e5a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.Row import org.apache.spark.sql.execution.aggregate.TungstenAggregate 
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec -import org.apache.spark.sql.expressions.scala.typed +import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions.{avg, broadcast, col, max} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StringType, StructType} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 8da7742ffe..0f5fc9ca72 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.StreamTest import org.apache.spark.sql.catalyst.analysis.Update import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore -import org.apache.spark.sql.expressions.scala.typed +import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext -- cgit v1.2.3