diff options
author | Michael Armbrust <michael@databricks.com> | 2014-04-22 20:02:33 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-22 20:02:33 -0700 |
commit | aa77f8a6a604efe0d02bc8412b3f1ba3903b7a57 (patch) | |
tree | c1d0d64b74f80861acc13349ce5c1af2fa1158d4 /sql/core | |
parent | 662c860ebcec5565493a7dff4812e6b7a37b1d7d (diff) | |
download | spark-aa77f8a6a604efe0d02bc8412b3f1ba3903b7a57.tar.gz spark-aa77f8a6a604efe0d02bc8412b3f1ba3903b7a57.tar.bz2 spark-aa77f8a6a604efe0d02bc8412b3f1ba3903b7a57.zip |
SPARK-1562 Fix visibility / annotation of Spark SQL APIs
Author: Michael Armbrust <michael@databricks.com>
Closes #489 from marmbrus/sqlDocFixes and squashes the following commits:
acee4f3 [Michael Armbrust] Fix visibility / annotation of Spark SQL APIs
Diffstat (limited to 'sql/core')
12 files changed, 83 insertions, 14 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala index a390ab6005..3a895e15a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ /** * Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java) */ -trait SchemaRDDLike { +private[sql] trait SchemaRDDLike { @transient val sqlContext: SQLContext @transient protected[spark] val logicalPlan: LogicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala index 3a4f071eeb..36b3b956da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala @@ -19,12 +19,14 @@ package org.apache.spark.sql.execution import java.util.HashMap +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ /** + * :: DeveloperApi :: * Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each * group. * @@ -34,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ * @param aggregateExpressions expressions that are computed for each group. * @param child the input data source. */ +@DeveloperApi case class Aggregate( partial: Boolean, groupingExpressions: Seq[Expression], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 070557e47c..3b4acb72e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf} import org.apache.spark.rdd.ShuffledRDD import org.apache.spark.sql.Row @@ -26,6 +27,10 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.util.MutablePair +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode { override def outputPartitioning = newPartitioning @@ -81,7 +86,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una * [[catalyst.plans.physical.Distribution Distribution]] requirements for each operator by inserting * [[Exchange]] Operators where required. */ -object AddExchange extends Rule[SparkPlan] { +private[sql] object AddExchange extends Rule[SparkPlan] { // TODO: Determine the number of partitions. val numPartitions = 150 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala index cff4887936..da1e08be59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions.{Generator, JoinedRow, Literal, Projection} /** + * :: DeveloperApi :: * Applies a [[catalyst.expressions.Generator Generator]] to a stream of input rows, combining the * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional * programming with one important additional feature, which allows the input rows to be joined with @@ -29,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Generator, JoinedRow, Literal, * @param outer when true, each input row will be output at least once, even if the output of the * given `generator` is empty. `outer` has no effect when `join` is false. */ +@DeveloperApi case class Generate( generator: Generator, join: Boolean, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 5d89697db5..50124dd407 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Logging, Row} import org.apache.spark.sql.catalyst.trees @@ -26,6 +27,10 @@ import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.InMemoryColumnarTableScan +/** + * :: DeveloperApi :: + */ +@DeveloperApi abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { self: Product => @@ -51,6 +56,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { } /** + * :: DeveloperApi :: * Allows already planned SparkQueries to be linked into logical query plans. * * Note that in general it is not valid to use this class to link multiple copies of the same @@ -59,6 +65,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { * replace the output attributes with new copies of themselves without breaking any attribute * linking. */ +@DeveloperApi case class SparkLogicalPlan(alreadyPlanned: SparkPlan) extends logical.LogicalPlan with MultiInstanceRelation { @@ -77,15 +84,15 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan) } } -trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] { +private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] { self: Product => } -trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] { +private[sql] trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] { self: Product => override def outputPartitioning: Partitioning = child.outputPartitioning } -trait BinaryNode extends SparkPlan with trees.BinaryNode[SparkPlan] { +private[sql] trait BinaryNode extends SparkPlan with trees.BinaryNode[SparkPlan] { self: Product => } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index c30ae5bcc0..5067c14ddf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -27,7 +27,7 @@ import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.MutablePair import org.apache.spark.util.Utils -class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { +private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { override def newKryo(): Kryo = { val kryo = new Kryo() kryo.setRegistrationRequired(false) @@ -50,7 +50,7 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { } } -object SparkSqlSerializer { +private[sql] object SparkSqlSerializer { // TODO (lian) Using KryoSerializer here is workaround, needs further investigation // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization // related error. @@ -68,7 +68,7 @@ object SparkSqlSerializer { } } -class BigDecimalSerializer extends Serializer[BigDecimal] { +private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] { def write(kryo: Kryo, output: Output, bd: math.BigDecimal) { // TODO: There are probably more efficient representations than strings... output.writeString(bd.toString()) @@ -83,7 +83,7 @@ class BigDecimalSerializer extends Serializer[BigDecimal] { * Maps do not have a no arg constructor and so cannot be serialized by default. So, we serialize * them as `Array[(k,v)]`. */ -class MapSerializer extends Serializer[Map[_,_]] { +private[sql] class MapSerializer extends Serializer[Map[_,_]] { def write(kryo: Kryo, output: Output, map: Map[_,_]) { kryo.writeObject(output, map.flatMap(e => Seq(e._1, e._2)).toArray) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index fe8bd5a508..500fde1971 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.parquet._ -abstract class SparkStrategies extends QueryPlanner[SparkPlan] { +private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SQLContext#SparkPlanner => object HashJoin extends Strategy { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index eedcc7dda0..e4cf2020a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import scala.reflect.runtime.universe.TypeTag +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.sql.catalyst.ScalaReflection @@ -27,6 +28,10 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution} import org.apache.spark.util.MutablePair +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode { override def output = projectList.map(_.toAttribute) @@ -36,6 +41,10 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends } } +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode { override def output = child.output @@ -44,6 +53,10 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode { } } +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: SparkPlan) extends UnaryNode { @@ -53,6 +66,10 @@ case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: override def execute() = child.execute().sample(withReplacement, fraction, seed) } +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan { // TODO: attributes output by union should be distinct for nullability purposes override def output = children.head.output @@ -62,12 +79,14 @@ case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends } /** + * :: DeveloperApi :: * Take the first limit elements. Note that the implementation is different depending on whether * this is a terminal operator or not. If it is terminal and is invoked using executeCollect, * this operator uses Spark's take method on the Spark driver. If it is not terminal or is * invoked using execute, we first take the limit on each partition, and then repartition all the * data to a single partition to compute the global limit. */ +@DeveloperApi case class Limit(limit: Int, child: SparkPlan)(@transient sc: SparkContext) extends UnaryNode { // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan: // partition local limit -> exchange into one partition -> partition local limit again @@ -91,10 +110,12 @@ case class Limit(limit: Int, child: SparkPlan)(@transient sc: SparkContext) exte } /** + * :: DeveloperApi :: * Take the first limit elements as defined by the sortOrder. This is logically equivalent to * having a [[Limit]] operator after a [[Sort]] operator. This could have been named TopK, but * Spark's top operator does the opposite in ordering so we name it TakeOrdered to avoid confusion. */ +@DeveloperApi case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) (@transient sc: SparkContext) extends UnaryNode { override def otherCopyArgs = sc :: Nil @@ -111,7 +132,10 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) override def execute() = sc.makeRDD(executeCollect(), 1) } - +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class Sort( sortOrder: Seq[SortOrder], global: Boolean, @@ -134,6 +158,10 @@ case class Sort( override def output = child.output } +/** + * :: DeveloperApi :: + */ +@DeveloperApi object ExistingRdd { def convertToCatalyst(a: Any): Any = a match { case s: Seq[Any] => s.map(convertToCatalyst) @@ -167,6 +195,10 @@ object ExistingRdd { } } +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { override def execute() = rdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala index 40982f1fff..a0d29100f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution -object DebugQuery { +private[sql] object DebugQuery { def apply(plan: SparkPlan): SparkPlan = { val visited = new collection.mutable.HashSet[Long]() plan transform { @@ -28,7 +28,7 @@ object DebugQuery { } } -case class DebugNode(child: SparkPlan) extends UnaryNode { +private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode { def references = Set.empty def output = child.output def execute() = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index c89dae9358..31cc26962a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -21,14 +21,24 @@ import scala.collection.mutable.{ArrayBuffer, BitSet} import org.apache.spark.SparkContext +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning} +@DeveloperApi sealed abstract class BuildSide + +@DeveloperApi case object BuildLeft extends BuildSide + +@DeveloperApi case object BuildRight extends BuildSide +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class HashJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], @@ -130,6 +140,10 @@ case class HashJoin( } } +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode { def output = left.output ++ right.output @@ -138,6 +152,10 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod } } +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class BroadcastNestedLoopJoin( streamed: SparkPlan, broadcast: SparkPlan, joinType: JoinType, condition: Option[Expression]) (@transient sc: SparkContext) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala index e4a2dec332..66237f8f13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql /** + * :: DeveloperApi :: * An execution engine for relational query plans that runs on top Spark and returns RDDs. * * Note that the operators in this package are created automatically by a query planner using a diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala index 728e3dd1dc..f37976f731 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala @@ -28,7 +28,7 @@ import parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.util.Utils -object ParquetTestData { +private[sql] object ParquetTestData { val testSchema = """message myrecord { |