about | summary | refs | log | tree | commit | diff
path: root/python/pyspark/tests.py
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-09-12 19:05:39 -0700
committerJosh Rosen <joshrosen@apache.org>2014-09-12 19:05:39 -0700
commit885d1621bc06bc1f009c9707c3452eac26baf828 (patch)
tree56cf101d515a6bc70abf0f721d10722ecad866fa /python/pyspark/tests.py
parent71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7 (diff)
downloadspark-885d1621bc06bc1f009c9707c3452eac26baf828.tar.gz
spark-885d1621bc06bc1f009c9707c3452eac26baf828.tar.bz2
spark-885d1621bc06bc1f009c9707c3452eac26baf828.zip
[SPARK-3500] [SQL] use JavaSchemaRDD as SchemaRDD._jschema_rdd
Currently, SchemaRDD._jschema_rdd is SchemaRDD, the Scala API (coalesce(), repartition()) can not been called in Python easily, there is no way to specify the implicit parameter `ord`. The _jrdd is an JavaRDD, so _jschema_rdd should also be JavaSchemaRDD. In this patch, change _schema_rdd to JavaSchemaRDD, also added an assert for it. If some methods are missing from JavaSchemaRDD, then it's called by _schema_rdd.baseSchemaRDD().xxx(). BTW, Do we need JavaSQLContext? Author: Davies Liu <davies.liu@gmail.com> Closes #2369 from davies/fix_schemardd and squashes the following commits: abee159 [Davies Liu] use JavaSchemaRDD as SchemaRDD._jschema_rdd
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--python/pyspark/tests.py28
1 file changed, 28 insertions, 0 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 2e7c2750a8..b687d695b0 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -607,6 +607,34 @@ class TestSQL(PySparkTestCase):
[res] = self.sqlCtx.sql("SELECT MYUDF('')").collect()
self.assertEqual("", res[0])
+ def test_basic_functions(self):
+ rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
+ srdd = self.sqlCtx.jsonRDD(rdd)
+ srdd.count()
+ srdd.collect()
+ srdd.schemaString()
+ srdd.schema()
+
+ # cache and checkpoint
+ self.assertFalse(srdd.is_cached)
+ srdd.persist()
+ srdd.unpersist()
+ srdd.cache()
+ self.assertTrue(srdd.is_cached)
+ self.assertFalse(srdd.isCheckpointed())
+ self.assertEqual(None, srdd.getCheckpointFile())
+
+ srdd = srdd.coalesce(2, True)
+ srdd = srdd.repartition(3)
+ srdd = srdd.distinct()
+ srdd.intersection(srdd)
+ self.assertEqual(2, srdd.count())
+
+ srdd.registerTempTable("temp")
+ srdd = self.sqlCtx.sql("select foo from temp")
+ srdd.count()
+ srdd.collect()
+
class TestIO(PySparkTestCase):