| author | Davies Liu <davies.liu@gmail.com> | 2016-02-25 11:53:48 -0800 |
|---|---|---|
| committer | Davies Liu <davies.liu@gmail.com> | 2016-02-25 11:53:48 -0800 |
| commit | 751724b1320d38fd94186df3d8f1ca887f21947a (patch) | |
| tree | ef365e952284a7ec26aee882caa429a729223c9d /sql/core/src/main/scala/org/apache | |
| parent | 46f6e79316b72afea0c9b1559ea662dd3e95e57b (diff) | |
Revert "[SPARK-13457][SQL] Removes DataFrame RDD operations"
This reverts commit 157fe64f3ecbd13b7286560286e50235eecfe30e.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
```
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                            | 42 ++++++++++++
 sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala                          |  1 -
 sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala                       |  2 +-
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala |  2 +-
 4 files changed, 44 insertions(+), 3 deletions(-)
```
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index be902d688e..abb8fe552b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1427,6 +1427,48 @@ class DataFrame private[sql](
   def transform[U](t: DataFrame => DataFrame): DataFrame = t(this)
 
   /**
+   * Returns a new RDD by applying a function to all rows of this DataFrame.
+   * @group rdd
+   * @since 1.3.0
+   */
+  def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)
+
+  /**
+   * Returns a new RDD by first applying a function to all rows of this [[DataFrame]],
+   * and then flattening the results.
+   * @group rdd
+   * @since 1.3.0
+   */
+  def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f)
+
+  /**
+   * Returns a new RDD by applying a function to each partition of this DataFrame.
+   * @group rdd
+   * @since 1.3.0
+   */
+  def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
+    rdd.mapPartitions(f)
+  }
+
+  /**
+   * Applies a function `f` to all rows.
+   * @group rdd
+   * @since 1.3.0
+   */
+  def foreach(f: Row => Unit): Unit = withNewExecutionId {
+    rdd.foreach(f)
+  }
+
+  /**
+   * Applies a function f to each partition of this [[DataFrame]].
+   * @group rdd
+   * @since 1.3.0
+   */
+  def foreachPartition(f: Iterator[Row] => Unit): Unit = withNewExecutionId {
+    rdd.foreachPartition(f)
+  }
+
+  /**
    * Returns the first `n` rows in the [[DataFrame]].
    *
    * Running take requires moving data into the application's driver process, and doing so with
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index a7258d742a..f06d16116e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -306,7 +306,6 @@ class GroupedData protected[sql](
     val values = df.select(pivotColumn)
       .distinct()
       .sort(pivotColumn)  // ensure that the output columns are in a consistent logical order
-      .rdd
       .map(_.get(0))
       .take(maxValues + 1)
       .toSeq
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 68a251757c..d912aeb70d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -100,7 +100,7 @@ private[r] object SQLUtils {
   }
 
   def dfToRowRDD(df: DataFrame): JavaRDD[Array[Byte]] = {
-    df.rdd.map(r => rowToRBytes(r))
+    df.map(r => rowToRBytes(r))
   }
 
   private[this] def doConversion(data: Object, dataType: DataType): Object = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index d7111a6a1c..e295722cac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -276,7 +276,7 @@ object JdbcUtils extends Logging {
     val rddSchema = df.schema
     val getConnection: () => Connection = createConnectionFactory(url, properties)
     val batchSize = properties.getProperty("batchsize", "1000").toInt
-    df.rdd.foreachPartition { iterator =>
+    df.foreachPartition { iterator =>
      savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
    }
  }
```
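For context, the revert restores RDD-style operations directly on `DataFrame`, so callers no longer need the explicit `.rdd` hop. Below is a minimal, illustrative sketch of the restored surface; the local-mode setup, the object name `RestoredDataFrameOps`, and the sample data are assumptions for the example, not part of this commit:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

// Hypothetical driver program exercising the restored DataFrame RDD operations.
object RestoredDataFrameOps {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("restored-ops").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.createDataFrame(Seq((1, "alpha"), (2, "beta"))).toDF("id", "name")

    // map and flatMap are again defined on DataFrame itself and return RDDs.
    val ids: RDD[Int] = df.map(row => row.getInt(0))
    val chars: RDD[Char] = df.flatMap(row => row.getString(1).toSeq)
    println(ids.collect().mkString(","))
    println(chars.collect().mkString(","))

    // foreachPartition runs once per partition, e.g. for batched external writes;
    // this is the shape JdbcUtils.saveTable relies on in the diff above.
    df.foreachPartition { rows: Iterator[Row] =>
      rows.foreach(row => println(row.mkString(",")))
    }

    sc.stop()
  }
}
```

Note that the restored `foreach` and `foreachPartition` bodies are wrapped in `withNewExecutionId`, so those actions participate in SQL execution tracking rather than running as bare RDD jobs.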