diff options
author | kul <kuldeep.bora@gmail.com> | 2015-02-04 15:08:37 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-04 15:08:37 -0800 |
commit | 424cb699ee9b091eb23b86dc018a86e377ad309f (patch) | |
tree | 72574f33787fd13d707bfebd7eefab13fb2bf4e9 /sql | |
parent | b90dd39793ab0a13a4559fcfff3bb5305c92ab3b (diff) | |
download | spark-424cb699ee9b091eb23b86dc018a86e377ad309f.tar.gz spark-424cb699ee9b091eb23b86dc018a86e377ad309f.tar.bz2 spark-424cb699ee9b091eb23b86dc018a86e377ad309f.zip |
[SPARK-5426][SQL] Add SparkSQL Java API helper methods.
Right now the PR adds a few helper methods for the Java APIs. But the issue was opened mainly to get rid of transformations in the Java API like `.rdd` and `.toJavaRDD` while working with `SQLContext` or `HiveContext`.
Author: kul <kuldeep.bora@gmail.com>
Closes #4243 from kul/master and squashes the following commits:
2390fba [kul] [SPARK-5426][SQL] Add SparkSQL Java API helper methods.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 17 | ||||
-rw-r--r-- | sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java | 48 |
2 files changed, 62 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 2697e780c0..1661282fc3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -221,6 +221,11 @@ class SQLContext(@transient val sparkContext: SparkContext) DataFrame(this, logicalPlan) } + @DeveloperApi + def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = { + applySchema(rowRDD.rdd, schema); + } + /** * Applies a schema to an RDD of Java Beans. * @@ -305,6 +310,8 @@ class SQLContext(@transient val sparkContext: SparkContext) */ def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0) + def jsonRDD(json: JavaRDD[String]): DataFrame = jsonRDD(json.rdd, 1.0) + /** * :: Experimental :: * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema, @@ -323,6 +330,11 @@ class SQLContext(@transient val sparkContext: SparkContext) applySchema(rowRDD, appliedSchema) } + @Experimental + def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = { + jsonRDD(json.rdd, schema) + } + /** * :: Experimental :: */ @@ -337,6 +349,11 @@ class SQLContext(@transient val sparkContext: SparkContext) } @Experimental + def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = { + jsonRDD(json.rdd, samplingRatio); + } + + @Experimental def load(path: String): DataFrame = { val dataSourceName = conf.defaultDataSourceName load(dataSourceName, ("path", path)) diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java index badd00d34b..8510bac499 100644 --- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java +++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java @@ -98,7 +98,7 @@ public class 
JavaApplySchemaSuite implements Serializable { fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false)); StructType schema = DataTypes.createStructType(fields); - DataFrame df = javaSqlCtx.applySchema(rowRDD.rdd(), schema); + DataFrame df = javaSqlCtx.applySchema(rowRDD, schema); df.registerTempTable("people"); Row[] actual = javaSqlCtx.sql("SELECT * FROM people").collect(); @@ -109,6 +109,48 @@ public class JavaApplySchemaSuite implements Serializable { Assert.assertEquals(expected, Arrays.asList(actual)); } + + + @Test + public void dataFrameRDDOperations() { + List<Person> personList = new ArrayList<Person>(2); + Person person1 = new Person(); + person1.setName("Michael"); + person1.setAge(29); + personList.add(person1); + Person person2 = new Person(); + person2.setName("Yin"); + person2.setAge(28); + personList.add(person2); + + JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map( + new Function<Person, Row>() { + public Row call(Person person) throws Exception { + return RowFactory.create(person.getName(), person.getAge()); + } + }); + + List<StructField> fields = new ArrayList<StructField>(2); + fields.add(DataTypes.createStructField("name", DataTypes.StringType, false)); + fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false)); + StructType schema = DataTypes.createStructType(fields); + + DataFrame df = javaSqlCtx.applySchema(rowRDD, schema); + df.registerTempTable("people"); + List<String> actual = javaSqlCtx.sql("SELECT * FROM people").toJavaRDD().map(new Function<Row, String>() { + + public String call(Row row) { + return row.getString(0) + "_" + row.get(1).toString(); + } + }).collect(); + + List<String> expected = new ArrayList<String>(2); + expected.add("Michael_29"); + expected.add("Yin_28"); + + Assert.assertEquals(expected, actual); + } + @Test public void applySchemaToJSON() { JavaRDD<String> jsonRDD = javaCtx.parallelize(Arrays.asList( @@ -147,14 +189,14 @@ public class JavaApplySchemaSuite 
implements Serializable { null, "this is another simple string.")); - DataFrame df1 = javaSqlCtx.jsonRDD(jsonRDD.rdd()); + DataFrame df1 = javaSqlCtx.jsonRDD(jsonRDD); StructType actualSchema1 = df1.schema(); Assert.assertEquals(expectedSchema, actualSchema1); df1.registerTempTable("jsonTable1"); List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collectAsList(); Assert.assertEquals(expectedResult, actual1); - DataFrame df2 = javaSqlCtx.jsonRDD(jsonRDD.rdd(), expectedSchema); + DataFrame df2 = javaSqlCtx.jsonRDD(jsonRDD, expectedSchema); StructType actualSchema2 = df2.schema(); Assert.assertEquals(expectedSchema, actualSchema2); df2.registerTempTable("jsonTable2"); |