author    Davies Liu <davies.liu@gmail.com>  2016-02-25 11:53:48 -0800
committer Davies Liu <davies.liu@gmail.com>  2016-02-25 11:53:48 -0800
commit    751724b1320d38fd94186df3d8f1ca887f21947a (patch)
tree      ef365e952284a7ec26aee882caa429a729223c9d /sql/core/src/main/scala/org/apache
parent    46f6e79316b72afea0c9b1559ea662dd3e95e57b (diff)
Revert "[SPARK-13457][SQL] Removes DataFrame RDD operations"
This reverts commit 157fe64f3ecbd13b7286560286e50235eecfe30e.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                             | 42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala                           |  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala                        |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala  |  2
4 files changed, 44 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index be902d688e..abb8fe552b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1427,6 +1427,48 @@ class DataFrame private[sql](
def transform[U](t: DataFrame => DataFrame): DataFrame = t(this)
/**
+ * Returns a new RDD by applying a function to all rows of this DataFrame.
+ * @group rdd
+ * @since 1.3.0
+ */
+ def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)
+
+ /**
+ * Returns a new RDD by first applying a function to all rows of this [[DataFrame]],
+ * and then flattening the results.
+ * @group rdd
+ * @since 1.3.0
+ */
+ def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f)
+
+ /**
+ * Returns a new RDD by applying a function to each partition of this DataFrame.
+ * @group rdd
+ * @since 1.3.0
+ */
+ def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
+ rdd.mapPartitions(f)
+ }
+
+ /**
+ * Applies a function `f` to all rows.
+ * @group rdd
+ * @since 1.3.0
+ */
+ def foreach(f: Row => Unit): Unit = withNewExecutionId {
+ rdd.foreach(f)
+ }
+
+ /**
+ * Applies a function `f` to each partition of this [[DataFrame]].
+ * @group rdd
+ * @since 1.3.0
+ */
+ def foreachPartition(f: Iterator[Row] => Unit): Unit = withNewExecutionId {
+ rdd.foreachPartition(f)
+ }
+
+ /**
* Returns the first `n` rows in the [[DataFrame]].
*
* Running take requires moving data into the application's driver process, and doing so with
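With these methods restored, RDD-style transformations work directly on a DataFrame again. A minimal usage sketch follows; the `sqlContext` handle and the two-column "people" table are illustrative assumptions, not part of this commit:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Assumed input: a registered table with columns (name: String, age: Int).
val df = sqlContext.table("people")

// map: one output element per row.
val names: RDD[String] = df.map(row => row.getString(0))

// flatMap: zero or more output elements per row.
val tokens: RDD[String] = df.flatMap(row => row.getString(0).split(" "))

// mapPartitions: a single iterator pass per partition.
val lengths: RDD[Int] = df.mapPartitions(rows => rows.map(_.getString(0).length))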
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index a7258d742a..f06d16116e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -306,7 +306,6 @@ class GroupedData protected[sql](
val values = df.select(pivotColumn)
.distinct()
.sort(pivotColumn) // ensure that the output columns are in a consistent logical order
- .rdd
.map(_.get(0))
.take(maxValues + 1)
.toSeq
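Because DataFrame.map is available again after the revert, the pivot-value computation above can drop the intermediate .rdd hop. A standalone sketch of the same pattern, where the "course" column and the maxValues cap are illustrative assumptions:

// Collect up to maxValues + 1 distinct pivot values; sorting keeps the
// generated output columns in a stable order.
val maxValues = 10000
val values: Seq[Any] = df.select("course")
  .distinct()
  .sort("course")
  .map(_.get(0))        // DataFrame.map, restored by this commit
  .take(maxValues + 1)
  .toSeq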
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 68a251757c..d912aeb70d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -100,7 +100,7 @@ private[r] object SQLUtils {
}
def dfToRowRDD(df: DataFrame): JavaRDD[Array[Byte]] = {
- df.rdd.map(r => rowToRBytes(r))
+ df.map(r => rowToRBytes(r))
}
private[this] def doConversion(data: Object, dataType: DataType): Object = {
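The SparkR bridge benefits the same way: dfToRowRDD maps each Row straight to its serialized bytes. A schematic equivalent is below; the rowToRBytes body shown is a stand-in stub (the real serializer emits R-compatible binary, not UTF-8 text), and toJavaRDD() makes the RDD-to-JavaRDD conversion explicit:

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.{DataFrame, Row}

// Stub serializer for illustration only; the real rowToRBytes writes
// binary that the R side can deserialize.
def rowToRBytes(row: Row): Array[Byte] = row.toString.getBytes("UTF-8")

def dfToRowRDD(df: DataFrame): JavaRDD[Array[Byte]] = {
  // map is once more defined on DataFrame itself, so no .rdd detour.
  df.map(r => rowToRBytes(r)).toJavaRDD()
}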
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index d7111a6a1c..e295722cac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -276,7 +276,7 @@ object JdbcUtils extends Logging {
val rddSchema = df.schema
val getConnection: () => Connection = createConnectionFactory(url, properties)
val batchSize = properties.getProperty("batchsize", "1000").toInt
- df.rdd.foreachPartition { iterator =>
+ df.foreachPartition { iterator =>
savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
}
}
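In JdbcUtils, the restored foreachPartition keeps the write path at one connection per partition with batched inserts. A condensed sketch of that pattern; the JDBC URL, table layout, and the 1000-row batch size are illustrative, and the real savePartition additionally threads through the dialect and per-column null types seen in the call above:

import java.sql.{Connection, DriverManager}

df.foreachPartition { rows =>
  // One connection per partition, opened on the executor.
  val conn: Connection = DriverManager.getConnection("jdbc:postgresql://host/db")
  try {
    val stmt = conn.prepareStatement("INSERT INTO people (name, age) VALUES (?, ?)")
    var count = 0
    rows.foreach { row =>
      stmt.setString(1, row.getString(0))
      stmt.setInt(2, row.getInt(1))
      stmt.addBatch()
      count += 1
      if (count % 1000 == 0) stmt.executeBatch() // flush a full batch
    }
    stmt.executeBatch() // flush the remainder
    stmt.close()
  } finally {
    conn.close()
  }
}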