diff options
author | Davies Liu <davies@databricks.com> | 2015-05-19 14:23:28 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-05-19 14:23:28 -0700 |
commit | 4de74d2602f6577c3c8458aa85377e89c19724ca (patch) | |
tree | f265842faf3e23527646f92b5efb5787042c79da /python/pyspark/sql/tests.py | |
parent | c12dff9b82e4869f866a9b96ce0bf05503dd7dda (diff) | |
download | spark-4de74d2602f6577c3c8458aa85377e89c19724ca.tar.gz spark-4de74d2602f6577c3c8458aa85377e89c19724ca.tar.bz2 spark-4de74d2602f6577c3c8458aa85377e89c19724ca.zip |
[SPARK-7738] [SQL] [PySpark] add reader and writer API in Python
cc rxin, please take a quick look, I'm working on tests.
Author: Davies Liu <davies@databricks.com>
Closes #6238 from davies/readwrite and squashes the following commits:
c7200eb [Davies Liu] update tests
9cbf01b [Davies Liu] Merge branch 'master' of github.com:apache/spark into readwrite
f0c5a04 [Davies Liu] use sqlContext.read.load
5f68bc8 [Davies Liu] update tests
6437e9a [Davies Liu] Merge branch 'master' of github.com:apache/spark into readwrite
bcc6668 [Davies Liu] add reader amd writer API in Python
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r-- | python/pyspark/sql/tests.py | 77 |
1 files changed, 35 insertions, 42 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 84ae36f2fd..7e34996241 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -485,29 +485,29 @@ class SQLTests(ReusedPySparkTestCase): df = self.df tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) - df.save(tmpPath, "org.apache.spark.sql.json", "error") - actual = self.sqlCtx.load(tmpPath, "org.apache.spark.sql.json") - self.assertTrue(sorted(df.collect()) == sorted(actual.collect())) + df.write.json(tmpPath) + actual = self.sqlCtx.read.json(tmpPath) + self.assertEqual(sorted(df.collect()), sorted(actual.collect())) schema = StructType([StructField("value", StringType(), True)]) - actual = self.sqlCtx.load(tmpPath, "org.apache.spark.sql.json", schema) - self.assertTrue(sorted(df.select("value").collect()) == sorted(actual.collect())) + actual = self.sqlCtx.read.json(tmpPath, schema) + self.assertEqual(sorted(df.select("value").collect()), sorted(actual.collect())) - df.save(tmpPath, "org.apache.spark.sql.json", "overwrite") - actual = self.sqlCtx.load(tmpPath, "org.apache.spark.sql.json") - self.assertTrue(sorted(df.collect()) == sorted(actual.collect())) + df.write.json(tmpPath, "overwrite") + actual = self.sqlCtx.read.json(tmpPath) + self.assertEqual(sorted(df.collect()), sorted(actual.collect())) - df.save(source="org.apache.spark.sql.json", mode="overwrite", path=tmpPath, - noUse="this options will not be used in save.") - actual = self.sqlCtx.load(source="org.apache.spark.sql.json", path=tmpPath, - noUse="this options will not be used in load.") - self.assertTrue(sorted(df.collect()) == sorted(actual.collect())) + df.write.save(format="json", mode="overwrite", path=tmpPath, + noUse="this options will not be used in save.") + actual = self.sqlCtx.read.load(format="json", path=tmpPath, + noUse="this options will not be used in load.") + self.assertEqual(sorted(df.collect()), sorted(actual.collect())) defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default", "org.apache.spark.sql.parquet") self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json") actual = self.sqlCtx.load(path=tmpPath) - self.assertTrue(sorted(df.collect()) == sorted(actual.collect())) + self.assertEqual(sorted(df.collect()), sorted(actual.collect())) self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName) shutil.rmtree(tmpPath) @@ -767,51 +767,44 @@ class HiveContextSQLTests(ReusedPySparkTestCase): df = self.df tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) - df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path=tmpPath) - actual = self.sqlCtx.createExternalTable("externalJsonTable", tmpPath, - "org.apache.spark.sql.json") - self.assertTrue( - sorted(df.collect()) == - sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect())) - self.assertTrue( - sorted(df.collect()) == - sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect())) - self.assertTrue(sorted(df.collect()) == sorted(actual.collect())) + df.write.saveAsTable("savedJsonTable", "json", "append", path=tmpPath) + actual = self.sqlCtx.createExternalTable("externalJsonTable", tmpPath, "json") + self.assertEqual(sorted(df.collect()), + sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect())) + self.assertEqual(sorted(df.collect()), + sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect())) + self.assertEqual(sorted(df.collect()), sorted(actual.collect())) self.sqlCtx.sql("DROP TABLE externalJsonTable") - df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "overwrite", path=tmpPath) + df.write.saveAsTable("savedJsonTable", "json", "overwrite", path=tmpPath) schema = StructType([StructField("value", StringType(), True)]) - actual = self.sqlCtx.createExternalTable("externalJsonTable", - source="org.apache.spark.sql.json", + actual = self.sqlCtx.createExternalTable("externalJsonTable", source="json", schema=schema, path=tmpPath, noUse="this options will not be used") - self.assertTrue( - sorted(df.collect()) == - sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect())) - self.assertTrue( - sorted(df.select("value").collect()) == - sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect())) - self.assertTrue(sorted(df.select("value").collect()) == sorted(actual.collect())) + self.assertEqual(sorted(df.collect()), + sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect())) + self.assertEqual(sorted(df.select("value").collect()), + sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect())) + self.assertEqual(sorted(df.select("value").collect()), sorted(actual.collect())) self.sqlCtx.sql("DROP TABLE savedJsonTable") self.sqlCtx.sql("DROP TABLE externalJsonTable") defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default", "org.apache.spark.sql.parquet") self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json") - df.saveAsTable("savedJsonTable", path=tmpPath, mode="overwrite") + df.write.saveAsTable("savedJsonTable", path=tmpPath, mode="overwrite") actual = self.sqlCtx.createExternalTable("externalJsonTable", path=tmpPath) - self.assertTrue( - sorted(df.collect()) == - sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect())) - self.assertTrue( - sorted(df.collect()) == - sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect())) - self.assertTrue(sorted(df.collect()) == sorted(actual.collect())) + self.assertEqual(sorted(df.collect()), + sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect())) + self.assertEqual(sorted(df.collect()), + sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect())) + self.assertEqual(sorted(df.collect()), sorted(actual.collect())) self.sqlCtx.sql("DROP TABLE savedJsonTable") self.sqlCtx.sql("DROP TABLE externalJsonTable") self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName) shutil.rmtree(tmpPath) + if __name__ == "__main__": unittest.main() |