From 4dd24811d9035c52c5965fca2fc6431aac6963fc Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 2 Mar 2016 15:26:34 -0800
Subject: [SPARK-13594][SQL] remove typed operations (e.g. map, flatMap) from
 Python DataFrame

## What changes were proposed in this pull request?

Remove `map`, `flatMap`, and `mapPartitions` from the Python DataFrame, to prepare for the Dataset API in the future.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan

Closes #11445 from cloud-fan/python-clean.
---
 python/pyspark/mllib/linalg/distributed.py | 6 +++---
 python/pyspark/mllib/tests.py              | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py
index e1f022187d..43cb0beef1 100644
--- a/python/pyspark/mllib/linalg/distributed.py
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -256,7 +256,7 @@ class IndexedRowMatrix(DistributedMatrix):
         # on the Scala/Java side. Then we map each Row in the
         # DataFrame back to an IndexedRow on this side.
         rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model)
-        rows = rows_df.map(lambda row: IndexedRow(row[0], row[1]))
+        rows = rows_df.rdd.map(lambda row: IndexedRow(row[0], row[1]))
         return rows
 
     def numRows(self):
@@ -475,7 +475,7 @@ class CoordinateMatrix(DistributedMatrix):
         # DataFrame on the Scala/Java side. Then we map each Row in
         # the DataFrame back to a MatrixEntry on this side.
         entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
-        entries = entries_df.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
+        entries = entries_df.rdd.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
         return entries
 
     def numRows(self):
@@ -700,7 +700,7 @@ class BlockMatrix(DistributedMatrix):
         # DataFrame on the Scala/Java side. Then we map each Row in
         # the DataFrame back to a sub-matrix block on this side.
         blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
-        blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1]))
+        blocks = blocks_df.rdd.map(lambda row: ((row[0][0], row[0][1]), row[1]))
         return blocks
 
     @property
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 25a7c29982..5f515b666c 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -697,7 +697,7 @@ class VectorUDTTests(MLlibTestCase):
         schema = df.schema
         field = [f for f in schema.fields if f.name == "features"][0]
         self.assertEqual(field.dataType, self.udt)
-        vectors = df.map(lambda p: p.features).collect()
+        vectors = df.rdd.map(lambda p: p.features).collect()
         self.assertEqual(len(vectors), 2)
         for v in vectors:
             if isinstance(v, SparseVector):
@@ -729,7 +729,7 @@ class MatrixUDTTests(MLlibTestCase):
         df = rdd.toDF()
         schema = df.schema
         self.assertTrue(schema.fields[1].dataType, self.udt)
-        matrices = df.map(lambda x: x._2).collect()
+        matrices = df.rdd.map(lambda x: x._2).collect()
         self.assertEqual(len(matrices), 2)
         for m in matrices:
             if isinstance(m, DenseMatrix):
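
For callers affected by this change, the pattern the diff applies everywhere is the same: route through `df.rdd` before calling `map`. Below is a minimal sketch of the before/after, assuming a plain `SparkContext`/`SQLContext` setup with toy data; the app name and sample rows are illustrative, not part of the patch.

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Illustrative setup; any existing context works the same way.
sc = SparkContext(appName="spark-13594-migration-sketch")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

# Before SPARK-13594, DataFrame forwarded typed RDD operations directly:
#     ids = df.map(lambda row: row.id)    # removed by this patch

# After SPARK-13594, convert to an RDD of Rows explicitly via df.rdd:
ids = df.rdd.map(lambda row: row.id)
print(ids.collect())  # [1, 2]

sc.stop()
```

The same substitution covers `flatMap` and `mapPartitions`, since `df.rdd` exposes the full RDD API.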