Diffstat (limited to 'python/pyspark/sql/tests.py')
 python/pyspark/sql/tests.py | 107 ++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 104 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index d25c6365ed..bc945091f7 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -34,10 +34,9 @@ if sys.version_info[:2] <= (2, 6):
else:
import unittest
-
-from pyspark.sql import SQLContext, Column
+from pyspark.sql import SQLContext, HiveContext, Column
from pyspark.sql.types import IntegerType, Row, ArrayType, StructType, StructField, \
- UserDefinedType, DoubleType, LongType
+ UserDefinedType, DoubleType, LongType, StringType
from pyspark.tests import ReusedPySparkTestCase
@@ -286,6 +285,37 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(95 < g.agg(Dsl.approxCountDistinct(df.key)).first()[0])
self.assertEqual(100, g.agg(Dsl.countDistinct(df.value)).first()[0])
+ def test_save_and_load(self):
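+ # Round-trip the DataFrame through the JSON data source: save to a fresh
+ # temp path with mode "error" (fail if the path exists), then load it back.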
+ df = self.df
+ tmpPath = tempfile.mkdtemp()
+ shutil.rmtree(tmpPath)
+ df.save(tmpPath, "org.apache.spark.sql.json", "error")
+ actual = self.sqlCtx.load(tmpPath, "org.apache.spark.sql.json")
+ self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+
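+ # Loading with an explicit schema should return only the listed fields.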
+ schema = StructType([StructField("value", StringType(), True)])
+ actual = self.sqlCtx.load(tmpPath, "org.apache.spark.sql.json", schema)
+ self.assertTrue(sorted(df.select("value").collect()) == sorted(actual.collect()))
+
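+ # Saving again with mode "overwrite" replaces the existing data.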
+ df.save(tmpPath, "org.apache.spark.sql.json", "overwrite")
+ actual = self.sqlCtx.load(tmpPath, "org.apache.spark.sql.json")
+ self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+
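+ # Keyword-argument form; unknown options (like noUse) are passed through
+ # to the data source, which simply ignores them.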
+ df.save(source="org.apache.spark.sql.json", mode="overwrite", path=tmpPath,
+ noUse="this option will not be used in save.")
+ actual = self.sqlCtx.load(source="org.apache.spark.sql.json", path=tmpPath,
+ noUse="this option will not be used in load.")
+ self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+
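+ # With no source given, load falls back to spark.sql.sources.default,
+ # which we temporarily point at the JSON data source.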
+ defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default",
+ "org.apache.spark.sql.parquet")
+ self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
+ actual = self.sqlCtx.load(path=tmpPath)
+ self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+ self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
+
+ shutil.rmtree(tmpPath)
+
def test_help_command(self):
# Regression test for SPARK-5464
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
@@ -296,5 +326,76 @@ class SQLTests(ReusedPySparkTestCase):
pydoc.render_doc(df.take(1))
+class HiveContextSQLTests(ReusedPySparkTestCase):
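+ # SQL tests that need a HiveContext, e.g. persistent tables via saveAsTable.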
+
+ @classmethod
+ def setUpClass(cls):
+ ReusedPySparkTestCase.setUpClass()
+ cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
+ os.unlink(cls.tempdir.name)
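+ # Create a TestHiveContext on the JVM side and wrap it in a Python HiveContext.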
+ print "type", type(cls.sc)
+ print "type", type(cls.sc._jsc)
+ _scala_HiveContext =\
+ cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
+ cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
+ cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
+ rdd = cls.sc.parallelize(cls.testData)
+ cls.df = cls.sqlCtx.inferSchema(rdd)
+
+ @classmethod
+ def tearDownClass(cls):
+ ReusedPySparkTestCase.tearDownClass()
+ shutil.rmtree(cls.tempdir.name, ignore_errors=True)
+
+ def test_save_and_load_table(self):
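+ # Save the DataFrame as a metastore table backed by JSON files, then
+ # register the same path as an external table and compare the contents.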
+ df = self.df
+ tmpPath = tempfile.mkdtemp()
+ shutil.rmtree(tmpPath)
+ df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path=tmpPath)
+ actual = self.sqlCtx.createExternalTable("externalJsonTable", tmpPath,
+ "org.apache.spark.sql.json")
+ self.assertTrue(
+ sorted(df.collect()) ==
+ sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect()))
+ self.assertTrue(
+ sorted(df.collect()) ==
+ sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect()))
+ self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+ self.sqlCtx.sql("DROP TABLE externalJsonTable")
+
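+ # Overwrite the saved table, then recreate the external table with an
+ # explicit one-column schema; only the "value" field should come back.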
+ df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "overwrite", path=tmpPath)
+ schema = StructType([StructField("value", StringType(), True)])
+ actual = self.sqlCtx.createExternalTable("externalJsonTable",
+ source="org.apache.spark.sql.json",
+ schema=schema, path=tmpPath,
+ noUse="this option will not be used")
+ self.assertTrue(
+ sorted(df.collect()) ==
+ sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect()))
+ self.assertTrue(
+ sorted(df.select("value").collect()) ==
+ sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect()))
+ self.assertTrue(sorted(df.select("value").collect()) == sorted(actual.collect()))
+ self.sqlCtx.sql("DROP TABLE savedJsonTable")
+ self.sqlCtx.sql("DROP TABLE externalJsonTable")
+
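+ # saveAsTable and createExternalTable should also honor
+ # spark.sql.sources.default when no source is specified.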
+ defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default",
+ "org.apache.spark.sql.parquet")
+ self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
+ df.saveAsTable("savedJsonTable", path=tmpPath, mode="overwrite")
+ actual = self.sqlCtx.createExternalTable("externalJsonTable", path=tmpPath)
+ self.assertTrue(
+ sorted(df.collect()) ==
+ sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect()))
+ self.assertTrue(
+ sorted(df.collect()) ==
+ sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect()))
+ self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+ self.sqlCtx.sql("DROP TABLE savedJsonTable")
+ self.sqlCtx.sql("DROP TABLE externalJsonTable")
+ self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
+
+ shutil.rmtree(tmpPath)
+
if __name__ == "__main__":
unittest.main()