author     Frank Rosner <frank@fam-rosner.de>                2015-10-26 15:46:59 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2015-10-26 15:46:59 -0700
commit     b60aab8a95e2a35a1d4023a9d0a0d9724e4164f9 (patch)
tree       a59513a3fd0c8b7a09a54ffe9fe273aa0fc26421
parent     3689beb98b6a6db61e35049fdb57b0cd6aad8019 (diff)
[SPARK-11258] Converting a Spark DataFrame into an R data.frame is slow / requires a lot of memory
https://issues.apache.org/jira/browse/SPARK-11258

I was not able to locate an existing unit test for this function, so I wrote one.

Author: Frank Rosner <frank@fam-rosner.de>

Closes #9222 from FRosner/master.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala      | 16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala | 38
2 files changed, 47 insertions(+), 7 deletions(-)
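The gist of the change: the old dfToCols built each column with a nested map over the collected rows, allocating an intermediate collection per column, while the new version preallocates the result arrays once and fills them by index. Below is a minimal standalone sketch of the before/after logic, with Array[Array[Any]] standing in for the collected Array[Row]; the names TransposeSketch, dfToColsOld, and dfToColsNew are illustrative only, not part of the patch.

// Standalone sketch of the transpose change. Array[Array[Any]] stands
// in for the collected Array[Row]; this is not Spark code.
object TransposeSketch {

  // Before: one .map per column builds an intermediate collection for
  // every column, traversing the full row array numCols times.
  def dfToColsOld(rows: Array[Array[Any]], numCols: Int): Array[Array[Any]] =
    (0 until numCols).map { colIdx =>
      rows.map { row => row(colIdx) }
    }.toArray

  // After: both the outer and inner arrays are preallocated once and
  // filled by index, avoiding the per-column intermediates.
  def dfToColsNew(rows: Array[Array[Any]], numCols: Int): Array[Array[Any]] = {
    val numRows = rows.length
    val colArray = new Array[Array[Any]](numCols)
    for (colNo <- 0 until numCols) {
      colArray(colNo) = new Array[Any](numRows)
      for (rowNo <- 0 until numRows) {
        colArray(colNo)(rowNo) = rows(rowNo)(colNo)
      }
    }
    colArray
  }

  def main(args: Array[String]): Unit = {
    val rows: Array[Array[Any]] = Array(Array(1, 2, 3), Array(4, 5, 6))
    // Both variants produce the same column-major result.
    assert(dfToColsNew(rows, 3).map(_.toSeq).toSeq ==
           dfToColsOld(rows, 3).map(_.toSeq).toSeq)
    println(dfToColsNew(rows, 3).map(_.mkString(",")).mkString("; "))
    // prints: 1,4; 2,5; 3,6
  }
}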
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index b0120a8d0d..b3f134614c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -130,16 +130,18 @@ private[r] object SQLUtils {
   }
 
   def dfToCols(df: DataFrame): Array[Array[Any]] = {
-    // localDF is Array[Row]
-    val localDF = df.collect()
+    val localDF: Array[Row] = df.collect()
     val numCols = df.columns.length
+    val numRows = localDF.length
 
-    // result is Array[Array[Any]]
-    (0 until numCols).map { colIdx =>
-      localDF.map { row =>
-        row(colIdx)
+    val colArray = new Array[Array[Any]](numCols)
+    for (colNo <- 0 until numCols) {
+      colArray(colNo) = new Array[Any](numRows)
+      for (rowNo <- 0 until numRows) {
+        colArray(colNo)(rowNo) = localDF(rowNo)(colNo)
       }
-    }.toArray
+    }
+    colArray
   }
 
   def saveMode(mode: String): SaveMode = {
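For a rough local sanity check of the speed claim, one could time the two variants from the sketch above. A naive System.nanoTime measurement is sketched below; a proper benchmark would use a harness such as JMH, and results will vary with JVM, data shape, and boxing behavior.

// Rough timing sketch for the two transpose variants defined in
// TransposeSketch above. Naive single-shot timing, illustration only.
object TransposeBench {

  // Runs the body once and prints the elapsed wall-clock time.
  def time[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    println(f"$label%-10s ${(System.nanoTime() - start) / 1e6}%.1f ms")
    result
  }

  def main(args: Array[String]): Unit = {
    val numRows = 100000
    val numCols = 20
    // Build a row-major table of boxed Ints, mimicking collected rows.
    val rows: Array[Array[Any]] =
      Array.tabulate(numRows)(r => Array.tabulate[Any](numCols)(c => r * numCols + c))

    // Warm up the JIT, then measure each variant once.
    TransposeSketch.dfToColsOld(rows, numCols)
    TransposeSketch.dfToColsNew(rows, numCols)
    time("old")(TransposeSketch.dfToColsOld(rows, numCols))
    time("new")(TransposeSketch.dfToColsNew(rows, numCols))
  }
}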
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala
new file mode 100644
index 0000000000..f54e23e3aa
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.r
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class SQLUtilsSuite extends SharedSQLContext {
+
+  import testImplicits._
+
+  test("dfToCols should collect and transpose a data frame") {
+    val df = Seq(
+      (1, 2, 3),
+      (4, 5, 6)
+    ).toDF
+    assert(SQLUtils.dfToCols(df) === Array(
+      Array(1, 4),
+      Array(2, 5),
+      Array(3, 6)
+    ))
+  }
+
+}