author     kul <kuldeep.bora@gmail.com>               2015-02-04 15:08:37 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-02-04 15:08:37 -0800
commit     424cb699ee9b091eb23b86dc018a86e377ad309f (patch)
tree       72574f33787fd13d707bfebd7eefab13fb2bf4e9 /sql
parent     b90dd39793ab0a13a4559fcfff3bb5305c92ab3b (diff)
[SPARK-5426][SQL] Add SparkSQL Java API helper methods.
Right now the PR adds a few helper methods for the Java APIs. But the issue was opened mainly to get rid of transformations such as `.rdd` and `.toJavaRDD` in the Java API while working with `SQLContext` or `HiveContext`.

Author: kul <kuldeep.bora@gmail.com>

Closes #4243 from kul/master and squashes the following commits:

2390fba [kul] [SPARK-5426][SQL] Add SparkSQL Java API helper methods.
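For context, each overload added in this patch simply forwards to its RDD-based counterpart via `JavaRDD.rdd()`, so behavior is identical to the Scala API; the benefit is purely ergonomic for Java callers. A minimal sketch of the resulting usage (the driver class and variable names below are illustrative, not part of this patch):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;

    public class JsonRDDExample {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local", "JsonRDDExample");
        SQLContext sqlContext = new SQLContext(jsc.sc());

        JavaRDD<String> jsonStrings = jsc.parallelize(Arrays.asList(
            "{\"name\":\"Michael\", \"age\":29}",
            "{\"name\":\"Yin\", \"age\":28}"));

        // Before this patch, Java code had to unwrap to the underlying Scala RDD:
        //   DataFrame df = sqlContext.jsonRDD(jsonStrings.rdd());
        // With the new JavaRDD overload, no conversion is needed:
        DataFrame df = sqlContext.jsonRDD(jsonStrings);
        df.registerTempTable("people");

        List<Row> rows = sqlContext.sql("SELECT name, age FROM people").collectAsList();
        for (Row row : rows) {
          System.out.println(row.getString(0) + "_" + row.get(1));
        }

        jsc.stop();
      }
    }

The same pattern applies to `applySchema(JavaRDD<Row>, StructType)` and `jsonRDD(JavaRDD<String>, StructType)`, as exercised by the updated test suite in the diff below.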
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                  | 17
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java | 48
2 files changed, 62 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2697e780c0..1661282fc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -221,6 +221,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
DataFrame(this, logicalPlan)
}
+ @DeveloperApi
+ def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+ applySchema(rowRDD.rdd, schema);
+ }
+
/**
* Applies a schema to an RDD of Java Beans.
*
@@ -305,6 +310,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0)
+ def jsonRDD(json: JavaRDD[String]): DataFrame = jsonRDD(json.rdd, 1.0)
+
/**
* :: Experimental ::
* Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
@@ -323,6 +330,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
applySchema(rowRDD, appliedSchema)
}
+ @Experimental
+ def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
+ jsonRDD(json.rdd, schema)
+ }
+
/**
* :: Experimental ::
*/
@@ -337,6 +349,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
@Experimental
+ def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
+ jsonRDD(json.rdd, samplingRatio);
+ }
+
+ @Experimental
def load(path: String): DataFrame = {
val dataSourceName = conf.defaultDataSourceName
load(dataSourceName, ("path", path))
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
index badd00d34b..8510bac499 100644
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
@@ -98,7 +98,7 @@ public class JavaApplySchemaSuite implements Serializable {
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
StructType schema = DataTypes.createStructType(fields);
- DataFrame df = javaSqlCtx.applySchema(rowRDD.rdd(), schema);
+ DataFrame df = javaSqlCtx.applySchema(rowRDD, schema);
df.registerTempTable("people");
Row[] actual = javaSqlCtx.sql("SELECT * FROM people").collect();
@@ -109,6 +109,48 @@ public class JavaApplySchemaSuite implements Serializable {
Assert.assertEquals(expected, Arrays.asList(actual));
}
+
+
+ @Test
+ public void dataFrameRDDOperations() {
+ List<Person> personList = new ArrayList<Person>(2);
+ Person person1 = new Person();
+ person1.setName("Michael");
+ person1.setAge(29);
+ personList.add(person1);
+ Person person2 = new Person();
+ person2.setName("Yin");
+ person2.setAge(28);
+ personList.add(person2);
+
+ JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map(
+ new Function<Person, Row>() {
+ public Row call(Person person) throws Exception {
+ return RowFactory.create(person.getName(), person.getAge());
+ }
+ });
+
+ List<StructField> fields = new ArrayList<StructField>(2);
+ fields.add(DataTypes.createStructField("name", DataTypes.StringType, false));
+ fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
+ StructType schema = DataTypes.createStructType(fields);
+
+ DataFrame df = javaSqlCtx.applySchema(rowRDD, schema);
+ df.registerTempTable("people");
+ List<String> actual = javaSqlCtx.sql("SELECT * FROM people").toJavaRDD().map(new Function<Row, String>() {
+
+ public String call(Row row) {
+ return row.getString(0) + "_" + row.get(1).toString();
+ }
+ }).collect();
+
+ List<String> expected = new ArrayList<String>(2);
+ expected.add("Michael_29");
+ expected.add("Yin_28");
+
+ Assert.assertEquals(expected, actual);
+ }
+
@Test
public void applySchemaToJSON() {
JavaRDD<String> jsonRDD = javaCtx.parallelize(Arrays.asList(
@@ -147,14 +189,14 @@ public class JavaApplySchemaSuite implements Serializable {
null,
"this is another simple string."));
- DataFrame df1 = javaSqlCtx.jsonRDD(jsonRDD.rdd());
+ DataFrame df1 = javaSqlCtx.jsonRDD(jsonRDD);
StructType actualSchema1 = df1.schema();
Assert.assertEquals(expectedSchema, actualSchema1);
df1.registerTempTable("jsonTable1");
List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collectAsList();
Assert.assertEquals(expectedResult, actual1);
- DataFrame df2 = javaSqlCtx.jsonRDD(jsonRDD.rdd(), expectedSchema);
+ DataFrame df2 = javaSqlCtx.jsonRDD(jsonRDD, expectedSchema);
StructType actualSchema2 = df2.schema();
Assert.assertEquals(expectedSchema, actualSchema2);
df2.registerTempTable("jsonTable2");