From 8265dc7739caccc59bc2456b2df055ca96337fe4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 23 Mar 2014 15:21:40 -0700 Subject: Fixed coding style issues in Spark SQL This PR addresses various coding style issues in Spark SQL, including but not limited to those mentioned by @mateiz in PR #146. As this PR affects lots of source files and may cause potential conflicts, it would be better to merge this as soon as possible *after* PR #205 (In-memory columnar representation for Spark SQL) is merged. Author: Cheng Lian Closes #208 from liancheng/fixCodingStyle and squashes the following commits: fc2b528 [Cheng Lian] Merge branch 'master' into fixCodingStyle b531273 [Cheng Lian] Fixed coding style issues in sql/hive 0b56f77 [Cheng Lian] Fixed coding style issues in sql/core fae7b02 [Cheng Lian] Addressed styling issues mentioned by @marmbrus 9265366 [Cheng Lian] Fixed coding style issues in sql/core 3dcbbbd [Cheng Lian] Fixed relative package imports for package catalyst --- .../spark/sql/catalyst/ScalaReflection.scala | 69 +++++++++++++ .../org/apache/spark/sql/catalyst/SqlParser.scala | 28 +++-- .../spark/sql/catalyst/analysis/Analyzer.scala | 7 +- .../spark/sql/catalyst/analysis/Catalog.scala | 3 +- .../sql/catalyst/analysis/FunctionRegistry.scala | 2 +- .../sql/catalyst/analysis/HiveTypeCoercion.scala | 8 +- .../catalyst/analysis/MultiInstanceRelation.scala | 8 +- .../spark/sql/catalyst/analysis/package.scala | 1 + .../spark/sql/catalyst/analysis/unresolved.scala | 8 +- .../apache/spark/sql/catalyst/dsl/package.scala | 56 +--------- .../apache/spark/sql/catalyst/errors/package.scala | 7 +- .../sql/catalyst/expressions/BoundAttribute.scala | 7 +- .../spark/sql/catalyst/expressions/Cast.scala | 4 +- .../sql/catalyst/expressions/Expression.scala | 6 +- .../spark/sql/catalyst/expressions/Rand.scala | 2 +- .../spark/sql/catalyst/expressions/Row.scala | 2 +- .../spark/sql/catalyst/expressions/ScalaUdf.scala | 2 +- .../sql/catalyst/expressions/WrapDynamic.scala | 2 +- .../sql/catalyst/expressions/aggregates.scala | 2 +- .../sql/catalyst/expressions/arithmetic.scala | 4 +- .../sql/catalyst/expressions/complexTypes.scala | 2 +- .../sql/catalyst/expressions/generators.scala | 2 +- .../spark/sql/catalyst/expressions/literals.scala | 2 +- .../catalyst/expressions/namedExpressions.scala | 6 +- .../sql/catalyst/expressions/nullFunctions.scala | 2 +- .../sql/catalyst/expressions/predicates.scala | 4 +- .../catalyst/expressions/stringOperations.scala | 2 +- .../spark/sql/catalyst/optimizer/Optimizer.scala | 25 ++--- .../spark/sql/catalyst/planning/QueryPlanner.scala | 5 +- .../spark/sql/catalyst/planning/patterns.scala | 4 +- .../spark/sql/catalyst/plans/QueryPlan.scala | 4 +- .../sql/catalyst/plans/logical/LogicalPlan.scala | 6 +- .../plans/logical/ScriptTransformation.scala | 2 +- .../sql/catalyst/plans/logical/TestRelation.scala | 3 +- .../catalyst/plans/logical/basicOperators.scala | 2 +- .../sql/catalyst/plans/logical/partitioning.scala | 2 +- .../sql/catalyst/plans/physical/partitioning.scala | 4 +- .../org/apache/spark/sql/catalyst/rules/Rule.scala | 2 +- .../spark/sql/catalyst/rules/RuleExecutor.scala | 14 +-- .../apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +- .../apache/spark/sql/catalyst/AnalysisSuite.scala | 41 -------- .../sql/catalyst/ExpressionEvaluationSuite.scala | 115 --------------------- .../spark/sql/catalyst/HiveTypeCoercionSuite.scala | 74 ------------- .../spark/sql/catalyst/RuleExecutorSuite.scala | 57 ---------- .../apache/spark/sql/catalyst/TreeNodeSuite.scala | 81 
--------------- .../sql/catalyst/analysis/AnalysisSuite.scala | 39 +++++++ .../catalyst/analysis/HiveTypeCoercionSuite.scala | 73 +++++++++++++ .../expressions/ExpressionEvaluationSuite.scala | 114 ++++++++++++++++++++ .../catalyst/optimizer/ConstantFoldingSuite.scala | 19 ++-- .../catalyst/optimizer/FilterPushdownSuite.scala | 13 ++- .../sql/catalyst/optimizer/OptimizerTest.scala | 12 +-- .../sql/catalyst/trees/RuleExecutorSuite.scala | 57 ++++++++++ .../spark/sql/catalyst/trees/TreeNodeSuite.scala | 78 ++++++++++++++ .../spark/rdd/PartitionLocalRDDFunctions.scala | 7 +- .../scala/org/apache/spark/sql/SQLContext.scala | 5 +- .../scala/org/apache/spark/sql/SchemaRDD.scala | 4 +- .../org/apache/spark/sql/execution/Exchange.scala | 21 ++-- .../org/apache/spark/sql/execution/Generate.scala | 3 +- .../spark/sql/execution/SparkStrategies.scala | 16 +-- .../apache/spark/sql/execution/aggregates.scala | 9 +- .../spark/sql/execution/basicOperators.scala | 13 ++- .../org/apache/spark/sql/execution/joins.scala | 8 +- .../org/apache/spark/sql/execution/package.scala | 3 +- .../apache/spark/sql/parquet/ParquetRelation.scala | 39 ++++--- .../spark/sql/parquet/ParquetTableOperations.scala | 28 ++--- .../spark/sql/parquet/ParquetTableSupport.scala | 20 ++-- .../apache/spark/sql/parquet/ParquetTestData.scala | 11 +- .../scala/org/apache/spark/sql/DslQuerySuite.scala | 7 +- .../scala/org/apache/spark/sql/PlannerSuite.scala | 62 ----------- .../scala/org/apache/spark/sql/QueryTest.scala | 8 +- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 26 ++--- .../test/scala/org/apache/spark/sql/TestData.scala | 14 +-- .../test/scala/org/apache/spark/sql/TgfSuite.scala | 71 ------------- .../apache/spark/sql/execution/PlannerSuite.scala | 60 +++++++++++ .../org/apache/spark/sql/execution/TgfSuite.scala | 66 ++++++++++++ .../spark/sql/parquet/ParquetQuerySuite.scala | 15 ++- .../apache/hadoop/mapred/SparkHadoopWriter.scala | 7 +- .../org/apache/spark/sql/hive/HiveContext.scala | 25 ++--- .../spark/sql/hive/HiveMetastoreCatalog.scala | 22 ++-- .../scala/org/apache/spark/sql/hive/HiveQl.scala | 25 ++--- .../org/apache/spark/sql/hive/HiveStrategies.scala | 11 +- .../spark/sql/hive/ScriptTransformation.scala | 5 +- .../org/apache/spark/sql/hive/TableReader.scala | 26 +++-- .../scala/org/apache/spark/sql/hive/TestHive.scala | 21 ++-- .../org/apache/spark/sql/hive/hiveOperators.scala | 28 ++--- .../scala/org/apache/spark/sql/hive/hiveUdfs.scala | 22 ++-- .../sql/hive/execution/ConcurrentHiveSuite.scala | 3 +- .../sql/hive/execution/HiveComparisonTest.scala | 12 +-- .../hive/execution/HiveCompatibilitySuite.scala | 5 - .../sql/hive/execution/HiveQueryFileTest.scala | 6 +- .../spark/sql/hive/execution/HiveQuerySuite.scala | 3 +- .../sql/hive/execution/HiveResolutionSuite.scala | 4 +- .../spark/sql/hive/execution/PruningSuite.scala | 5 +- .../spark/sql/parquet/HiveParquetSuite.scala | 16 +-- 94 files changed, 908 insertions(+), 950 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/AnalysisSuite.scala delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ExpressionEvaluationSuite.scala delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/HiveTypeCoercionSuite.scala delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/RuleExecutorSuite.scala delete mode 100644 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/TreeNodeSuite.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/PlannerSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/TgfSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/TgfSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala new file mode 100644 index 0000000000..bf7318d2e0 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql +package catalyst +
+import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.types._ + +/** + * Provides experimental support for generating catalyst schemas for scala objects. + */ +object ScalaReflection { + import scala.reflect.runtime.universe._ + + /** Returns a Sequence of attributes for the given case class type. */ + def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match { + case s: StructType => + s.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)()) + } + + /** Returns a catalyst DataType for the given Scala Type using reflection. */ + def schemaFor[T: TypeTag]: DataType = schemaFor(typeOf[T]) + + /** Returns a catalyst DataType for the given Scala Type using reflection.
*/ + def schemaFor(tpe: `Type`): DataType = tpe match { + case t if t <:< typeOf[Product] => + val params = t.member("<init>": TermName).asMethod.paramss + StructType( + params.head.map(p => StructField(p.name.toString, schemaFor(p.typeSignature), true))) + case t if t <:< typeOf[Seq[_]] => + val TypeRef(_, _, Seq(elementType)) = t + ArrayType(schemaFor(elementType)) + case t if t <:< typeOf[String] => StringType + case t if t <:< definitions.IntTpe => IntegerType + case t if t <:< definitions.LongTpe => LongType + case t if t <:< definitions.DoubleTpe => DoubleType + case t if t <:< definitions.ShortTpe => ShortType + case t if t <:< definitions.ByteTpe => ByteType + } + + implicit class CaseClassRelation[A <: Product : TypeTag](data: Seq[A]) { + + /** + * Implicitly added to Sequences of case class objects. Returns a catalyst logical relation + * for the data in the sequence. + */ + def asRelation: LocalRelation = { + val output = attributesFor[A] + LocalRelation(output, data) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 919bf4dbc8..9dec4e3d9e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -17,21 +17,18 @@ package org.apache.spark.sql.catalyst -import scala.util.matching.Regex -import scala.util.parsing.combinator._ +import scala.util.parsing.combinator.lexical.StdLexical +import scala.util.parsing.combinator.syntactical.StandardTokenParsers import scala.util.parsing.input.CharArrayReader.EofCh -import lexical._ -import syntactical._ -import token._ -import analysis._ -import expressions._ -import plans._ -import plans.logical._ -import types._ +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.types._ /** - * A very simple SQL parser. Based loosly on: + * A very simple SQL parser. Based loosely on: * https://github.com/stephentu/scala-sql-parser/blob/master/src/main/scala/parser.scala * * Limitations: @@ -39,10 +36,9 @@ import types._ * - Keywords must be capital. * * This is currently included mostly for illustrative purposes. Users wanting more complete support - * for a SQL like language should checkout the HiveQL support in the sql/hive subproject. + * for a SQL like language should checkout the HiveQL support in the sql/hive sub-project. */ class SqlParser extends StandardTokenParsers { - def apply(input: String): LogicalPlan = { phrase(query)(new lexical.Scanner(input)) match { case Success(r, x) => r @@ -196,7 +192,7 @@ class SqlParser extends StandardTokenParsers { protected lazy val from: Parser[LogicalPlan] = FROM ~> relations - // Based very loosly on the MySQL Grammar. + // Based very loosely on the MySQL Grammar.
// http://dev.mysql.com/doc/refman/5.0/en/join.html protected lazy val relations: Parser[LogicalPlan] = relation ~ "," ~ relation ^^ { case r1 ~ _ ~ r2 => Join(r1, r2, Inner, None) } | @@ -261,9 +257,9 @@ class SqlParser extends StandardTokenParsers { andExpression * (OR ^^^ { (e1: Expression, e2: Expression) => Or(e1,e2) }) protected lazy val andExpression: Parser[Expression] = - comparisionExpression * (AND ^^^ { (e1: Expression, e2: Expression) => And(e1,e2) }) + comparisonExpression * (AND ^^^ { (e1: Expression, e2: Expression) => And(e1,e2) }) - protected lazy val comparisionExpression: Parser[Expression] = + protected lazy val comparisonExpression: Parser[Expression] = termExpression ~ "=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Equals(e1, e2) } | termExpression ~ "<" ~ termExpression ^^ { case e1 ~ _ ~ e2 => LessThan(e1, e2) } | termExpression ~ "<=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => LessThanOrEqual(e1, e2) } | diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9eb992ee58..fc76e76617 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql package catalyst package analysis -import expressions._ -import plans.logical._ -import rules._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ + /** * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 71e4dcdb15..b77f0bbb2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql package catalyst package analysis -import plans.logical.{LogicalPlan, Subquery} import scala.collection.mutable +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery} + /** * An interface for looking up relations by name. Used by an [[Analyzer]]. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index a359eb5411..eed058d3c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package analysis -import expressions._ +import org.apache.spark.sql.catalyst.expressions.Expression /** A catalog for looking up user defined functions, used by an [[Analyzer]]. 
*/ trait FunctionRegistry { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index a0105cd7cf..a6ecf6e2eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql package catalyst package analysis -import expressions._ -import plans.logical._ -import rules._ -import types._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.types._ /** * A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala index fe18cc466f..3cad3a5d4d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala @@ -18,14 +18,14 @@ package org.apache.spark.sql.catalyst package analysis -import plans.logical.LogicalPlan -import rules._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan /** * A trait that should be mixed into query operators where an single instance might appear multiple * times in a logical query plan. It is invalid to have multiple copies of the same attribute - * produced by distinct operators in a query tree as this breaks the gurantee that expression - * ids, which are used to differentate attributes, are unique. + * produced by distinct operators in a query tree as this breaks the guarantee that expression + * ids, which are used to differentiate attributes, are unique. * * Before analysis, all operators that include this trait will be asked to produce a new version * of itself with globally unique expression ids. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala index 375c99f48e..30c55bacc7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala @@ -15,6 +15,7 @@ * limitations under the License. 
*/ +package org.apache.spark.sql package catalyst /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 2ed2af1352..04ae481102 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql package catalyst package analysis -import expressions._ -import plans.logical.BaseRelation -import trees.TreeNode +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.BaseRelation +import org.apache.spark.sql.catalyst.trees.TreeNode /** * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully @@ -95,7 +95,7 @@ case class Star( // If there is no table specified, use all input attributes. case None => input // If there is a table, pick out attributes that are part of this table. - case Some(table) => input.filter(_.qualifiers contains table) + case Some(t) => input.filter(_.qualifiers contains t) } val mappedAttributes = expandedAttributes.map(mapFunction).zip(input).map { case (n: NamedExpression, _) => n diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index cd8de9d52f..e6255bcafa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -19,58 +19,12 @@ package org.apache.spark.sql package catalyst import scala.language.implicitConversions -import scala.reflect.runtime.universe.TypeTag -import analysis.UnresolvedAttribute -import expressions._ -import plans._ -import plans.logical._ -import types._ - -/** - * Provides experimental support for generating catalyst schemas for scala objects. - */ -object ScalaReflection { - import scala.reflect.runtime.universe._ - - /** Returns a Sequence of attributes for the given case class type. */ - def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match { - case s: StructType => - s.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)()) - } - - /** Returns a catalyst DataType for the given Scala Type using reflection. */ - def schemaFor[T: TypeTag]: DataType = schemaFor(typeOf[T]) - - /** Returns a catalyst DataType for the given Scala Type using reflection. */ - def schemaFor(tpe: `Type`): DataType = tpe match { - case t if t <:< typeOf[Product] => - val params = t.member("<init>": TermName).asMethod.paramss - StructType( - params.head.map(p => StructField(p.name.toString, schemaFor(p.typeSignature), true))) - case t if t <:< typeOf[Seq[_]] => - val TypeRef(_, _, Seq(elementType)) = t - ArrayType(schemaFor(elementType)) - case t if t <:< typeOf[String] => StringType - case t if t <:< definitions.IntTpe => IntegerType - case t if t <:< definitions.LongTpe => LongType - case t if t <:< definitions.DoubleTpe => DoubleType - case t if t <:< definitions.ShortTpe => ShortType - case t if t <:< definitions.ByteTpe => ByteType - } - - implicit class CaseClassRelation[A <: Product : TypeTag](data: Seq[A]) { - - /** - * Implicitly added to Sequences of case class objects. Returns a catalyst logical relation - * for the the data in the sequence.
- */ - def asRelation: LocalRelation = { - val output = attributesFor[A] - LocalRelation(output, data) - } - } -} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} +import org.apache.spark.sql.catalyst.types._ /** * A collection of implicit conversions that create a DSL for constructing catalyst data structures. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala index c253587f67..d8b69946fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala @@ -18,15 +18,16 @@ package org.apache.spark.sql package catalyst -import trees._ +import org.apache.spark.sql.catalyst.trees.TreeNode /** * Functions for attaching and retrieving trees that are associated with errors. */ package object errors { - class TreeNodeException[TreeType <: TreeNode[_]] - (tree: TreeType, msg: String, cause: Throwable) extends Exception(msg, cause) { + class TreeNodeException[TreeType <: TreeNode[_]]( + tree: TreeType, msg: String, cause: Throwable) + extends Exception(msg, cause) { // Yes, this is the same as a default parameter, but... those don't seem to work with SBT // external project dependencies for some reason. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 3b6bac16ff..3fa4148f0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql package catalyst package expressions -import rules._ -import errors._ - -import catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.errors.attachTree +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.rules.Rule /** * A bound reference points to a specific slot in the input tuple, allowing the actual value diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 608656d3a9..71f64ef950 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package expressions -import types._ +import org.apache.spark.sql.catalyst.types._ /** Cast the child expression to the target data type. 
*/ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { @@ -40,7 +40,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { case (StringType, ShortType) => a: Any => castOrNull(a, _.toShort) case (StringType, ByteType) => a: Any => castOrNull(a, _.toByte) case (StringType, DecimalType) => a: Any => castOrNull(a, BigDecimal(_)) - case (BooleanType, ByteType) => a: Any => a match { + case (BooleanType, ByteType) => { case null => null case true => 1.toByte case false => 0.toByte diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 78aaaeebbd..2454a3355b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql package catalyst package expressions -import errors._ -import trees._ -import types._ +import org.apache.spark.sql.catalyst.trees.TreeNode +import org.apache.spark.sql.catalyst.types.{DataType, FractionalType, IntegralType, NumericType} +import org.apache.spark.sql.catalyst.errors.TreeNodeException abstract class Expression extends TreeNode[Expression] { self: Product => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala index a5d0ecf964..0d173afec8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package expressions -import types.DoubleType +import org.apache.spark.sql.catalyst.types.DoubleType case object Rand extends LeafExpression { def dataType = DoubleType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala index 3529675468..79c91ebaa4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package expressions -import types._ +import org.apache.spark.sql.catalyst.types.NativeType /** * Represents one row of output from a relational operator. 
Allows both generic access by ordinal, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala index a3c7ca1acd..cc33948055 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package expressions -import types._ +import org.apache.spark.sql.catalyst.types.DataType case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression]) extends Expression { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala index 2ad8d6f31d..01b7a14d4a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala @@ -21,7 +21,7 @@ package expressions import scala.language.dynamics -import types._ +import org.apache.spark.sql.catalyst.types.DataType case object DynamicType extends DataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 2287a849e6..a16bb80df3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package expressions -import catalyst.types._ +import org.apache.spark.sql.catalyst.types._ abstract class AggregateExpression extends Expression { self: Product => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index db235645cd..81e4a487bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql package catalyst package expressions -import catalyst.analysis.UnresolvedException -import catalyst.types._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.types._ case class UnaryMinus(child: Expression) extends UnaryExpression { type EvaluatedType = Any diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala index d3feb6c461..9ec0f6ade7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package expressions -import types._ +import org.apache.spark.sql.catalyst.types._ /** * Returns the item at `ordinal` in the Array `child` or the Key `ordinal` in Map `child`. 
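[Editor's sketch] The complexTypes hunk above ends at GetItem's contract: resolve an ordinal against an Array child or a key against a Map child. A rough semantics sketch in plain Scala (a hypothetical helper, not the actual catalyst Expression; the null-on-miss behavior is an assumption modeled on SQL null propagation):

    object GetItemSemantics {
      // Resolve an index against a sequence, or a key against a map,
      // propagating null when the container is null or the lookup misses.
      def getItemValue(child: Any, ordinal: Any): Any = child match {
        case null => null
        case arr: Seq[_] =>
          val i = ordinal.asInstanceOf[Int]
          if (i < 0 || i >= arr.size) null else arr(i)
        case m: Map[_, _] =>
          m.asInstanceOf[Map[Any, Any]].getOrElse(ordinal, null)
      }
    }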
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index c367de2a3e..9097c635ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package expressions -import catalyst.types._ +import org.apache.spark.sql.catalyst.types._ /** * An expression that produces zero or more rows given a single input row. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 229d8f7f7b..0d01312c71 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package expressions -import types._ +import org.apache.spark.sql.catalyst.types._ object Literal { def apply(v: Any): Literal = v match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 0a06e85325..47b1241e71 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql package catalyst package expressions -import catalyst.analysis.UnresolvedAttribute -import types._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.types._ object NamedExpression { private val curId = new java.util.concurrent.atomic.AtomicLong() @@ -30,7 +30,7 @@ object NamedExpression { /** * A globally (within this JVM) id for a given named expression. * Used to identify with attribute output by a relation is being - * referenced in a subsuqent computation. + * referenced in a subsequent computation. 
*/ case class ExprId(id: Long) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala index e869a4d9b0..38e38371b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package expressions -import catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.analysis.UnresolvedException case class Coalesce(children: Seq[Expression]) extends Expression { type EvaluatedType = Any diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 561396eb43..e7f3e8ca60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql package catalyst package expressions -import types._ -import catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.types.{BooleanType, StringType} +import org.apache.spark.sql.catalyst.analysis.UnresolvedException trait Predicate extends Expression { self: Product => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala index 6e585236b1..7584fe03cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package expressions -import catalyst.types.BooleanType +import org.apache.spark.sql.catalyst.types.BooleanType case class Like(left: Expression, right: Expression) extends BinaryExpression { def dataType = BooleanType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 4db2803173..c1201971d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -19,11 +19,11 @@ package org.apache.spark.sql package catalyst package optimizer -import catalyst.expressions._ -import catalyst.plans.logical._ -import catalyst.rules._ -import catalyst.types.BooleanType -import catalyst.plans.Inner +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.catalyst.types._ object Optimizer extends RuleExecutor[LogicalPlan] { val batches = @@ -73,7 +73,7 @@ object ConstantFolding extends Rule[LogicalPlan] { object BooleanSimplification extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsUp { - case and @ And(left, right) => { + case and @ And(left, right) => (left, right) match { case (Literal(true, BooleanType), r) => r case (l, Literal(true, 
BooleanType)) => l @@ -81,8 +81,8 @@ object BooleanSimplification extends Rule[LogicalPlan] { case (_, Literal(false, BooleanType)) => Literal(false) case (_, _) => and } - } - case or @ Or(left, right) => { + + case or @ Or(left, right) => (left, right) match { case (Literal(true, BooleanType), _) => Literal(true) case (_, Literal(true, BooleanType)) => Literal(true) @@ -90,7 +90,6 @@ object BooleanSimplification extends Rule[LogicalPlan] { case (l, Literal(false, BooleanType)) => l case (_, _) => or } - } } } } @@ -101,7 +100,7 @@ object BooleanSimplification extends Rule[LogicalPlan] { */ object CombineFilters extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case ff@Filter(fc, nf@Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild) + case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild) } } @@ -114,8 +113,10 @@ object CombineFilters extends Rule[LogicalPlan] { */ object PushPredicateThroughProject extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case filter@Filter(condition, project@Project(fields, grandChild)) => - val sourceAliases = fields.collect { case a@Alias(c, _) => a.toAttribute -> c }.toMap + case filter @ Filter(condition, project @ Project(fields, grandChild)) => + val sourceAliases = fields.collect { case a @ Alias(c, _) => + (a.toAttribute: Attribute) -> c + }.toMap project.copy(child = filter.copy( replaceAlias(condition, sourceAliases), grandChild)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 22f8ea005b..d50b963dfc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -19,9 +19,8 @@ package org.apache.spark.sql package catalyst package planning - -import plans.logical.LogicalPlan -import trees._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.TreeNode /** * Abstract class for transforming [[plans.logical.LogicalPlan LogicalPlan]]s into physical plans. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 613b028ca8..ff0ea90e54 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -21,8 +21,8 @@ package planning import scala.annotation.tailrec -import expressions._ -import plans.logical._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ /** * A pattern that matches any number of filter operations on top of another relational operator. 
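[Editor's sketch] The trailing context above is the doc header for patterns.scala's filter-collecting pattern. A minimal sketch of how such an extractor can be written (a simplified stand-in named StackedFilters, not the file's actual definition), using the fully-qualified import style this PR enforces:

    import scala.annotation.tailrec

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

    // Peels nested Filter operators off the top of a plan, returning the
    // collected predicates together with the first non-Filter child.
    object StackedFilters {
      def unapply(plan: LogicalPlan): Option[(Seq[Expression], LogicalPlan)] =
        Some(collect(Nil, plan))

      @tailrec
      private def collect(
          conds: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) =
        plan match {
          case Filter(condition, child) => collect(conds :+ condition, child)
          case other => (conds, other)
        }
    }

A planner rule can then match `case StackedFilters(conditions, child) => ...` regardless of how many Filter operators are stacked.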
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 20f230c5c4..848db2452a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql package catalyst package plans -import catalyst.expressions.{SortOrder, Attribute, Expression} -import catalyst.trees._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.trees.TreeNode abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] { self: PlanType with Product => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index bc7b6871df..225dd260fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -20,9 +20,9 @@ package catalyst package plans package logical -import catalyst.expressions._ -import catalyst.errors._ -import catalyst.types.StructType +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types.StructType abstract class LogicalPlan extends QueryPlan[LogicalPlan] { self: Product => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala index 1a1a2b9b88..5a3ea9f0a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala @@ -20,7 +20,7 @@ package catalyst package plans package logical -import expressions._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} /** * Transforms the input by forking and running the specified script. 
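[Editor's sketch] ScriptTransformation's contract — fork a script, write input rows to its stdin, and read transformed rows back from its stdout — can be illustrated outside catalyst with scala.sys.process. A minimal sketch, assuming a Unix /bin/cat as a stand-in "script" and borrowing nothing from the actual Hive operator:

    import java.io.ByteArrayInputStream

    import scala.sys.process._

    object ScriptPipeSketch extends App {
      // Two tab-delimited input rows, newline-terminated as a script expects.
      val inputRows = Seq("1\ta", "2\tb").mkString("", "\n", "\n")
      // Fork /bin/cat (the identity transform), feed the rows on stdin,
      // and capture stdout as the transformed output.
      val output = ("/bin/cat" #< new ByteArrayInputStream(inputRows.getBytes("UTF-8"))).!!
      output.split("\n").foreach(println)
    }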
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala index b5905a4456..ac7d2d6001 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala @@ -20,8 +20,7 @@ package catalyst package plans package logical -import expressions._ -import rules._ +import org.apache.spark.sql.catalyst.expressions.Attribute object LocalRelation { def apply(output: Attribute*) = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 8e98aab736..6480cca300 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -20,7 +20,7 @@ package catalyst package plans package logical -import expressions._ +import org.apache.spark.sql.catalyst.expressions._ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { def output = projectList.map(_.toAttribute) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala index f7fcdc5fdb..775e50bbd5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala @@ -20,7 +20,7 @@ package catalyst package plans package logical -import expressions._ +import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder} /** * Performs a physical redistribution of the data. 
Used when the consumer of the query diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 2d8f3ad335..20e2a45678 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -20,8 +20,8 @@ package catalyst package plans package physical -import expressions._ -import types._ +import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder} +import org.apache.spark.sql.catalyst.types.IntegerType /** * Specifies how tuples that share common expressions will be distributed when a query is executed diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala index 6ff4891a3f..c7632a62a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package rules -import trees._ +import org.apache.spark.sql.catalyst.trees.TreeNode abstract class Rule[TreeType <: TreeNode[_]] extends Logging { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 68ae30cde1..9db96f89dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql package catalyst package rules -import trees._ -import util._ +import org.apache.spark.sql.catalyst.trees.TreeNode +import org.apache.spark.sql.catalyst.util.sideBySide abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { @@ -52,19 +52,19 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { batches.foreach { batch => var iteration = 1 var lastPlan = curPlan - curPlan = batch.rules.foldLeft(curPlan) { case (curPlan, rule) => rule(curPlan) } + curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => rule(plan) } // Run until fix point (or the max number of iterations as specified in the strategy. 
while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) { lastPlan = curPlan curPlan = batch.rules.foldLeft(curPlan) { - case (curPlan, rule) => - val result = rule(curPlan) - if (!result.fastEquals(curPlan)) { + case (plan, rule) => + val result = rule(plan) + if (!result.fastEquals(plan)) { logger.debug( s""" |=== Applying Rule ${rule.ruleName} === - |${sideBySide(curPlan.treeString, result.treeString).mkString("\n")} + |${sideBySide(plan.treeString, result.treeString).mkString("\n")} """.stripMargin) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 37e557441d..89e27d81da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql package catalyst package trees -import errors._ +import org.apache.spark.sql.catalyst.errors._ object TreeNode { private val currentId = new java.util.concurrent.atomic.AtomicLong diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/AnalysisSuite.scala deleted file mode 100644 index 1fd0d26b6f..0000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/AnalysisSuite.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql -package catalyst -package analysis - -import org.scalatest.FunSuite - -import analysis._ -import expressions._ -import plans.logical._ -import types._ - -import dsl._ -import dsl.expressions._ - -class AnalysisSuite extends FunSuite { - val analyze = SimpleAnalyzer - - val testRelation = LocalRelation('a.int) - - test("analyze project") { - assert(analyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) === Project(testRelation.output, testRelation)) - - } -} \ No newline at end of file diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ExpressionEvaluationSuite.scala deleted file mode 100644 index f06618ad11..0000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ExpressionEvaluationSuite.scala +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql -package catalyst -package expressions - -import org.scalatest.FunSuite - -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.types._ - -/* Implict conversions */ -import org.apache.spark.sql.catalyst.dsl.expressions._ - -class ExpressionEvaluationSuite extends FunSuite { - - test("literals") { - assert((Literal(1) + Literal(1)).apply(null) === 2) - } - - /** - * Checks for three-valued-logic. Based on: - * http://en.wikipedia.org/wiki/Null_(SQL)#Comparisons_with_NULL_and_the_three-valued_logic_.283VL.29 - * - * p q p OR q p AND q p = q - * True True True True True - * True False True False False - * True Unknown True Unknown Unknown - * False True True False False - * False False False False True - * False Unknown Unknown False Unknown - * Unknown True True Unknown Unknown - * Unknown False Unknown False Unknown - * Unknown Unknown Unknown Unknown Unknown - * - * p NOT p - * True False - * False True - * Unknown Unknown - */ - - val notTrueTable = - (true, false) :: - (false, true) :: - (null, null) :: Nil - - test("3VL Not") { - notTrueTable.foreach { - case (v, answer) => - val expr = Not(Literal(v, BooleanType)) - val result = expr.apply(null) - if (result != answer) - fail(s"$expr should not evaluate to $result, expected: $answer") } - } - - booleanLogicTest("AND", _ && _, - (true, true, true) :: - (true, false, false) :: - (true, null, null) :: - (false, true, false) :: - (false, false, false) :: - (false, null, false) :: - (null, true, null) :: - (null, false, false) :: - (null, null, null) :: Nil) - - booleanLogicTest("OR", _ || _, - (true, true, true) :: - (true, false, true) :: - (true, null, true) :: - (false, true, true) :: - (false, false, false) :: - (false, null, null) :: - (null, true, true) :: - (null, false, null) :: - (null, null, null) :: Nil) - - booleanLogicTest("=", _ === _, - (true, true, true) :: - (true, false, false) :: - (true, null, null) :: - (false, true, false) :: - (false, false, true) :: - (false, null, null) :: - (null, true, null) :: - (null, false, null) :: - (null, null, null) :: Nil) - - def booleanLogicTest(name: String, op: (Expression, Expression) => Expression, truthTable: Seq[(Any, Any, Any)]) { - test(s"3VL $name") { - truthTable.foreach { - case (l,r,answer) => - val expr = op(Literal(l, BooleanType), Literal(r, BooleanType)) - val result = expr.apply(null) - if (result != answer) - fail(s"$expr should not evaluate to $result, expected: $answer") - } - } - } -} \ No newline at end of file diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/HiveTypeCoercionSuite.scala deleted file mode 100644 index f595bf7e44..0000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/HiveTypeCoercionSuite.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql -package catalyst -package analysis - -import org.scalatest.FunSuite - -import catalyst.types._ - - -class HiveTypeCoercionSuite extends FunSuite { - - val rules = new HiveTypeCoercion { } - import rules._ - - test("tightest common bound for numeric and boolean types") { - def widenTest(t1: DataType, t2: DataType, tightestCommon: Option[DataType]) { - var found = WidenTypes.findTightestCommonType(t1, t2) - assert(found == tightestCommon, - s"Expected $tightestCommon as tightest common type for $t1 and $t2, found $found") - // Test both directions to make sure the widening is symmetric. - found = WidenTypes.findTightestCommonType(t2, t1) - assert(found == tightestCommon, - s"Expected $tightestCommon as tightest common type for $t2 and $t1, found $found") - } - - // Boolean - widenTest(NullType, BooleanType, Some(BooleanType)) - widenTest(BooleanType, BooleanType, Some(BooleanType)) - widenTest(IntegerType, BooleanType, None) - widenTest(LongType, BooleanType, None) - - // Integral - widenTest(NullType, ByteType, Some(ByteType)) - widenTest(NullType, IntegerType, Some(IntegerType)) - widenTest(NullType, LongType, Some(LongType)) - widenTest(ShortType, IntegerType, Some(IntegerType)) - widenTest(ShortType, LongType, Some(LongType)) - widenTest(IntegerType, LongType, Some(LongType)) - widenTest(LongType, LongType, Some(LongType)) - - // Floating point - widenTest(NullType, FloatType, Some(FloatType)) - widenTest(NullType, DoubleType, Some(DoubleType)) - widenTest(FloatType, DoubleType, Some(DoubleType)) - widenTest(FloatType, FloatType, Some(FloatType)) - widenTest(DoubleType, DoubleType, Some(DoubleType)) - - // Integral mixed with floating point. - widenTest(NullType, FloatType, Some(FloatType)) - widenTest(NullType, DoubleType, Some(DoubleType)) - widenTest(IntegerType, FloatType, Some(FloatType)) - widenTest(IntegerType, DoubleType, Some(DoubleType)) - widenTest(IntegerType, DoubleType, Some(DoubleType)) - widenTest(LongType, FloatType, Some(FloatType)) - widenTest(LongType, DoubleType, Some(DoubleType)) - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/RuleExecutorSuite.scala deleted file mode 100644 index ff7c15b718..0000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/RuleExecutorSuite.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-package catalyst
-package trees
-
-import org.scalatest.FunSuite
-
-import expressions._
-import rules._
-
-class RuleExecutorSuite extends FunSuite {
-  object DecrementLiterals extends Rule[Expression] {
-    def apply(e: Expression): Expression = e transform {
-      case IntegerLiteral(i) if i > 0 => Literal(i - 1)
-    }
-  }
-
-  test("only once") {
-    object ApplyOnce extends RuleExecutor[Expression] {
-      val batches = Batch("once", Once, DecrementLiterals) :: Nil
-    }
-
-    assert(ApplyOnce(Literal(10)) === Literal(9))
-  }
-
-  test("to fixed point") {
-    object ToFixedPoint extends RuleExecutor[Expression] {
-      val batches = Batch("fixedPoint", FixedPoint(100), DecrementLiterals) :: Nil
-    }
-
-    assert(ToFixedPoint(Literal(10)) === Literal(0))
-  }
-
-  test("to maxIterations") {
-    object ToFixedPoint extends RuleExecutor[Expression] {
-      val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil
-    }
-
-    assert(ToFixedPoint(Literal(100)) === Literal(90))
-  }
-}
\ No newline at end of file
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/TreeNodeSuite.scala
deleted file mode 100644
index 98bb090c29..0000000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/TreeNodeSuite.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-package catalyst
-package trees
-
-import scala.collection.mutable.ArrayBuffer
-
-import expressions._
-
-import org.scalatest.{FunSuite}
-
-class TreeNodeSuite extends FunSuite {
-
-  test("top node changed") {
-    val after = Literal(1) transform { case Literal(1, _) => Literal(2) }
-    assert(after === Literal(2))
-  }
-
-  test("one child changed") {
-    val before = Add(Literal(1), Literal(2))
-    val after = before transform { case Literal(2, _) => Literal(1) }
-
-    assert(after === Add(Literal(1), Literal(1)))
-  }
-
-  test("no change") {
-    val before = Add(Literal(1), Add(Literal(2), Add(Literal(3), Literal(4))))
-    val after = before transform { case Literal(5, _) => Literal(1)}
-
-    assert(before === after)
-    assert(before.map(_.id) === after.map(_.id))
-  }
-
-  test("collect") {
-    val tree = Add(Literal(1), Add(Literal(2), Add(Literal(3), Literal(4))))
-    val literals = tree collect {case l: Literal => l}
-
-    assert(literals.size === 4)
-    (1 to 4).foreach(i => assert(literals contains Literal(i)))
-  }
-
-  test("pre-order transform") {
-    val actual = new ArrayBuffer[String]()
-    val expected = Seq("+", "1", "*", "2", "-", "3", "4")
-    val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), Literal(4))))
-    expression transformDown {
-      case b: BinaryExpression => {actual.append(b.symbol); b}
-      case l: Literal => {actual.append(l.toString); l}
-    }
-
-    assert(expected === actual)
-  }
-
-  test("post-order transform") {
-    val actual = new ArrayBuffer[String]()
-    val expected = Seq("1", "2", "3", "4", "-", "*", "+")
-    val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), Literal(4))))
-    expression transformUp {
-      case b: BinaryExpression => {actual.append(b.symbol); b}
-      case l: Literal => {actual.append(l.toString); l}
-    }
-
-    assert(expected === actual)
-  }
-}
\ No newline at end of file
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
new file mode 100644
index 0000000000..78ec48ba77
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package analysis
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.plans.logical._
+
+/* Implicit conversions */
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+class AnalysisSuite extends FunSuite {
+  val analyze = SimpleAnalyzer
+
+  val testRelation = LocalRelation('a.int)
+
+  test("analyze project") {
+    assert(
+      analyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===
+        Project(testRelation.output, testRelation))
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
new file mode 100644
index 0000000000..b85b72a284
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package analysis
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.types._
+
+class HiveTypeCoercionSuite extends FunSuite {
+
+  val rules = new HiveTypeCoercion { }
+  import rules._
+
+  test("tightest common bound for numeric and boolean types") {
+    def widenTest(t1: DataType, t2: DataType, tightestCommon: Option[DataType]) {
+      var found = WidenTypes.findTightestCommonType(t1, t2)
+      assert(found == tightestCommon,
+        s"Expected $tightestCommon as tightest common type for $t1 and $t2, found $found")
+      // Test both directions to make sure the widening is symmetric.
+      found = WidenTypes.findTightestCommonType(t2, t1)
+      assert(found == tightestCommon,
+        s"Expected $tightestCommon as tightest common type for $t2 and $t1, found $found")
+    }
+
+    // Boolean
+    widenTest(NullType, BooleanType, Some(BooleanType))
+    widenTest(BooleanType, BooleanType, Some(BooleanType))
+    widenTest(IntegerType, BooleanType, None)
+    widenTest(LongType, BooleanType, None)
+
+    // Integral
+    widenTest(NullType, ByteType, Some(ByteType))
+    widenTest(NullType, IntegerType, Some(IntegerType))
+    widenTest(NullType, LongType, Some(LongType))
+    widenTest(ShortType, IntegerType, Some(IntegerType))
+    widenTest(ShortType, LongType, Some(LongType))
+    widenTest(IntegerType, LongType, Some(LongType))
+    widenTest(LongType, LongType, Some(LongType))
+
+    // Floating point
+    widenTest(NullType, FloatType, Some(FloatType))
+    widenTest(NullType, DoubleType, Some(DoubleType))
+    widenTest(FloatType, DoubleType, Some(DoubleType))
+    widenTest(FloatType, FloatType, Some(FloatType))
+    widenTest(DoubleType, DoubleType, Some(DoubleType))
+
+    // Integral mixed with floating point.
+    widenTest(NullType, FloatType, Some(FloatType))
+    widenTest(NullType, DoubleType, Some(DoubleType))
+    widenTest(IntegerType, FloatType, Some(FloatType))
+    widenTest(IntegerType, DoubleType, Some(DoubleType))
+    widenTest(IntegerType, DoubleType, Some(DoubleType))
+    widenTest(LongType, FloatType, Some(FloatType))
+    widenTest(LongType, DoubleType, Some(DoubleType))
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
new file mode 100644
index 0000000000..c8fd581aa7
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.types._
+
+/* Implicit conversions */
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+class ExpressionEvaluationSuite extends FunSuite {
+
+  test("literals") {
+    assert((Literal(1) + Literal(1)).apply(null) === 2)
+  }
+
+  /**
+   * Checks for three-valued-logic.  Based on:
+   * http://en.wikipedia.org/wiki/Null_(SQL)#Comparisons_with_NULL_and_the_three-valued_logic_.283VL.29
+   *
+   * p       q       p OR q  p AND q  p = q
+   * True    True    True    True     True
+   * True    False   True    False    False
+   * True    Unknown True    Unknown  Unknown
+   * False   True    True    False    False
+   * False   False   False   False    True
+   * False   Unknown Unknown False    Unknown
+   * Unknown True    True    Unknown  Unknown
+   * Unknown False   Unknown False    Unknown
+   * Unknown Unknown Unknown Unknown  Unknown
+   *
+   * p       NOT p
+   * True    False
+   * False   True
+   * Unknown Unknown
+   */
+
+  val notTrueTable =
+    (true, false) ::
+    (false, true) ::
+    (null, null) :: Nil
+
+  test("3VL Not") {
+    notTrueTable.foreach {
+      case (v, answer) =>
+        val expr = Not(Literal(v, BooleanType))
+        val result = expr.apply(null)
+        if (result != answer)
+          fail(s"$expr should not evaluate to $result, expected: $answer")    }
+  }
+
+  booleanLogicTest("AND", _ && _,
+    (true,  true,  true) ::
+    (true,  false, false) ::
+    (true,  null,  null) ::
+    (false, true,  false) ::
+    (false, false, false) ::
+    (false, null,  false) ::
+    (null,  true,  null) ::
+    (null,  false, false) ::
+    (null,  null,  null) :: Nil)
+
+  booleanLogicTest("OR", _ || _,
+    (true,  true,  true) ::
+    (true,  false, true) ::
+    (true,  null,  true) ::
+    (false, true,  true) ::
+    (false, false, false) ::
+    (false, null,  null) ::
+    (null,  true,  true) ::
+    (null,  false, null) ::
+    (null,  null,  null) :: Nil)
+
+  booleanLogicTest("=", _ === _,
+    (true,  true,  true) ::
+    (true,  false, false) ::
+    (true,  null,  null) ::
+    (false, true,  false) ::
+    (false, false, true) ::
+    (false, null,  null) ::
+    (null,  true,  null) ::
+    (null,  false, null) ::
+    (null,  null,  null) :: Nil)
+
+  def booleanLogicTest(name: String, op: (Expression, Expression) => Expression,  truthTable: Seq[(Any, Any, Any)]) {
+    test(s"3VL $name") {
+      truthTable.foreach {
+        case (l,r,answer) =>
+          val expr = op(Literal(l, BooleanType), Literal(r, BooleanType))
+          val result = expr.apply(null)
+          if (result != answer)
+            fail(s"$expr should not evaluate to $result, expected: $answer")
+      }
+    }
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 7ce42b2b0a..2c107b865a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql
 package catalyst
 package optimizer
 
-import types.IntegerType
-import util._
-import plans.logical.{LogicalPlan, LocalRelation}
-import rules._
-import expressions._
-import dsl.plans._
-import dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.types.IntegerType
+
+// For implicit conversions
+import org.apache.spark.sql.catalyst.dsl.expressions._
 
 class ConstantFoldingSuite extends OptimizerTest {
 
@@ -106,7 +107,7 @@ class ConstantFoldingSuite extends OptimizerTest {
       Literal(5) + 'a as Symbol("c1"),
       'a + Literal(2) + Literal(3) as Symbol("c2"),
       Literal(2) * 'a + Literal(4) as Symbol("c3"),
-      'a * (Literal(7)) as Symbol("c4"))
+      'a * Literal(7) as Symbol("c4"))
       .analyze
 
     comparePlans(optimized, correctAnswer)
@@ -173,4 +174,4 @@ class ConstantFoldingSuite extends OptimizerTest {
 
     comparePlans(optimized, correctAnswer)
   }
-}
\ No newline at end of file
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index cd611b3fb3..cfbef53de1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -2,13 +2,12 @@ package org.apache.spark.sql
 package catalyst
 package optimizer
 
-import expressions._
-import plans.logical._
-import rules._
-import util._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
 
-import dsl.plans._
-import dsl.expressions._
+/* Implicit conversions */
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
 
 class FilterPushdownSuite extends OptimizerTest {
 
@@ -219,4 +218,4 @@ class FilterPushdownSuite extends OptimizerTest {
 
     comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
   }
-}
\ No newline at end of file
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala
index 7b3653d0f9..8ec1d3d8c0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala
@@ -4,13 +4,9 @@ package optimizer
 
 import org.scalatest.FunSuite
 
-import types.IntegerType
-import util._
-import plans.logical.{LogicalPlan, LocalRelation}
-import expressions._
-import dsl._
-
-/* Implicit conversions for creating query plans */
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util._
 
 /**
  * Provides helper methods for comparing plans produced by optimization rules with the expected
@@ -41,4 +37,4 @@ class OptimizerTest extends FunSuite {
       |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
     """.stripMargin)
   }
-}
\ No newline at end of file
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
new file mode 100644
index 0000000000..738cfa85fb
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package trees
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+
+class RuleExecutorSuite extends FunSuite {
+  object DecrementLiterals extends Rule[Expression] {
+    def apply(e: Expression): Expression = e transform {
+      case IntegerLiteral(i) if i > 0 => Literal(i - 1)
+    }
+  }
+
+  test("only once") {
+    object ApplyOnce extends RuleExecutor[Expression] {
+      val batches = Batch("once", Once, DecrementLiterals) :: Nil
+    }
+
+    assert(ApplyOnce(Literal(10)) === Literal(9))
+  }
+
+  test("to fixed point") {
+    object ToFixedPoint extends RuleExecutor[Expression] {
+      val batches = Batch("fixedPoint", FixedPoint(100), DecrementLiterals) :: Nil
+    }
+
+    assert(ToFixedPoint(Literal(10)) === Literal(0))
+  }
+
+  test("to maxIterations") {
+    object ToFixedPoint extends RuleExecutor[Expression] {
+      val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil
+    }
+
+    assert(ToFixedPoint(Literal(100)) === Literal(90))
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
new file mode 100644
index 0000000000..1ddc41a731
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.trees
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.expressions._
+
+class TreeNodeSuite extends FunSuite {
+  test("top node changed") {
+    val after = Literal(1) transform { case Literal(1, _) => Literal(2) }
+    assert(after === Literal(2))
+  }
+
+  test("one child changed") {
+    val before = Add(Literal(1), Literal(2))
+    val after = before transform { case Literal(2, _) => Literal(1) }
+
+    assert(after === Add(Literal(1), Literal(1)))
+  }
+
+  test("no change") {
+    val before = Add(Literal(1), Add(Literal(2), Add(Literal(3), Literal(4))))
+    val after = before transform { case Literal(5, _) => Literal(1)}
+
+    assert(before === after)
+    assert(before.map(_.id) === after.map(_.id))
+  }
+
+  test("collect") {
+    val tree = Add(Literal(1), Add(Literal(2), Add(Literal(3), Literal(4))))
+    val literals = tree collect {case l: Literal => l}
+
+    assert(literals.size === 4)
+    (1 to 4).foreach(i => assert(literals contains Literal(i)))
+  }
+
+  test("pre-order transform") {
+    val actual = new ArrayBuffer[String]()
+    val expected = Seq("+", "1", "*", "2", "-", "3", "4")
+    val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), Literal(4))))
+    expression transformDown {
+      case b: BinaryExpression => actual.append(b.symbol); b
+      case l: Literal => actual.append(l.toString); l
+    }
+
+    assert(expected === actual)
+  }
+
+  test("post-order transform") {
+    val actual = new ArrayBuffer[String]()
+    val expected = Seq("1", "2", "3", "4", "-", "*", "+")
+    val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), Literal(4))))
+    expression transformUp {
+      case b: BinaryExpression => actual.append(b.symbol); b
+      case l: Literal => actual.append(l.toString); l
+    }
+
+    assert(expected === actual)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/rdd/PartitionLocalRDDFunctions.scala b/sql/core/src/main/scala/org/apache/spark/rdd/PartitionLocalRDDFunctions.scala
index b8b9e5839d..f1230e7526 100644
--- a/sql/core/src/main/scala/org/apache/spark/rdd/PartitionLocalRDDFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/rdd/PartitionLocalRDDFunctions.scala
@@ -22,11 +22,12 @@ import scala.language.implicitConversions
 import scala.reflect._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark._
-import org.apache.spark.Aggregator
-import org.apache.spark.SparkContext._
+import org.apache.spark.{Aggregator, InterruptibleIterator, Logging}
 import org.apache.spark.util.collection.AppendOnlyMap
 
+/* Implicit conversions */
+import org.apache.spark.SparkContext._
+
 /**
  * Extra functions on RDDs that perform only local operations.  These can be used when data has
  * already been partitioned correctly.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 587cc7487f..3e98bd3ca6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.planning.QueryPlanner
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand, WriteToFile}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 91c3aaa2b8..770cabcb31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -17,13 +17,13 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.{OneToOneDependency, Dependency, Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
 
 /**
  * ALPHA COMPONENT
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index e934c4cf69..65d77e3a40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql
 package execution
 
-import org.apache.spark.{SparkConf, RangePartitioner, HashPartitioner}
 import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.sql.catalyst.errors.attachTree
+import org.apache.spark.sql.catalyst.expressions.{MutableProjection, RowOrdering}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
-
-import catalyst.rules.Rule
-import catalyst.errors._
-import catalyst.expressions._
-import catalyst.plans.physical._
+import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf}
 
 case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
 
@@ -35,7 +34,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
 
   def execute() = attachTree(this , "execute") {
     newPartitioning match {
-      case HashPartitioning(expressions, numPartitions) => {
+      case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
        val rdd = child.execute().mapPartitions { iter =>
          val hashExpressions = new MutableProjection(expressions)
@@ -46,8 +45,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
        val shuffled = new ShuffledRDD[Row, Row, MutablePair[Row, Row]](rdd, part)
        shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
        shuffled.map(_._2)
-      }
-      case RangePartitioning(sortingExpressions, numPartitions) => {
+
+      case RangePartitioning(sortingExpressions, numPartitions) =>
        // TODO: RangePartitioner should take an Ordering.
        implicit val ordering = new RowOrdering(sortingExpressions)
 
@@ -60,9 +59,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
        shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
        shuffled.map(_._1)
-      }
+
      case SinglePartition =>
-        child.execute().coalesce(1, true)
+        child.execute().coalesce(1, shuffle = true)
 
      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
      // TODO: Handle BroadcastPartitioning.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index c1da3653c5..7e50fda4ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql
 package execution
 
-import catalyst.expressions._
-import catalyst.types._
+import org.apache.spark.sql.catalyst.expressions._
 
 /**
  * Applies a [[catalyst.expressions.Generator Generator]] to a stream of input rows, combining the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 85035b8118..9eb1032113 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -20,13 +20,13 @@ package execution
 
 import org.apache.spark.SparkContext
 
-import catalyst.expressions._
-import catalyst.planning._
-import catalyst.plans._
-import catalyst.plans.logical.LogicalPlan
-import catalyst.plans.physical._
-import parquet.ParquetRelation
-import parquet.InsertIntoParquetTable
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.parquet.InsertIntoParquetTable
 
 abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
@@ -172,7 +172,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
  // Can we automate these 'pass through' operations?
  object BasicOperators extends Strategy {
-    // TOOD: Set
+    // TODO: Set
    val numPartitions = 200
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case logical.Distinct(child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala
index 51889c1988..14e5ab628f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala
@@ -19,12 +19,11 @@ package org.apache.spark.sql
 package execution
 
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
 
-import catalyst.errors._
-import catalyst.expressions._
-import catalyst.plans.physical.{UnspecifiedDistribution, ClusteredDistribution, AllTuples}
-import catalyst.types._
-
+/* Implicit conversions */
 import org.apache.spark.rdd.PartitionLocalRDDFunctions._
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index c6d31d9abc..e4f918b678 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -23,18 +23,17 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext
 
-import catalyst.errors._
-import catalyst.expressions._
-import catalyst.plans.physical.{UnspecifiedDistribution, OrderedDistribution}
-import catalyst.plans.logical.LogicalPlan
-import catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.ScalaReflection
 
 case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
   def output = projectList.map(_.toAttribute)
 
   def execute() = child.execute().mapPartitions { iter =>
-    @transient val resuableProjection = new MutableProjection(projectList)
-    iter.map(resuableProjection)
+    @transient val reusableProjection = new MutableProjection(projectList)
+    iter.map(reusableProjection)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index 5934fd1b03..a6e3892e88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -23,10 +23,10 @@ import scala.collection.mutable
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext
 
-import catalyst.errors._
-import catalyst.expressions._
-import catalyst.plans._
-import catalyst.plans.physical.{ClusteredDistribution, Partitioning}
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning}
 
 import org.apache.spark.rdd.PartitionLocalRDDFunctions._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala
index 67f6f43f90..e4a2dec332 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala
@@ -25,5 +25,4 @@ package org.apache.spark.sql
  * documented here in order to make it easier for others to understand the performance
  * characteristics of query plans that are generated by Spark SQL.
  */
-package object execution {
-}
+package object execution
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index e87561fe13..011aaf7440 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -19,28 +19,27 @@ package org.apache.spark.sql.parquet
 
 import java.io.{IOException, FileNotFoundException}
 
-import org.apache.hadoop.fs.{Path, FileSystem}
+import scala.collection.JavaConversions._
+
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
 
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, BaseRelation}
-import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.catalyst.types.ArrayType
-import org.apache.spark.sql.catalyst.expressions.{Row, AttributeReference, Attribute}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-
-import parquet.schema.{MessageTypeParser, MessageType}
+import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
+import parquet.hadoop.util.ContextUtil
+import parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
+import parquet.io.api.{Binary, RecordConsumer}
 import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
+import parquet.schema.Type.Repetition
+import parquet.schema.{MessageType, MessageTypeParser}
 import parquet.schema.{PrimitiveType => ParquetPrimitiveType}
 import parquet.schema.{Type => ParquetType}
-import parquet.schema.Type.Repetition
-import parquet.io.api.{Binary, RecordConsumer}
-import parquet.hadoop.{Footer, ParquetFileWriter, ParquetFileReader}
-import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
-import parquet.hadoop.util.ContextUtil
-import scala.collection.JavaConversions._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
+import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.types._
 
 /**
  * Relation that consists of data stored in a Parquet columnar format.
@@ -55,7 +54,7 @@ import scala.collection.JavaConversions._
  * @param tableName The name of the relation that can be used in queries.
  * @param path The path to the Parquet file.
  */
-case class ParquetRelation(val tableName: String, val path: String) extends BaseRelation {
+case class ParquetRelation(tableName: String, path: String) extends BaseRelation {
 
   /** Schema derived from ParquetFile **/
   def parquetSchema: MessageType =
@@ -145,11 +144,10 @@ object ParquetTypesConverter {
    case ParquetPrimitiveTypeName.FLOAT => FloatType
    case ParquetPrimitiveTypeName.INT32 => IntegerType
    case ParquetPrimitiveTypeName.INT64 => LongType
-    case ParquetPrimitiveTypeName.INT96 => {
+    case ParquetPrimitiveTypeName.INT96 =>
      // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
sys.error("Warning: potential loss of precision: converting INT96 to long") LongType - } case _ => sys.error( s"Unsupported parquet datatype $parquetType") } @@ -186,11 +184,10 @@ object ParquetTypesConverter { def convertToAttributes(parquetSchema: MessageType) : Seq[Attribute] = { parquetSchema.getColumns.map { - case (desc) => { + case (desc) => val ctype = toDataType(desc.getType) val name: String = desc.getPath.mkString(".") new AttributeReference(name, ctype, false)() - } } } @@ -245,7 +242,7 @@ object ParquetTypesConverter { * Try to read Parquet metadata at the given Path. We first see if there is a summary file * in the parent directory. If so, this is used. Else we read the actual footer at the given * location. - * @param path The path at which we expect one (or more) Parquet files. + * @param origPath The path at which we expect one (or more) Parquet files. * @return The `ParquetMetadata` containing among other things the schema. */ def readMetaData(origPath: Path): ParquetMetadata = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 61121103cb..7285f5b88b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -17,24 +17,24 @@ package org.apache.spark.sql.parquet -import parquet.io.InvalidRecordException -import parquet.schema.MessageType -import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat} -import parquet.hadoop.util.ContextUtil - -import org.apache.spark.rdd.RDD -import org.apache.spark.{TaskContext, SerializableWritable, SparkContext} -import org.apache.spark.sql.catalyst.expressions.{Row, Attribute, Expression} -import org.apache.spark.sql.execution.{SparkPlan, UnaryNode, LeafNode} +import java.io.IOException +import java.text.SimpleDateFormat +import java.util.Date -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -import org.apache.hadoop.mapreduce._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -import java.io.IOException -import java.text.SimpleDateFormat -import java.util.Date +import parquet.hadoop.util.ContextUtil +import parquet.hadoop.{ParquetInputFormat, ParquetOutputFormat} +import parquet.io.InvalidRecordException +import parquet.schema.MessageType + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} +import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} +import org.apache.spark.{SerializableWritable, SparkContext, TaskContext} /** * Parquet table scan operator. 
Imports the file that backs the given diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index c2ae18b882..91b4848fe4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -19,16 +19,15 @@ package org.apache.spark.sql.parquet import org.apache.hadoop.conf.Configuration -import org.apache.spark.Logging - -import parquet.io.api._ -import parquet.schema.{MessageTypeParser, MessageType} -import parquet.hadoop.api.{WriteSupport, ReadSupport} -import parquet.hadoop.api.ReadSupport.ReadContext -import parquet.hadoop.ParquetOutputFormat import parquet.column.ParquetProperties +import parquet.hadoop.ParquetOutputFormat +import parquet.hadoop.api.ReadSupport.ReadContext +import parquet.hadoop.api.{ReadSupport, WriteSupport} +import parquet.io.api._ +import parquet.schema.{MessageType, MessageTypeParser} -import org.apache.spark.sql.catalyst.expressions.{Row, Attribute} +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} import org.apache.spark.sql.catalyst.types._ /** @@ -95,8 +94,7 @@ class RowWriteSupport extends WriteSupport[Row] with Logging { } def getSchema(configuration: Configuration): MessageType = { - return MessageTypeParser.parseMessageType( - configuration.get(RowWriteSupport.PARQUET_ROW_SCHEMA)) + MessageTypeParser.parseMessageType(configuration.get(RowWriteSupport.PARQUET_ROW_SCHEMA)) } private var schema: MessageType = null @@ -108,7 +106,7 @@ class RowWriteSupport extends WriteSupport[Row] with Logging { attributes = ParquetTypesConverter.convertToAttributes(schema) new WriteSupport.WriteContext( schema, - new java.util.HashMap[java.lang.String, java.lang.String]()); + new java.util.HashMap[java.lang.String, java.lang.String]()) } override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala index bbe409fb9c..3340c3ff81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala @@ -17,17 +17,16 @@ package org.apache.spark.sql.parquet -import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job -import parquet.schema.{MessageTypeParser, MessageType} -import parquet.hadoop.util.ContextUtil import parquet.hadoop.ParquetWriter +import parquet.hadoop.util.ContextUtil +import parquet.schema.{MessageType, MessageTypeParser} -import org.apache.spark.sql.catalyst.util.getTempFilePath import org.apache.spark.sql.catalyst.expressions.GenericRow -import java.nio.charset.Charset +import org.apache.spark.sql.catalyst.util.getTempFilePath object ParquetTestData { @@ -69,7 +68,7 @@ object ParquetTestData { lazy val testData = new ParquetRelation("testData", testFile.toURI.toString) - def writeFile = { + def writeFile() = { testFile.delete val path: Path = new Path(testFile.toURI) val job = new Job() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index 37c90a18a0..2524a37cba 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -17,14 +17,9 @@ package org.apache.spark.sql -import org.scalatest.{BeforeAndAfterAll, FunSuite} - import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test._ /* Implicits */ @@ -198,4 +193,4 @@ class DslQuerySuite extends QueryTest { (null, null, 5, "E") :: (null, null, 6, "F") :: Nil) } -} \ No newline at end of file +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlannerSuite.scala deleted file mode 100644 index 83908edf5a..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlannerSuite.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql -package execution - -import org.scalatest.FunSuite - -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.test.TestSQLContext._ -import org.apache.spark.sql.test.TestSQLContext.planner._ - -class PlannerSuite extends FunSuite { - - - test("unions are collapsed") { - val query = testData.unionAll(testData).unionAll(testData).logicalPlan - val planned = BasicOperators(query).head - val logicalUnions = query collect { case u: logical.Union => u} - val physicalUnions = planned collect { case u: execution.Union => u} - - assert(logicalUnions.size === 2) - assert(physicalUnions.size === 1) - } - - test("count is partially aggregated") { - val query = testData.groupBy('value)(Count('key)).analyze.logicalPlan - val planned = PartialAggregation(query).head - val aggregations = planned.collect { case a: Aggregate => a } - - assert(aggregations.size === 2) - } - - test("count distinct is not partially aggregated") { - val query = testData.groupBy('value)(CountDistinct('key :: Nil)).analyze.logicalPlan - val planned = PartialAggregation(query.logicalPlan) - assert(planned.isEmpty) - } - - test("mixed aggregates are not partially aggregated") { - val query = - testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).analyze.logicalPlan - val planned = PartialAggregation(query) - assert(planned.isEmpty) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index aa84211648..5c8cb086ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -17,18 +17,12 @@ package org.apache.spark.sql -import org.scalatest.{BeforeAndAfterAll, FunSuite} +import org.scalatest.FunSuite -import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.test._ /* Implicits */ -import TestSQLContext._ class QueryTest extends FunSuite { /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6371fa296a..fa4a1d5189 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -17,13 +17,7 @@ package org.apache.spark.sql -import org.scalatest.{BeforeAndAfterAll, FunSuite} - -import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.test._ /* Implicits */ @@ -37,8 +31,7 @@ class SQLQuerySuite extends QueryTest { test("agg") { checkAnswer( sql("SELECT a, SUM(b) FROM testData2 GROUP BY a"), - Seq((1,3),(2,3),(3,3)) - ) + Seq((1,3),(2,3),(3,3))) } test("select *") { @@ -88,13 +81,11 @@ class SQLQuerySuite extends QueryTest { ignore("null count") { checkAnswer( sql("SELECT a, COUNT(b) FROM testData3"), - Seq((1,0), (2, 1)) - ) + Seq((1,0), (2, 1))) checkAnswer( testData3.groupBy()(Count('a), Count('b), Count(1), CountDistinct('a 
:: Nil), CountDistinct('b :: Nil)), - (2, 1, 2, 2, 1) :: Nil - ) + (2, 1, 2, 2, 1) :: Nil) } test("inner join where, one match per row") { @@ -104,8 +95,7 @@ class SQLQuerySuite extends QueryTest { (1, "A", 1, "a"), (2, "B", 2, "b"), (3, "C", 3, "c"), - (4, "D", 4, "d") - )) + (4, "D", 4, "d"))) } test("inner join ON, one match per row") { @@ -115,8 +105,7 @@ class SQLQuerySuite extends QueryTest { (1, "A", 1, "a"), (2, "B", 2, "b"), (3, "C", 3, "c"), - (4, "D", 4, "d") - )) + (4, "D", 4, "d"))) } test("inner join, where, multiple matches") { @@ -129,8 +118,7 @@ class SQLQuerySuite extends QueryTest { (1,1,1,1) :: (1,1,1,2) :: (1,2,1,1) :: - (1,2,1,2) :: Nil - ) + (1,2,1,2) :: Nil) } test("inner join, no matches") { @@ -164,7 +152,7 @@ class SQLQuerySuite extends QueryTest { row => Seq.fill(16)((row ++ row).toSeq)).collect().toSeq) } - ignore("cartisian product join") { + ignore("cartesian product join") { checkAnswer( testData3.join(testData3), (1, null, 1, null) :: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index 640292571b..0bb13cf442 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -37,16 +37,14 @@ object TestData { TestData2(2, 1) :: TestData2(2, 2) :: TestData2(3, 1) :: - TestData2(3, 2) :: Nil - ) + TestData2(3, 2) :: Nil) testData2.registerAsTable("testData2") // TODO: There is no way to express null primitives as case classes currently... val testData3 = logical.LocalRelation('a.int, 'b.int).loadData( (1, null) :: - (2, 2) :: Nil - ) + (2, 2) :: Nil) case class UpperCaseData(N: Int, L: String) val upperCaseData = @@ -56,8 +54,7 @@ object TestData { UpperCaseData(3, "C") :: UpperCaseData(4, "D") :: UpperCaseData(5, "E") :: - UpperCaseData(6, "F") :: Nil - ) + UpperCaseData(6, "F") :: Nil) upperCaseData.registerAsTable("upperCaseData") case class LowerCaseData(n: Int, l: String) @@ -66,7 +63,6 @@ object TestData { LowerCaseData(1, "a") :: LowerCaseData(2, "b") :: LowerCaseData(3, "c") :: - LowerCaseData(4, "d") :: Nil - ) + LowerCaseData(4, "d") :: Nil) lowerCaseData.registerAsTable("lowerCaseData") -} \ No newline at end of file +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TgfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TgfSuite.scala deleted file mode 100644 index 08265b7a6a..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/TgfSuite.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql -package execution - -import org.scalatest.{BeforeAndAfterAll, FunSuite} - -import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.test._ - - -import TestSQLContext._ - -/** - * This is an example TGF that uses UnresolvedAttributes 'name and 'age to access specific columns - * from the input data. These will be replaced during analysis with specific AttributeReferences - * and then bound to specific ordinals during query planning. While TGFs could also access specific - * columns using hand-coded ordinals, doing so violates data independence. - * - * Note: this is only a rough example of how TGFs can be expressed, the final version will likely - * involve a lot more sugar for cleaner use in Scala/Java/etc. - */ -case class ExampleTGF(input: Seq[Attribute] = Seq('name, 'age)) extends Generator { - def children = input - protected def makeOutput() = 'nameAndAge.string :: Nil - - val Seq(nameAttr, ageAttr) = input - - override def apply(input: Row): TraversableOnce[Row] = { - val name = nameAttr.apply(input) - val age = ageAttr.apply(input).asInstanceOf[Int] - - Iterator( - new GenericRow(Array[Any](s"$name is $age years old")), - new GenericRow(Array[Any](s"Next year, $name will be ${age + 1} years old"))) - } -} - -class TgfSuite extends QueryTest { - val inputData = - logical.LocalRelation('name.string, 'age.int).loadData( - ("michael", 29) :: Nil - ) - - test("simple tgf example") { - checkAnswer( - inputData.generate(ExampleTGF()), - Seq( - "michael is 29 years old" :: Nil, - "Next year, michael will be 30 years old" :: Nil)) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala new file mode 100644 index 0000000000..658ff0927a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.scalatest.FunSuite + +import org.apache.spark.sql.TestData._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.execution +import org.apache.spark.sql.test.TestSQLContext._ +import org.apache.spark.sql.test.TestSQLContext.planner._ + +class PlannerSuite extends FunSuite { + test("unions are collapsed") { + val query = testData.unionAll(testData).unionAll(testData).logicalPlan + val planned = BasicOperators(query).head + val logicalUnions = query collect { case u: logical.Union => u} + val physicalUnions = planned collect { case u: execution.Union => u} + + assert(logicalUnions.size === 2) + assert(physicalUnions.size === 1) + } + + test("count is partially aggregated") { + val query = testData.groupBy('value)(Count('key)).analyze.logicalPlan + val planned = PartialAggregation(query).head + val aggregations = planned.collect { case a: Aggregate => a } + + assert(aggregations.size === 2) + } + + test("count distinct is not partially aggregated") { + val query = testData.groupBy('value)(CountDistinct('key :: Nil)).analyze.logicalPlan + val planned = PartialAggregation(query.logicalPlan) + assert(planned.isEmpty) + } + + test("mixed aggregates are not partially aggregated") { + val query = + testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).analyze.logicalPlan + val planned = PartialAggregation(query) + assert(planned.isEmpty) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TgfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TgfSuite.scala new file mode 100644 index 0000000000..93b2a308a4 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TgfSuite.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql +package execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ + +/* Implicit conversions */ +import org.apache.spark.sql.test.TestSQLContext._ + +/** + * This is an example TGF that uses UnresolvedAttributes 'name and 'age to access specific columns + * from the input data. These will be replaced during analysis with specific AttributeReferences + * and then bound to specific ordinals during query planning. While TGFs could also access specific + * columns using hand-coded ordinals, doing so violates data independence. + * + * Note: this is only a rough example of how TGFs can be expressed, the final version will likely + * involve a lot more sugar for cleaner use in Scala/Java/etc. 
+ */ +case class ExampleTGF(input: Seq[Attribute] = Seq('name, 'age)) extends Generator { + def children = input + protected def makeOutput() = 'nameAndAge.string :: Nil + + val Seq(nameAttr, ageAttr) = input + + override def apply(input: Row): TraversableOnce[Row] = { + val name = nameAttr.apply(input) + val age = ageAttr.apply(input).asInstanceOf[Int] + + Iterator( + new GenericRow(Array[Any](s"$name is $age years old")), + new GenericRow(Array[Any](s"Next year, $name will be ${age + 1} years old"))) + } +} + +class TgfSuite extends QueryTest { + val inputData = + logical.LocalRelation('name.string, 'age.int).loadData( + ("michael", 29) :: Nil + ) + + test("simple tgf example") { + checkAnswer( + inputData.generate(ExampleTGF()), + Seq( + "michael is 29 years old" :: Nil, + "Next year, michael will be 30 years old" :: Nil)) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 8b2ccb52d8..71caa709af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -19,21 +19,20 @@ package org.apache.spark.sql.parquet import org.scalatest.{BeforeAndAfterAll, FunSuite} +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.mapreduce.Job +import parquet.hadoop.ParquetFileWriter +import parquet.hadoop.util.ContextUtil +import parquet.schema.MessageTypeParser + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.util.getTempFilePath import org.apache.spark.sql.test.TestSQLContext -import org.apache.hadoop.mapreduce.Job -import org.apache.hadoop.fs.{Path, FileSystem} - -import parquet.schema.MessageTypeParser -import parquet.hadoop.ParquetFileWriter -import parquet.hadoop.util.ContextUtil - class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { override def beforeAll() { - ParquetTestData.writeFile + ParquetTestData.writeFile() } override def afterAll() { diff --git a/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala index 08d390e887..0b38731919 100644 --- a/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala @@ -22,15 +22,14 @@ import java.text.NumberFormat import java.util.Date import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} +import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} +import org.apache.hadoop.hive.ql.plan.FileSinkDesc import org.apache.hadoop.io.Writable import org.apache.spark.Logging import org.apache.spark.SerializableWritable -import org.apache.hadoop.hive.ql.exec.{Utilities, FileSinkOperator} -import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} -import org.apache.hadoop.hive.ql.plan.FileSinkDesc - /** * Internal helper class that saves an RDD using a Hive OutputFormat. * It is based on [[SparkHadoopWriter]]. 
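
The PlannerSuite added above counts operators with TreeNode.collect, which walks a plan tree and gathers every node matching a partial function. The following standalone sketch shows the idiom outside Spark; Plan, Union and Leaf are invented stand-ins for Catalyst's TreeNode hierarchy, not real Catalyst classes:

// Minimal sketch of the TreeNode.collect idiom PlannerSuite relies on.
sealed trait Plan {
  def children: Seq[Plan]
  // Pre-order traversal collecting every node the partial function accepts.
  def collect[B](pf: PartialFunction[Plan, B]): Seq[B] =
    (if (pf.isDefinedAt(this)) Seq(pf(this)) else Nil) ++ children.flatMap(_.collect(pf))
}
case class Leaf(name: String) extends Plan { def children = Nil }
case class Union(children: Seq[Plan]) extends Plan

object CollectDemo extends App {
  // Two nested unions, as in testData.unionAll(testData).unionAll(testData).
  val plan = Union(Seq(Union(Seq(Leaf("a"), Leaf("b"))), Leaf("c")))
  val unions = plan.collect { case u: Union => u }
  assert(unions.size == 2) // the nested Union is found as well
}

Because collect visits the whole tree, the test can assert both that the logical plan still holds two Unions and that planning collapsed them into a single physical Union.
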
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4aad876cc0..491b3a6271 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -18,25 +18,26 @@ package org.apache.spark.sql package hive -import java.io.{PrintStream, InputStreamReader, BufferedReader, File} -import java.util.{ArrayList => JArrayList} import scala.language.implicitConversions -import org.apache.spark.SparkContext +import java.io.{BufferedReader, File, InputStreamReader, PrintStream} +import java.util.{ArrayList => JArrayList} + import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hadoop.hive.ql.processors.{CommandProcessorResponse, CommandProcessorFactory} -import org.apache.hadoop.hive.ql.processors.CommandProcessor import org.apache.hadoop.hive.ql.Driver -import org.apache.spark.rdd.RDD - -import catalyst.analysis.{Analyzer, OverrideCatalog} -import catalyst.expressions.GenericRow -import catalyst.plans.logical.{BaseRelation, LogicalPlan, NativeCommand, ExplainCommand} -import catalyst.types._ +import org.apache.hadoop.hive.ql.processors._ +import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog} +import org.apache.spark.sql.catalyst.expressions.GenericRow +import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand} +import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution._ +/* Implicit conversions */ import scala.collection.JavaConversions._ /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index e4d50722ce..a5db283765 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -27,12 +27,12 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde2.Deserializer -import catalyst.analysis.Catalog -import catalyst.expressions._ -import catalyst.plans.logical -import catalyst.plans.logical._ -import catalyst.rules._ -import catalyst.types._ +import org.apache.spark.sql.catalyst.analysis.Catalog +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.catalyst.types._ import scala.collection.JavaConversions._ @@ -45,7 +45,7 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { db: Option[String], tableName: String, alias: Option[String]): LogicalPlan = { - val databaseName = db.getOrElse(hive.sessionState.getCurrentDatabase()) + val databaseName = db.getOrElse(hive.sessionState.getCurrentDatabase) val table = client.getTable(databaseName, tableName) val partitions: Seq[Partition] = if (table.isPartitioned) { @@ -91,7 +91,7 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { object CreateTables extends Rule[LogicalPlan] { def apply(plan: 
LogicalPlan): LogicalPlan = plan transform { case InsertIntoCreatedTable(db, tableName, child) => - val databaseName = db.getOrElse(SessionState.get.getCurrentDatabase()) + val databaseName = db.getOrElse(SessionState.get.getCurrentDatabase) createTable(databaseName, tableName, child.output) @@ -123,8 +123,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { } else { // Only do the casting when child output data types differ from table output data types. val castedChildOutput = child.output.zip(table.output).map { - case (input, table) if input.dataType != table.dataType => - Alias(Cast(input, table.dataType), input.name)() + case (input, output) if input.dataType != output.dataType => + Alias(Cast(input, output.dataType), input.name)() case (input, _) => input } @@ -135,7 +135,7 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { /** * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. - * For now, if this functionallity is desired mix in the in-memory [[OverrideCatalog]]. + * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. */ override def registerTable( databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ??? diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 4f33a293c3..8e76a7348e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -18,18 +18,19 @@ package org.apache.spark.sql package hive -import scala.collection.JavaConversions._ - import org.apache.hadoop.hive.ql.lib.Node import org.apache.hadoop.hive.ql.parse._ import org.apache.hadoop.hive.ql.plan.PlanUtils -import catalyst.analysis._ -import catalyst.expressions._ -import catalyst.plans._ -import catalyst.plans.logical -import catalyst.plans.logical._ -import catalyst.types._ +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.types._ + +/* Implicit conversions */ +import scala.collection.JavaConversions._ /** * Used when we need to start parsing the AST before deciding that we are going to pass the command @@ -48,7 +49,7 @@ case class AddJar(jarPath: String) extends Command case class AddFile(filePath: String) extends Command -/** Provides a mapping from HiveQL statments to catalyst logical plans and expression trees. */ +/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */ object HiveQl { protected val nativeCommands = Seq( "TOK_DESCFUNCTION", @@ -150,13 +151,13 @@ object HiveQl { } /** - * Returns a scala.Seq equivilent to [s] or Nil if [s] is null. + * Returns a scala.Seq equivalent to [s] or Nil if [s] is null. */ private def nilIfEmpty[A](s: java.util.List[A]): Seq[A] = Option(s).map(_.toSeq).getOrElse(Nil) /** - * Returns this ASTNode with the text changed to `newText``. + * Returns this ASTNode with the text changed to `newText`. 
*/ def withText(newText: String): ASTNode = { n.token.asInstanceOf[org.antlr.runtime.CommonToken].setText(newText) @@ -667,7 +668,7 @@ object HiveQl { case Token(allJoinTokens(joinToken), relation1 :: relation2 :: other) => - assert(other.size <= 1, s"Unhandled join child ${other}") + assert(other.size <= 1, s"Unhandled join child $other") val joinType = joinToken match { case "TOK_JOIN" => Inner case "TOK_RIGHTOUTERJOIN" => RightOuter diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 92d84208ab..c71141c419 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -18,13 +18,12 @@ package org.apache.spark.sql package hive -import catalyst.expressions._ -import catalyst.planning._ -import catalyst.plans._ -import catalyst.plans.logical.{BaseRelation, LogicalPlan} - +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.parquet.{ParquetRelation, InsertIntoParquetTable, ParquetTableScan} +import org.apache.spark.sql.parquet.{InsertIntoParquetTable, ParquetRelation, ParquetTableScan} trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala index f20e9d4de4..dc4181ec99 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala @@ -18,11 +18,12 @@ package org.apache.spark.sql package hive -import java.io.{InputStreamReader, BufferedReader} +import java.io.{BufferedReader, InputStreamReader} -import catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution._ +/* Implicit conversions */ import scala.collection.JavaConversions._ /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 71d751cbc4..99dc85ec19 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -19,19 +19,18 @@ package org.apache.spark.sql package hive import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._ +import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable} import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.serde2.Deserializer -import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.io.Writable -import org.apache.hadoop.fs.{Path, PathFilter} -import org.apache.hadoop.mapred.{FileInputFormat, JobConf, InputFormat} +import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf} import org.apache.spark.SerializableWritable import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.{HadoopRDD, UnionRDD, EmptyRDD, RDD} - +import 
org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} /** * A trait for subclasses that handle table scans. @@ -40,7 +39,6 @@ private[hive] sealed trait TableReader { def makeRDDForTable(hiveTable: HiveTable): RDD[_] def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_] - } @@ -57,7 +55,6 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon private val _minSplitsPerRDD = math.max( sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinSplits) - // TODO: set aws s3 credentials. private val _broadcastedHiveConf = @@ -85,8 +82,8 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon def makeRDDForTable( hiveTable: HiveTable, deserializerClass: Class[_ <: Deserializer], - filterOpt: Option[PathFilter]): RDD[_] = - { + filterOpt: Option[PathFilter]): RDD[_] = { + assert(!hiveTable.isPartitioned, """makeRDDForTable() cannot be called on a partitioned table, since input formats may differ across partitions. Use makeRDDForTablePartitions() instead.""") @@ -115,6 +112,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon sys.error(s"Unable to deserialize non-Writable: $value of ${value.getClass.getName}") } } + deserializedHadoopRDD } @@ -136,8 +134,8 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon */ def makeRDDForPartitionedTable( partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]], - filterOpt: Option[PathFilter]): RDD[_] = - { + filterOpt: Option[PathFilter]): RDD[_] = { + val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) => val partDesc = Utilities.getPartitionDesc(partition) val partPath = partition.getPartitionPath @@ -178,6 +176,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon } } }.toSeq + // Even if we don't use any partitions, we still need an empty RDD if (hivePartitionRDDs.size == 0) { new EmptyRDD[Object](sc.sparkContext) @@ -207,8 +206,8 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon private def createHadoopRdd( tableDesc: TableDesc, path: String, - inputFormatClass: Class[InputFormat[Writable, Writable]]) - : RDD[Writable] = { + inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = { + val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ val rdd = new HadoopRDD( @@ -227,7 +226,6 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon } private[hive] object HadoopTableReader { - /** * Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to * instantiate a HadoopRDD. 
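
initializeLocalJobConfFunc, whose doc comment closes the hunk above, is curried precisely so that createHadoopRdd can partially apply it: fixing the path and table descriptor yields the JobConf => Unit closure that HadoopRDD invokes later. A self-contained sketch of the idiom follows; JobConf here is a stand-in class, not org.apache.hadoop.mapred.JobConf:

object CurryDemo extends App {
  class JobConf { var inputPath: String = _ }

  // Curried: the first parameter list is supplied now, the second one later.
  def initializeJobConf(path: String)(conf: JobConf): Unit = {
    conf.inputPath = path
  }

  // The trailing underscore lifts the remaining parameter list into a function
  // value, mirroring `initializeLocalJobConfFunc(path, tableDesc) _` above.
  val initFunc: JobConf => Unit = initializeJobConf("/warehouse/src") _

  val conf = new JobConf
  initFunc(conf)
  assert(conf.inputPath == "/warehouse/src")
}
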
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 17ae4ef63c..a26b0ff231 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -22,23 +22,22 @@ import java.io.File import java.util.{Set => JavaSet} import scala.collection.mutable -import scala.collection.JavaConversions._ import scala.language.implicitConversions -import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor} -import org.apache.hadoop.hive.metastore.MetaStoreUtils import org.apache.hadoop.hive.ql.exec.FunctionRegistry -import org.apache.hadoop.hive.ql.io.avro.{AvroContainerOutputFormat, AvroContainerInputFormat} +import org.apache.hadoop.hive.ql.io.avro.{AvroContainerInputFormat, AvroContainerOutputFormat} import org.apache.hadoop.hive.ql.metadata.Table -import org.apache.hadoop.hive.serde2.avro.AvroSerDe -import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.hive.serde2.RegexSerDe +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.hadoop.hive.serde2.avro.AvroSerDe -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand} +import org.apache.spark.sql.catalyst.util._ -import catalyst.analysis._ -import catalyst.plans.logical.{LogicalPlan, NativeCommand} -import catalyst.util._ +/* Implicit conversions */ +import scala.collection.JavaConversions._ object TestHive extends TestHiveContext(new SparkContext("local", "TestSQLContext", new SparkConf())) @@ -52,7 +51,7 @@ object TestHive * * TestHive is singleton object version of this class because instantiating multiple copies of the * hive metastore seems to lead to weird non-deterministic failures. Therefore, the execution of - * testcases that rely on TestHive must be serialized. + * test cases that rely on TestHive must be serialized. 
*/ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { self => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala index d20fd87f34..9aa9e173a8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala @@ -24,24 +24,18 @@ import org.apache.hadoop.hive.ql.Context import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive} import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc} import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector - -import org.apache.hadoop.fs.Path import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred._ -import catalyst.expressions._ -import catalyst.types.{BooleanType, DataType} -import org.apache.spark.{TaskContext, SparkException} -import catalyst.expressions.Cast import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} import org.apache.spark.sql.execution._ - -import scala.Some -import scala.collection.immutable.ListMap +import org.apache.spark.{TaskContext, SparkException} /* Implicits */ import scala.collection.JavaConversions._ @@ -194,20 +188,26 @@ case class InsertIntoHiveTable( * TODO: Consolidate all hive OI/data interface code. */ protected def wrap(a: (Any, ObjectInspector)): Any = a match { - case (s: String, oi: JavaHiveVarcharObjectInspector) => new HiveVarchar(s, s.size) + case (s: String, oi: JavaHiveVarcharObjectInspector) => + new HiveVarchar(s, s.size) + case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) => new HiveDecimal(bd.underlying()) + case (row: Row, oi: StandardStructObjectInspector) => val struct = oi.create() - row.zip(oi.getAllStructFieldRefs).foreach { + row.zip(oi.getAllStructFieldRefs: Seq[StructField]).foreach { case (data, field) => oi.setStructFieldData(struct, field, wrap(data, field.getFieldObjectInspector)) } struct + case (s: Seq[_], oi: ListObjectInspector) => val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector)) seqAsJavaList(wrappedSeq) - case (obj, _) => obj + + case (obj, _) => + obj } def saveAsHiveFile( @@ -324,7 +324,7 @@ case class InsertIntoHiveTable( case (key, Some(value)) => key -> value case (key, None) => key -> "" // Should not reach here right now. } - val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols(), partitionSpec) + val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec) db.validatePartitionNameCharacters(partVals) // inheritTableSpecs is set to true. It should be set to false for a IMPORT query // which is currently considered as a Hive native command. 
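
The wrap method restyled above dispatches on (value, ObjectInspector) pairs, converting each Catalyst value into the representation Hive's serializer expects and passing everything else through unchanged. A simplified sketch of that dispatch; the Inspector objects are invented stand-ins for Hive's ObjectInspector hierarchy, and the varchar encoding is only illustrative:

sealed trait Inspector
case object VarcharInspector extends Inspector
case object DecimalInspector extends Inspector
case object PlainInspector extends Inspector

object WrapDemo extends App {
  def wrap(a: (Any, Inspector)): Any = a match {
    case (s: String, VarcharInspector) =>
      s"varchar(${s.length}):$s" // stands in for new HiveVarchar(s, s.size)
    case (bd: BigDecimal, DecimalInspector) =>
      bd.underlying() // a java.math.BigDecimal, as HiveDecimal expects
    case (obj, _) =>
      obj // everything else passes through unchanged
  }

  assert(wrap(("hive", VarcharInspector)) == "varchar(4):hive")
  assert(wrap((BigDecimal("1.5"), DecimalInspector)) == new java.math.BigDecimal("1.5"))
  assert(wrap((42, PlainInspector)) == 42)
}

Splitting the cases onto separate lines, as the patch does, keeps each conversion readable as the list of supported inspectors grows.
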
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 5e775d6a04..72ccd4f4a4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -18,22 +18,24 @@ package org.apache.spark.sql package hive -import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.hive.common.`type`.HiveDecimal -import org.apache.hadoop.hive.serde2.{io => hiveIo} -import org.apache.hadoop.hive.serde2.objectinspector.primitive._ -import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.ql.exec.UDF import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry} import org.apache.hadoop.hive.ql.udf.generic._ -import org.apache.hadoop.hive.ql.exec.UDF +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.primitive._ +import org.apache.hadoop.hive.serde2.{io => hiveIo} import org.apache.hadoop.{io => hadoopIo} -import catalyst.analysis -import catalyst.expressions._ -import catalyst.types -import catalyst.types._ +import org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types +import org.apache.spark.sql.catalyst.types._ + +/* Implicit conversions */ +import scala.collection.JavaConversions._ object HiveFunctionRegistry extends analysis.FunctionRegistry with HiveFunctionFactory with HiveInspectors { @@ -148,7 +150,7 @@ abstract class HiveUdf } case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUdf { - import HiveFunctionRegistry._ + import org.apache.spark.sql.hive.HiveFunctionRegistry._ type UDFType = UDF @transient diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala index a12ab23946..02ee2a0ebc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala @@ -20,7 +20,6 @@ package sql package hive package execution - import org.scalatest.{FunSuite, BeforeAndAfterAll} class ConcurrentHiveSuite extends FunSuite with BeforeAndAfterAll { @@ -35,4 +34,4 @@ class ConcurrentHiveSuite extends FunSuite with BeforeAndAfterAll { } } } -} \ No newline at end of file +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 8a5b97b7a0..e8fcc27235 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -20,12 +20,11 @@ package hive package execution import java.io._ -import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen} -import catalyst.plans.logical.{ExplainCommand, NativeCommand} -import catalyst.plans._ -import catalyst.util._ +import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen} +import org.apache.spark.sql.catalyst.plans.logical.{ExplainCommand, NativeCommand} +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.Sort /** @@ -38,7 +37,8 @@ import org.apache.spark.sql.execution.Sort * See the documentation of public 
vals in this class for information on how test execution can be * configured using system properties. */ -abstract class HiveComparisonTest extends FunSuite with BeforeAndAfterAll with GivenWhenThen with Logging { +abstract class HiveComparisonTest + extends FunSuite with BeforeAndAfterAll with GivenWhenThen with Logging { /** * When set, any cache files that result in test failures will be deleted. Used when the test @@ -376,4 +376,4 @@ abstract class HiveComparisonTest extends FunSuite with BeforeAndAfterAll with G } } } -} \ No newline at end of file +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index d010023f78..16bcded8a4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -19,11 +19,6 @@ package org.apache.spark.sql package hive package execution - -import java.io._ - -import util._ - /** * Runs the test cases that are included in the hive distribution. */ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala index f0a4ec3c02..2d2f13333a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql package hive package execution -import java.io._ +import java.io.File -import catalyst.util._ +import org.apache.spark.sql.catalyst.util._ /** * A framework for running the query tests that are listed as a set of text files. @@ -67,4 +67,4 @@ abstract class HiveQueryFileTest extends HiveComparisonTest { ignore(testCaseName) {} } } -} \ No newline at end of file +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 28a5d260b3..b804634db1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql package hive package execution - /** * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution. */ @@ -141,4 +140,4 @@ class HiveQuerySuite extends HiveComparisonTest { sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s") } -} \ No newline at end of file +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index 0dd79faa15..996bd4efec 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -23,8 +23,6 @@ package execution * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution. 
*/ class HiveResolutionSuite extends HiveComparisonTest { - import TestHive._ - createQueryTest("table.attr", "SELECT src.key FROM src ORDER BY key LIMIT 1") @@ -62,4 +60,4 @@ class HiveResolutionSuite extends HiveComparisonTest { createQueryTest("tableName.attr from aliased subquery", "SELECT src.key FROM (SELECT * FROM src ORDER BY key LIMIT 1) a") */ -} \ No newline at end of file +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index 8542f42aa9..bb65c91e2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -19,10 +19,11 @@ package org.apache.spark.sql package hive package execution -import scala.collection.JavaConversions._ - import org.apache.spark.sql.hive.TestHive +/* Implicit conversions */ +import scala.collection.JavaConversions._ + /** * A set of test cases that validate partition and column pruning. */ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala index ee90061c7c..05ad85b622 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala @@ -19,21 +19,23 @@ package org.apache.spark.sql.parquet import java.io.File -import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.util.getTempFilePath import org.apache.spark.sql.hive.TestHive - class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach { - val filename = getTempFilePath("parquettest").getCanonicalFile.toURI.toString // runs a SQL and optionally resolves one Parquet table - def runQuery(querystr: String, tableName: Option[String] = None, filename: Option[String] = None): Array[Row] = { + def runQuery( + querystr: String, + tableName: Option[String] = None, + filename: Option[String] = None): Array[Row] = { + // call to resolve references in order to get CREATE TABLE AS to work val query = TestHive .parseSql(querystr) @@ -90,7 +92,7 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft override def beforeAll() { // write test data - ParquetTestData.writeFile + ParquetTestData.writeFile() // Override initial Parquet test table TestHive.catalog.registerTable(Some[String]("parquet"), "testsource", ParquetTestData.testData) } @@ -151,7 +153,7 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft (rddOne, rddTwo).zipped.foreach { (a,b) => (a,b).zipped.toArray.zipWithIndex.foreach { case ((value_1:Array[Byte], value_2:Array[Byte]), index) => - assert(new String(value_1) === new String(value_2), s"table $tableName row ${counter} field ${fieldNames(index)} don't match") + assert(new String(value_1) === new String(value_2), s"table $tableName row $counter field ${fieldNames(index)} don't match") case ((value_1, value_2), index) => assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} 
don't match") } -- cgit v1.2.3