diff options
author | Kousuke Saruta <sarutak@oss.nttdata.co.jp> | 2015-07-08 18:09:39 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-07-08 18:09:39 -0700 |
commit | 2a4f88b6c16f2991e63b17c0e103bcd79f04dbbc (patch) | |
tree | 228490ad19e816e5cb25b8c7df1879db0648bcdd | |
parent | f472b8cdc00839780dc79be0bbe53a098cde230c (diff) | |
download | spark-2a4f88b6c16f2991e63b17c0e103bcd79f04dbbc.tar.gz spark-2a4f88b6c16f2991e63b17c0e103bcd79f04dbbc.tar.bz2 spark-2a4f88b6c16f2991e63b17c0e103bcd79f04dbbc.zip |
[SPARK-8914][SQL] Remove RDDApi
As rxin suggested in #7298 , we should consider to remove `RDDApi`.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #7302 from sarutak/remove-rddapi and squashes the following commits:
e495d35 [Kousuke Saruta] Fixed mima
cb7ebb9 [Kousuke Saruta] Removed overriding RDDApi
-rw-r--r-- | project/MimaExcludes.scala | 5 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 39 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala | 67 |
3 files changed, 24 insertions, 87 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 7346d80463..57a86bf8de 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -70,7 +70,12 @@ object MimaExcludes { "org.apache.spark.mllib.linalg.Matrix.numNonzeros"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.mllib.linalg.Matrix.numActives") + ) ++ Seq( + // SPARK-8914 Remove RDDApi + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.RDDApi") ) + case v if v.startsWith("1.4") => Seq( MimaBuild.excludeSparkPackage("deploy"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index f33e19a0cb..eeefc85255 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -115,8 +115,7 @@ private[sql] object DataFrame { @Experimental class DataFrame private[sql]( @transient val sqlContext: SQLContext, - @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) - extends RDDApi[Row] with Serializable { + @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) extends Serializable { /** * A constructor that automatically analyzes the logical plan. @@ -1320,14 +1319,14 @@ class DataFrame private[sql]( * @group action * @since 1.3.0 */ - override def first(): Row = head() + def first(): Row = head() /** * Returns a new RDD by applying a function to all rows of this DataFrame. * @group rdd * @since 1.3.0 */ - override def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f) + def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f) /** * Returns a new RDD by first applying a function to all rows of this [[DataFrame]], @@ -1335,14 +1334,14 @@ class DataFrame private[sql]( * @group rdd * @since 1.3.0 */ - override def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f) + def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f) /** * Returns a new RDD by applying a function to each partition of this DataFrame. * @group rdd * @since 1.3.0 */ - override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = { + def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = { rdd.mapPartitions(f) } @@ -1351,49 +1350,49 @@ class DataFrame private[sql]( * @group rdd * @since 1.3.0 */ - override def foreach(f: Row => Unit): Unit = rdd.foreach(f) + def foreach(f: Row => Unit): Unit = rdd.foreach(f) /** * Applies a function f to each partition of this [[DataFrame]]. * @group rdd * @since 1.3.0 */ - override def foreachPartition(f: Iterator[Row] => Unit): Unit = rdd.foreachPartition(f) + def foreachPartition(f: Iterator[Row] => Unit): Unit = rdd.foreachPartition(f) /** * Returns the first `n` rows in the [[DataFrame]]. * @group action * @since 1.3.0 */ - override def take(n: Int): Array[Row] = head(n) + def take(n: Int): Array[Row] = head(n) /** * Returns an array that contains all of [[Row]]s in this [[DataFrame]]. * @group action * @since 1.3.0 */ - override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect() + def collect(): Array[Row] = queryExecution.executedPlan.executeCollect() /** * Returns a Java list that contains all of [[Row]]s in this [[DataFrame]]. * @group action * @since 1.3.0 */ - override def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(rdd.collect() : _*) + def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(rdd.collect() : _*) /** * Returns the number of rows in the [[DataFrame]]. * @group action * @since 1.3.0 */ - override def count(): Long = groupBy().count().collect().head.getLong(0) + def count(): Long = groupBy().count().collect().head.getLong(0) /** * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions. * @group rdd * @since 1.3.0 */ - override def repartition(numPartitions: Int): DataFrame = { + def repartition(numPartitions: Int): DataFrame = { Repartition(numPartitions, shuffle = true, logicalPlan) } @@ -1405,7 +1404,7 @@ class DataFrame private[sql]( * @group rdd * @since 1.4.0 */ - override def coalesce(numPartitions: Int): DataFrame = { + def coalesce(numPartitions: Int): DataFrame = { Repartition(numPartitions, shuffle = false, logicalPlan) } @@ -1415,13 +1414,13 @@ class DataFrame private[sql]( * @group dfops * @since 1.3.0 */ - override def distinct(): DataFrame = dropDuplicates() + def distinct(): DataFrame = dropDuplicates() /** * @group basic * @since 1.3.0 */ - override def persist(): this.type = { + def persist(): this.type = { sqlContext.cacheManager.cacheQuery(this) this } @@ -1430,13 +1429,13 @@ class DataFrame private[sql]( * @group basic * @since 1.3.0 */ - override def cache(): this.type = persist() + def cache(): this.type = persist() /** * @group basic * @since 1.3.0 */ - override def persist(newLevel: StorageLevel): this.type = { + def persist(newLevel: StorageLevel): this.type = { sqlContext.cacheManager.cacheQuery(this, None, newLevel) this } @@ -1445,7 +1444,7 @@ class DataFrame private[sql]( * @group basic * @since 1.3.0 */ - override def unpersist(blocking: Boolean): this.type = { + def unpersist(blocking: Boolean): this.type = { sqlContext.cacheManager.tryUncacheQuery(this, blocking) this } @@ -1454,7 +1453,7 @@ class DataFrame private[sql]( * @group basic * @since 1.3.0 */ - override def unpersist(): this.type = unpersist(blocking = false) + def unpersist(): this.type = unpersist(blocking = false) ///////////////////////////////////////////////////////////////////////////// // I/O diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala deleted file mode 100644 index 63dbab1994..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql - -import scala.reflect.ClassTag - -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel - - -/** - * An internal interface defining the RDD-like methods for [[DataFrame]]. - * Please use [[DataFrame]] directly, and do NOT use this. - */ -private[sql] trait RDDApi[T] { - - def cache(): this.type - - def persist(): this.type - - def persist(newLevel: StorageLevel): this.type - - def unpersist(): this.type - - def unpersist(blocking: Boolean): this.type - - def map[R: ClassTag](f: T => R): RDD[R] - - def flatMap[R: ClassTag](f: T => TraversableOnce[R]): RDD[R] - - def mapPartitions[R: ClassTag](f: Iterator[T] => Iterator[R]): RDD[R] - - def foreach(f: T => Unit): Unit - - def foreachPartition(f: Iterator[T] => Unit): Unit - - def take(n: Int): Array[T] - - def collect(): Array[T] - - def collectAsList(): java.util.List[T] - - def count(): Long - - def first(): T - - def repartition(numPartitions: Int): DataFrame - - def coalesce(numPartitions: Int): DataFrame - - def distinct: DataFrame -} |