author    | Davies Liu <davies.liu@gmail.com> | 2014-09-12 19:05:39 -0700
committer | Josh Rosen <joshrosen@apache.org> | 2014-09-12 19:05:39 -0700
commit    | 885d1621bc06bc1f009c9707c3452eac26baf828 (patch)
tree      | 56cf101d515a6bc70abf0f721d10722ecad866fa /python/pyspark/tests.py
parent    | 71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7 (diff)
[SPARK-3500] [SQL] use JavaSchemaRDD as SchemaRDD._jschema_rdd
Currently, `SchemaRDD._jschema_rdd` is a Scala SchemaRDD, so Scala API methods such as coalesce() and repartition() cannot easily be called from Python: there is no way to supply the implicit parameter `ord`. Since `_jrdd` is a JavaRDD, `_jschema_rdd` should likewise be a JavaSchemaRDD.
This patch changes `_jschema_rdd` to a JavaSchemaRDD and adds an assert for it. If a method is missing from JavaSchemaRDD, it is invoked through `_jschema_rdd.baseSchemaRDD().xxx()`.
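The fallback described above can be illustrated with a small, self-contained sketch of the delegation pattern: a Python-side wrapper tries the Java wrapper first and falls back to the underlying Scala object. All class and method names below are illustrative stand-ins, not the actual PySpark implementation.

```python
class BaseSchemaRDD:
    """Stand-in for the Scala SchemaRDD reachable via baseSchemaRDD()."""
    def schemaString(self):
        return "root\n |-- foo: string"


class JavaSchemaRDDStub:
    """Stand-in for the JavaSchemaRDD wrapper; only some methods exist here."""
    def __init__(self):
        self._base = BaseSchemaRDD()

    def count(self):
        return 2

    def baseSchemaRDD(self):
        return self._base


class SchemaRDDStub:
    """Python-side wrapper: try the Java wrapper first, then fall back
    to the base (Scala-side) object for anything it does not expose."""
    def __init__(self, jschema_rdd):
        self._jschema_rdd = jschema_rdd

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails on this object.
        jrdd = self._jschema_rdd
        if hasattr(jrdd, name):
            return getattr(jrdd, name)
        return getattr(jrdd.baseSchemaRDD(), name)


srdd = SchemaRDDStub(JavaSchemaRDDStub())
print(srdd.count())         # found on the Java wrapper -> 2
print(srdd.schemaString())  # missing there, delegated to baseSchemaRDD()
```

Because `__getattr__` only fires when normal lookup fails, methods defined directly on the Python wrapper are unaffected by the delegation.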
BTW, do we need JavaSQLContext?
Author: Davies Liu <davies.liu@gmail.com>
Closes #2369 from davies/fix_schemardd and squashes the following commits:
abee159 [Davies Liu] use JavaSchemaRDD as SchemaRDD._jschema_rdd
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r-- | python/pyspark/tests.py | 28
1 file changed, 28 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 2e7c2750a8..b687d695b0 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -607,6 +607,34 @@ class TestSQL(PySparkTestCase):
         [res] = self.sqlCtx.sql("SELECT MYUDF('')").collect()
         self.assertEqual("", res[0])
 
+    def test_basic_functions(self):
+        rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
+        srdd = self.sqlCtx.jsonRDD(rdd)
+        srdd.count()
+        srdd.collect()
+        srdd.schemaString()
+        srdd.schema()
+
+        # cache and checkpoint
+        self.assertFalse(srdd.is_cached)
+        srdd.persist()
+        srdd.unpersist()
+        srdd.cache()
+        self.assertTrue(srdd.is_cached)
+        self.assertFalse(srdd.isCheckpointed())
+        self.assertEqual(None, srdd.getCheckpointFile())
+
+        srdd = srdd.coalesce(2, True)
+        srdd = srdd.repartition(3)
+        srdd = srdd.distinct()
+        srdd.intersection(srdd)
+        self.assertEqual(2, srdd.count())
+
+        srdd.registerTempTable("temp")
+        srdd = self.sqlCtx.sql("select foo from temp")
+        srdd.count()
+        srdd.collect()
+
 
 class TestIO(PySparkTestCase):