author    Reynold Xin <rxin@databricks.com>  2015-01-16 21:09:06 -0800
committer Reynold Xin <rxin@databricks.com>  2015-01-16 21:09:06 -0800
commit    61b427d4b1c4934bd70ed4da844b64f0e9a377aa (patch)
tree      5068b31119fa7e2256422d4fdf18703ae64d7ab2
parent    ee1c1f3a04dfe80843432e349f01178e47f02443 (diff)
[SPARK-5193][SQL] Remove Spark SQL Java-specific API.
After the following patches, the main (Scala) API is now usable for Java users directly.

https://github.com/apache/spark/pull/4056
https://github.com/apache/spark/pull/4054
https://github.com/apache/spark/pull/4049
https://github.com/apache/spark/pull/4030
https://github.com/apache/spark/pull/3965
https://github.com/apache/spark/pull/3958

Author: Reynold Xin <rxin@databricks.com>

Closes #4065 from rxin/sql-java-api and squashes the following commits:

b1fd860 [Reynold Xin] Fix Mima
6d86578 [Reynold Xin] Ok one more attempt in fixing Python...
e8f1455 [Reynold Xin] Fix Python again...
3e53f91 [Reynold Xin] Fixed Python.
83735da [Reynold Xin] Fix BigDecimal test.
e9f1de3 [Reynold Xin] Use scala BigDecimal.
500d2c4 [Reynold Xin] Fix Decimal.
ba3bfa2 [Reynold Xin] Updated javadoc for RowFactory.
c4ae1c5 [Reynold Xin] [SPARK-5193][SQL] Remove Spark SQL Java-specific API.
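For context, a minimal sketch of what Java code looks like against the unified API after this change, mirroring the updated JavaSparkSQL example further down in the diff. The class name UnifiedApiSketch and the inline Person bean are illustrative only, not part of this patch:

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SchemaRDD;

public class UnifiedApiSketch {

  // A plain JavaBean; Spark SQL infers the schema from its getters.
  public static class Person implements Serializable {
    private String name;
    private int age;
    public Person(String name, int age) { this.name = name; this.age = age; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
  }

  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("UnifiedApiSketch"));
    // SQLContext gains a JavaSparkContext constructor in this patch, so JavaSQLContext is no longer needed.
    SQLContext sqlCtx = new SQLContext(jsc);

    JavaRDD<Person> people = jsc.parallelize(Arrays.asList(
        new Person("Michael", 29), new Person("Andy", 30), new Person("Justin", 19)));

    // applySchema works directly on a JavaRDD of beans and returns the Scala SchemaRDD.
    SchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
    schemaPeople.registerTempTable("people");

    SchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

    // SchemaRDD is a Scala RDD, so convert to a JavaRDD before applying Java Functions.
    List<String> names = teenagers.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) { return "Name: " + row.getString(0); }
    }).collect();

    for (String name : names) {
      System.out.println(name);
    }

    jsc.stop();
  }
}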
-rwxr-xr-x  bin/spark-class | 2
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java | 15
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java | 14
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java | 17
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java | 32
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/Estimator.scala | 38
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/Transformer.scala | 24
-rw-r--r--  mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java | 17
-rw-r--r--  mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java | 21
-rw-r--r--  mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java | 13
-rw-r--r--  project/MimaExcludes.scala | 4
-rw-r--r--  python/pyspark/sql.py | 48
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java | 6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 241
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala | 225
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala | 153
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala | 251
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java | 16
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java | 34
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java | 11
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java | 88
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala | 209
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala | 49
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala | 91
27 files changed, 125 insertions(+), 1516 deletions(-)
diff --git a/bin/spark-class b/bin/spark-class
index 0d58d95c1a..79af42c72c 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -148,7 +148,7 @@ fi
if [[ "$1" =~ org.apache.spark.tools.* ]]; then
if test -z "$SPARK_TOOLS_JAR"; then
echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2
- echo "You need to build Spark before running $1." 1>&2
+ echo "You need to run \"build/sbt tools/package\" before running $1." 1>&2
exit 1
fi
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java
index f4b4f8d8c7..247d2a5e31 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java
@@ -33,9 +33,9 @@ import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.Row;
/**
* A simple example demonstrating model selection using CrossValidator.
@@ -55,7 +55,7 @@ public class JavaCrossValidatorExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaCrossValidatorExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
- JavaSQLContext jsql = new JavaSQLContext(jsc);
+ SQLContext jsql = new SQLContext(jsc);
// Prepare training documents, which are labeled.
List<LabeledDocument> localTraining = Lists.newArrayList(
@@ -71,8 +71,7 @@ public class JavaCrossValidatorExample {
new LabeledDocument(9L, "a e c l", 0.0),
new LabeledDocument(10L, "spark compile", 1.0),
new LabeledDocument(11L, "hadoop software", 0.0));
- JavaSchemaRDD training =
- jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+ SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -113,11 +112,11 @@ public class JavaCrossValidatorExample {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
- JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+ SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test).registerAsTable("prediction");
- JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
+ SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
for (Row r: predictions.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+ ", prediction=" + r.get(3));
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java
index e25b271777..5b92655e2e 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java
@@ -28,9 +28,9 @@ import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.Row;
/**
* A simple example demonstrating ways to specify parameters for Estimators and Transformers.
@@ -44,7 +44,7 @@ public class JavaSimpleParamsExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaSimpleParamsExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
- JavaSQLContext jsql = new JavaSQLContext(jsc);
+ SQLContext jsql = new SQLContext(jsc);
// Prepare training data.
// We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
@@ -54,7 +54,7 @@ public class JavaSimpleParamsExample {
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
- JavaSchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
+ SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
@@ -94,14 +94,14 @@ public class JavaSimpleParamsExample {
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
- JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
+ SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
// column since we renamed the lr.scoreCol parameter previously.
model2.transform(test).registerAsTable("results");
- JavaSchemaRDD results =
+ SchemaRDD results =
jsql.sql("SELECT features, label, probability, prediction FROM results");
for (Row r: results.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java
index 54f18014e4..74db449fad 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java
@@ -21,6 +21,7 @@ import java.util.List;
import com.google.common.collect.Lists;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
@@ -28,10 +29,9 @@ import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
-import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.Row;
/**
* A simple text classification pipeline that recognizes "spark" from input text. It uses the Java
@@ -46,7 +46,7 @@ public class JavaSimpleTextClassificationPipeline {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaSimpleTextClassificationPipeline");
JavaSparkContext jsc = new JavaSparkContext(conf);
- JavaSQLContext jsql = new JavaSQLContext(jsc);
+ SQLContext jsql = new SQLContext(jsc);
// Prepare training documents, which are labeled.
List<LabeledDocument> localTraining = Lists.newArrayList(
@@ -54,8 +54,7 @@ public class JavaSimpleTextClassificationPipeline {
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0));
- JavaSchemaRDD training =
- jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+ SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -80,11 +79,11 @@ public class JavaSimpleTextClassificationPipeline {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
- JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+ SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
// Make predictions on test documents.
model.transform(test).registerAsTable("prediction");
- JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
+ SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
for (Row r: predictions.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+ ", prediction=" + r.get(3));
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index 01c77bd443..b70804635d 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -26,9 +26,9 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.Row;
public class JavaSparkSQL {
public static class Person implements Serializable {
@@ -55,7 +55,7 @@ public class JavaSparkSQL {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
- JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
+ SQLContext sqlCtx = new SQLContext(ctx);
System.out.println("=== Data source: RDD ===");
// Load a text file and convert each line to a Java Bean.
@@ -74,15 +74,15 @@ public class JavaSparkSQL {
});
// Apply a schema to an RDD of Java Beans and register it as a table.
- JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
+ SchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
schemaPeople.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
- JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+ SchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
- List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
+ List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0);
@@ -99,13 +99,13 @@ public class JavaSparkSQL {
// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
- JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
+ SchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
- JavaSchemaRDD teenagers2 =
+ SchemaRDD teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
- teenagerNames = teenagers2.map(new Function<Row, String>() {
+ teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0);
@@ -120,7 +120,7 @@ public class JavaSparkSQL {
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
// Create a JavaSchemaRDD from the file(s) pointed by path
- JavaSchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
+ SchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
// Because the schema of a JSON dataset is automatically inferred, to write queries,
// it is better to take a look at what is the schema.
@@ -134,11 +134,11 @@ public class JavaSparkSQL {
peopleFromJsonFile.registerTempTable("people");
// SQL statements can be run by using the sql methods provided by sqlCtx.
- JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+ SchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
// The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
- teenagerNames = teenagers3.map(new Function<Row, String>() {
+ teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) { return "Name: " + row.getString(0); }
}).collect();
@@ -151,7 +151,7 @@ public class JavaSparkSQL {
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
- JavaSchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD);
+ SchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
// Take a look at the schema of this new JavaSchemaRDD.
peopleFromJsonRDD.printSchema();
@@ -164,8 +164,8 @@ public class JavaSparkSQL {
peopleFromJsonRDD.registerTempTable("people2");
- JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
- List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
+ SchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
+ List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0) + ", City: " + row.getString(1);
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala b/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala
index fdbee743e8..77d230eb4a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala
@@ -18,12 +18,10 @@
package org.apache.spark.ml
import scala.annotation.varargs
-import scala.collection.JavaConverters._
import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param.{ParamMap, ParamPair, Params}
import org.apache.spark.sql.SchemaRDD
-import org.apache.spark.sql.api.java.JavaSchemaRDD
/**
* :: AlphaComponent ::
@@ -66,40 +64,4 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
def fit(dataset: SchemaRDD, paramMaps: Array[ParamMap]): Seq[M] = {
paramMaps.map(fit(dataset, _))
}
-
- // Java-friendly versions of fit.
-
- /**
- * Fits a single model to the input data with optional parameters.
- *
- * @param dataset input dataset
- * @param paramPairs optional list of param pairs (overwrite embedded params)
- * @return fitted model
- */
- @varargs
- def fit(dataset: JavaSchemaRDD, paramPairs: ParamPair[_]*): M = {
- fit(dataset.schemaRDD, paramPairs: _*)
- }
-
- /**
- * Fits a single model to the input data with provided parameter map.
- *
- * @param dataset input dataset
- * @param paramMap parameter map
- * @return fitted model
- */
- def fit(dataset: JavaSchemaRDD, paramMap: ParamMap): M = {
- fit(dataset.schemaRDD, paramMap)
- }
-
- /**
- * Fits multiple models to the input data with multiple sets of parameters.
- *
- * @param dataset input dataset
- * @param paramMaps an array of parameter maps
- * @return fitted models, matching the input parameter maps
- */
- def fit(dataset: JavaSchemaRDD, paramMaps: Array[ParamMap]): java.util.List[M] = {
- fit(dataset.schemaRDD, paramMaps).asJava
- }
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
index 1331b91240..af56f9c435 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
@@ -23,7 +23,6 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param._
import org.apache.spark.sql.SchemaRDD
-import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.catalyst.analysis.Star
import org.apache.spark.sql.catalyst.expressions.ScalaUdf
import org.apache.spark.sql.types._
@@ -55,29 +54,6 @@ abstract class Transformer extends PipelineStage with Params {
* @return transformed dataset
*/
def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD
-
- // Java-friendly versions of transform.
-
- /**
- * Transforms the dataset with optional parameters.
- * @param dataset input datset
- * @param paramPairs optional list of param pairs, overwrite embedded params
- * @return transformed dataset
- */
- @varargs
- def transform(dataset: JavaSchemaRDD, paramPairs: ParamPair[_]*): JavaSchemaRDD = {
- transform(dataset.schemaRDD, paramPairs: _*).toJavaSchemaRDD
- }
-
- /**
- * Transforms the dataset with provided parameter map as additional parameters.
- * @param dataset input dataset
- * @param paramMap additional parameters, overwrite embedded params
- * @return transformed dataset
- */
- def transform(dataset: JavaSchemaRDD, paramMap: ParamMap): JavaSchemaRDD = {
- transform(dataset.schemaRDD, paramMap).toJavaSchemaRDD
- }
}
/**
diff --git a/mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java b/mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java
index 42846677ed..47f1f46c6c 100644
--- a/mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java
@@ -26,10 +26,9 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.StandardScaler;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import static org.apache.spark.mllib.classification.LogisticRegressionSuite
- .generateLogisticInputAsList;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext;
+import static org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInputAsList;
/**
* Test Pipeline construction and fitting in Java.
@@ -37,13 +36,13 @@ import static org.apache.spark.mllib.classification.LogisticRegressionSuite
public class JavaPipelineSuite {
private transient JavaSparkContext jsc;
- private transient JavaSQLContext jsql;
- private transient JavaSchemaRDD dataset;
+ private transient SQLContext jsql;
+ private transient SchemaRDD dataset;
@Before
public void setUp() {
jsc = new JavaSparkContext("local", "JavaPipelineSuite");
- jsql = new JavaSQLContext(jsc);
+ jsql = new SQLContext(jsc);
JavaRDD<LabeledPoint> points =
jsc.parallelize(generateLogisticInputAsList(1.0, 1.0, 100, 42), 2);
dataset = jsql.applySchema(points, LabeledPoint.class);
@@ -66,7 +65,7 @@ public class JavaPipelineSuite {
.setStages(new PipelineStage[] {scaler, lr});
PipelineModel model = pipeline.fit(dataset);
model.transform(dataset).registerTempTable("prediction");
- JavaSchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
- predictions.collect();
+ SchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
+ predictions.collectAsList();
}
}
diff --git a/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java
index 76eb7f0032..2eba83335b 100644
--- a/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java
@@ -26,21 +26,20 @@ import org.junit.Test;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import static org.apache.spark.mllib.classification.LogisticRegressionSuite
- .generateLogisticInputAsList;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext;
+import static org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInputAsList;
public class JavaLogisticRegressionSuite implements Serializable {
private transient JavaSparkContext jsc;
- private transient JavaSQLContext jsql;
- private transient JavaSchemaRDD dataset;
+ private transient SQLContext jsql;
+ private transient SchemaRDD dataset;
@Before
public void setUp() {
jsc = new JavaSparkContext("local", "JavaLogisticRegressionSuite");
- jsql = new JavaSQLContext(jsc);
+ jsql = new SQLContext(jsc);
List<LabeledPoint> points = generateLogisticInputAsList(1.0, 1.0, 100, 42);
dataset = jsql.applySchema(jsc.parallelize(points, 2), LabeledPoint.class);
}
@@ -56,8 +55,8 @@ public class JavaLogisticRegressionSuite implements Serializable {
LogisticRegression lr = new LogisticRegression();
LogisticRegressionModel model = lr.fit(dataset);
model.transform(dataset).registerTempTable("prediction");
- JavaSchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
- predictions.collect();
+ SchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
+ predictions.collectAsList();
}
@Test
@@ -68,8 +67,8 @@ public class JavaLogisticRegressionSuite implements Serializable {
LogisticRegressionModel model = lr.fit(dataset);
model.transform(dataset, model.threshold().w(0.8)) // overwrite threshold
.registerTempTable("prediction");
- JavaSchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
- predictions.collect();
+ SchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
+ predictions.collectAsList();
}
@Test
diff --git a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
index a266ebd207..a9f1c4a2c3 100644
--- a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
@@ -30,21 +30,20 @@ import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import static org.apache.spark.mllib.classification.LogisticRegressionSuite
- .generateLogisticInputAsList;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext;
+import static org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInputAsList;
public class JavaCrossValidatorSuite implements Serializable {
private transient JavaSparkContext jsc;
- private transient JavaSQLContext jsql;
- private transient JavaSchemaRDD dataset;
+ private transient SQLContext jsql;
+ private transient SchemaRDD dataset;
@Before
public void setUp() {
jsc = new JavaSparkContext("local", "JavaCrossValidatorSuite");
- jsql = new JavaSQLContext(jsc);
+ jsql = new SQLContext(jsc);
List<LabeledPoint> points = generateLogisticInputAsList(1.0, 1.0, 100, 42);
dataset = jsql.applySchema(jsc.parallelize(points, 2), LabeledPoint.class);
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index d3ea594245..0ccbfcb0c4 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -78,6 +78,10 @@ object MimaExcludes {
"org.apache.spark.TaskContext.taskAttemptId"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.TaskContext.attemptNumber")
+ ) ++ Seq(
+ // SPARK-5166 Spark SQL API stabilization
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Transformer.transform"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Estimator.fit")
)
case v if v.startsWith("1.2") =>
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index dcd3b60a60..1990323249 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -1458,7 +1458,7 @@ class SQLContext(object):
jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
- return SchemaRDD(srdd.toJavaSchemaRDD(), self)
+ return SchemaRDD(srdd, self)
def registerRDDAsTable(self, rdd, tableName):
"""Registers the given RDD as a temporary table in the catalog.
@@ -1487,7 +1487,7 @@ class SQLContext(object):
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True
"""
- jschema_rdd = self._ssql_ctx.parquetFile(path).toJavaSchemaRDD()
+ jschema_rdd = self._ssql_ctx.parquetFile(path)
return SchemaRDD(jschema_rdd, self)
def jsonFile(self, path, schema=None, samplingRatio=1.0):
@@ -1549,7 +1549,7 @@ class SQLContext(object):
else:
scala_datatype = self._ssql_ctx.parseDataType(schema.json())
srdd = self._ssql_ctx.jsonFile(path, scala_datatype)
- return SchemaRDD(srdd.toJavaSchemaRDD(), self)
+ return SchemaRDD(srdd, self)
def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
"""Loads an RDD storing one JSON object per string as a L{SchemaRDD}.
@@ -1619,7 +1619,7 @@ class SQLContext(object):
else:
scala_datatype = self._ssql_ctx.parseDataType(schema.json())
srdd = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
- return SchemaRDD(srdd.toJavaSchemaRDD(), self)
+ return SchemaRDD(srdd, self)
def sql(self, sqlQuery):
"""Return a L{SchemaRDD} representing the result of the given query.
@@ -1630,7 +1630,7 @@ class SQLContext(object):
>>> srdd2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
"""
- return SchemaRDD(self._ssql_ctx.sql(sqlQuery).toJavaSchemaRDD(), self)
+ return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)
def table(self, tableName):
"""Returns the specified table as a L{SchemaRDD}.
@@ -1641,7 +1641,7 @@ class SQLContext(object):
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True
"""
- return SchemaRDD(self._ssql_ctx.table(tableName).toJavaSchemaRDD(), self)
+ return SchemaRDD(self._ssql_ctx.table(tableName), self)
def cacheTable(self, tableName):
"""Caches the specified table in-memory."""
@@ -1686,24 +1686,6 @@ class HiveContext(SQLContext):
def _get_hive_ctx(self):
return self._jvm.HiveContext(self._jsc.sc())
- def hiveql(self, hqlQuery):
- """
- DEPRECATED: Use sql()
- """
- warnings.warn("hiveql() is deprecated as the sql function now parses using HiveQL by" +
- "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'",
- DeprecationWarning)
- return SchemaRDD(self._ssql_ctx.hiveql(hqlQuery).toJavaSchemaRDD(), self)
-
- def hql(self, hqlQuery):
- """
- DEPRECATED: Use sql()
- """
- warnings.warn("hql() is deprecated as the sql function now parses using HiveQL by" +
- "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'",
- DeprecationWarning)
- return self.hiveql(hqlQuery)
-
class LocalHiveContext(HiveContext):
@@ -1716,12 +1698,6 @@ class LocalHiveContext(HiveContext):
return self._jvm.LocalHiveContext(self._jsc.sc())
-class TestHiveContext(HiveContext):
-
- def _get_hive_ctx(self):
- return self._jvm.TestHiveContext(self._jsc.sc())
-
-
def _create_row(fields, values):
row = Row(*values)
row.__FIELDS__ = fields
@@ -1846,7 +1822,7 @@ class SchemaRDD(RDD):
self.sql_ctx = sql_ctx
self._sc = sql_ctx._sc
clsName = jschema_rdd.getClass().getName()
- assert clsName.endswith("JavaSchemaRDD"), "jschema_rdd must be JavaSchemaRDD"
+ assert clsName.endswith("SchemaRDD"), "jschema_rdd must be SchemaRDD"
self._jschema_rdd = jschema_rdd
self._id = None
self.is_cached = False
@@ -1880,7 +1856,7 @@ class SchemaRDD(RDD):
>>> srdd.limit(0).collect()
[]
"""
- rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD()
+ rdd = self._jschema_rdd.baseSchemaRDD().limit(num)
return SchemaRDD(rdd, self.sql_ctx)
def toJSON(self, use_unicode=False):
@@ -2059,18 +2035,18 @@ class SchemaRDD(RDD):
def getCheckpointFile(self):
checkpointFile = self._jschema_rdd.getCheckpointFile()
- if checkpointFile.isPresent():
+ if checkpointFile.isDefined():
return checkpointFile.get()
def coalesce(self, numPartitions, shuffle=False):
- rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
+ rdd = self._jschema_rdd.coalesce(numPartitions, shuffle, None)
return SchemaRDD(rdd, self.sql_ctx)
def distinct(self, numPartitions=None):
if numPartitions is None:
rdd = self._jschema_rdd.distinct()
else:
- rdd = self._jschema_rdd.distinct(numPartitions)
+ rdd = self._jschema_rdd.distinct(numPartitions, None)
return SchemaRDD(rdd, self.sql_ctx)
def intersection(self, other):
@@ -2081,7 +2057,7 @@ class SchemaRDD(RDD):
raise ValueError("Can only intersect with another SchemaRDD")
def repartition(self, numPartitions):
- rdd = self._jschema_rdd.repartition(numPartitions)
+ rdd = self._jschema_rdd.repartition(numPartitions, None)
return SchemaRDD(rdd, self.sql_ctx)
def subtract(self, other, numPartitions=None):
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java b/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java
index 62fcec824d..5ed60fe78d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java
@@ -25,10 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow;
public class RowFactory {
/**
- * Create a {@link Row} from an array of values. Position i in the array becomes position i
- * in the created {@link Row} object.
+ * Create a {@link Row} from the given arguments. Position i in the argument list becomes
+ * position i in the created {@link Row} object.
*/
- public static Row create(Object[] values) {
+ public static Row create(Object ... values) {
return new GenericRow(values);
}
}
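The varargs signature above lets Java callers build a Row inline instead of allocating an Object[] first. A brief usage sketch (the RowFactorySketch class is illustrative, not part of this patch):

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class RowFactorySketch {
  public static void main(String[] args) {
    // Position i in the argument list becomes position i in the created Row.
    Row row = RowFactory.create("Alice", 30, true);
    System.out.println(row.getString(0) + ", " + row.getInt(1) + ", " + row.getBoolean(2));
  }
}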
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 3744d77c07..a85c4316e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -143,6 +143,8 @@ final class Decimal extends Ordered[Decimal] with Serializable {
}
}
+ def toJavaBigDecimal: java.math.BigDecimal = toBigDecimal.bigDecimal
+
def toUnscaledLong: Long = {
if (decimalVal.ne(null)) {
decimalVal.underlying().unscaledValue().longValue()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 8ad1753dab..f23cb18c92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -26,7 +26,7 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -58,6 +58,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
self =>
+ def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
+
// Note that this is a lazy val so we can override the default value in subclasses.
protected[sql] lazy val conf: SQLConf = new SQLConf
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 686bcdfbb4..ae4d8ba90c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -30,7 +30,6 @@ import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
@@ -409,13 +408,6 @@ class SchemaRDD(
def toSchemaRDD = this
/**
- * Returns this RDD as a JavaSchemaRDD.
- *
- * @group schema
- */
- def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
-
- /**
* Converts a JavaRDD to a PythonRDD. It is used by pyspark.
*/
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
@@ -470,6 +462,8 @@ class SchemaRDD(
override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
+ def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(collect() : _*)
+
override def take(num: Int): Array[Row] = limit(num).collect()
// =======================================================================
@@ -482,13 +476,15 @@ class SchemaRDD(
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.coalesce(numPartitions, shuffle)(ord))
- override def distinct(): SchemaRDD =
- applySchema(super.distinct())
+ override def distinct(): SchemaRDD = applySchema(super.distinct())
override def distinct(numPartitions: Int)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.distinct(numPartitions)(ord))
+ def distinct(numPartitions: Int): SchemaRDD =
+ applySchema(super.distinct(numPartitions)(null))
+
override def filter(f: Row => Boolean): SchemaRDD =
applySchema(super.filter(f))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
deleted file mode 100644
index a75f559928..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.api.java
-
-import java.beans.Introspector
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
-import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation}
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-
-/**
- * The entry point for executing Spark SQL queries from a Java program.
- */
-class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
-
- def this(sparkContext: JavaSparkContext) = this(new SQLContext(sparkContext.sc))
-
- def baseRelationToSchemaRDD(baseRelation: BaseRelation): JavaSchemaRDD = {
- new JavaSchemaRDD(sqlContext, LogicalRelation(baseRelation))
- }
-
- /**
- * Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is
- * used for SQL parsing can be configured with 'spark.sql.dialect'.
- *
- * @group userf
- */
- def sql(sqlText: String): JavaSchemaRDD = {
- if (sqlContext.conf.dialect == "sql") {
- new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlText))
- } else {
- sys.error(s"Unsupported SQL dialect: $sqlContext.dialect")
- }
- }
-
- /**
- * :: Experimental ::
- * Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
- * a table. This registered table can be used as the target of future `insertInto` operations.
- *
- * {{{
- * JavaSQLContext sqlCtx = new JavaSQLContext(...)
- *
- * sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerTempTable("people")
- * sqlCtx.sql("INSERT INTO people SELECT 'michael', 29")
- * }}}
- *
- * @param beanClass A java bean class object that will be used to determine the schema of the
- * parquet file.
- * @param path The path where the directory containing parquet metadata should be created.
- * Data inserted into this table will also be stored at this location.
- * @param allowExisting When false, an exception will be thrown if this directory already exists.
- * @param conf A Hadoop configuration object that can be used to specific options to the parquet
- * output format.
- */
- @Experimental
- def createParquetFile(
- beanClass: Class[_],
- path: String,
- allowExisting: Boolean = true,
- conf: Configuration = new Configuration()): JavaSchemaRDD = {
- new JavaSchemaRDD(
- sqlContext,
- ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext))
- }
-
- /**
- * Applies a schema to an RDD of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
- */
- def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
- val attributeSeq = getSchema(beanClass)
- val className = beanClass.getName
- val rowRdd = rdd.rdd.mapPartitions { iter =>
- // BeanInfo is not serializable so we must rediscover it remotely for each partition.
- val localBeanInfo = Introspector.getBeanInfo(
- Class.forName(className, true, Utils.getContextOrSparkClassLoader))
- val extractors =
- localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
-
- iter.map { row =>
- new GenericRow(
- extractors.zip(attributeSeq).map { case (e, attr) =>
- DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
- }.toArray[Any]
- ): ScalaRow
- }
- }
- new JavaSchemaRDD(sqlContext, LogicalRDD(attributeSeq, rowRdd)(sqlContext))
- }
-
- /**
- * :: DeveloperApi ::
- * Creates a JavaSchemaRDD from an RDD containing Rows by applying a schema to this RDD.
- * It is important to make sure that the structure of every Row of the provided RDD matches the
- * provided schema. Otherwise, there will be runtime exception.
- */
- @DeveloperApi
- def applySchema(rowRDD: JavaRDD[Row], schema: StructType): JavaSchemaRDD = {
- val scalaRowRDD = rowRDD.rdd.map(r => r.row)
- val logicalPlan =
- LogicalRDD(schema.toAttributes, scalaRowRDD)(sqlContext)
- new JavaSchemaRDD(sqlContext, logicalPlan)
- }
-
- /**
- * Loads a parquet file from regular path or files that match file patterns in path,
- * returning the result as a [[JavaSchemaRDD]].
- * Supported glob file pattern information at ([[http://tinyurl.com/kcqrzn8]]).
- */
- def parquetFile(path: String): JavaSchemaRDD =
- new JavaSchemaRDD(
- sqlContext,
- ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext))
-
- /**
- * Loads a JSON file (one object per line), returning the result as a JavaSchemaRDD.
- * It goes through the entire dataset once to determine the schema.
- */
- def jsonFile(path: String): JavaSchemaRDD =
- jsonRDD(sqlContext.sparkContext.textFile(path))
-
- /**
- * :: Experimental ::
- * Loads a JSON file (one object per line) and applies the given schema,
- * returning the result as a JavaSchemaRDD.
- */
- @Experimental
- def jsonFile(path: String, schema: StructType): JavaSchemaRDD =
- jsonRDD(sqlContext.sparkContext.textFile(path), schema)
-
- /**
- * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
- * JavaSchemaRDD.
- * It goes through the entire dataset once to determine the schema.
- */
- def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
- val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
- val appliedScalaSchema =
- JsonRDD.nullTypeToStringType(
- JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord))
- val scalaRowRDD =
- JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
- val logicalPlan =
- LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
- new JavaSchemaRDD(sqlContext, logicalPlan)
- }
-
- /**
- * :: Experimental ::
- * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
- * returning the result as a JavaSchemaRDD.
- */
- @Experimental
- def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = {
- val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
- val appliedScalaSchema =
- Option(schema).getOrElse(
- JsonRDD.nullTypeToStringType(
- JsonRDD.inferSchema(
- json.rdd, 1.0, columnNameOfCorruptJsonRecord)))
- val scalaRowRDD = JsonRDD.jsonStringToRow(
- json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
- val logicalPlan =
- LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
- new JavaSchemaRDD(sqlContext, logicalPlan)
- }
-
- /**
- * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
- * during the lifetime of this instance of SQLContext.
- */
- def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = {
- sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
- }
-
- /**
- * Returns a Catalyst Schema for the given java bean class.
- */
- protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
- // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
- val beanInfo = Introspector.getBeanInfo(beanClass)
-
- // Note: The ordering of elements may differ from when the schema is inferred in Scala.
- // This is because beanInfo.getPropertyDescriptors gives no guarantees about
- // element ordering.
- val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
- fields.map { property =>
- val (dataType, nullable) = property.getPropertyType match {
- case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
- (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
- case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
- case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
- case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
- case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
- case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
- case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
- case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
- case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
-
- case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
- case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
- case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
- case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
- case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
- case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
- case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
- case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
- case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
- case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
- }
- AttributeReference(property.getName, dataType, nullable)()
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
deleted file mode 100644
index 9e10e532fb..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.api.java
-
-import java.util.{List => JList}
-
-import org.apache.spark.Partitioner
-import org.apache.spark.api.java.{JavaRDD, JavaRDDLike}
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.storage.StorageLevel
-
-/**
- * An RDD of [[Row]] objects that is returned as the result of a Spark SQL query. In addition to
- * standard RDD operations, a JavaSchemaRDD can also be registered as a table in the JavaSQLContext
- * that was used to create. Registering a JavaSchemaRDD allows its contents to be queried in
- * future SQL statement.
- *
- * @groupname schema SchemaRDD Functions
- * @groupprio schema -1
- * @groupname Ungrouped Base RDD Functions
- */
-class JavaSchemaRDD(
- @transient val sqlContext: SQLContext,
- @transient val baseLogicalPlan: LogicalPlan)
- extends JavaRDDLike[Row, JavaRDD[Row]]
- with SchemaRDDLike {
-
- private[sql] val baseSchemaRDD = new SchemaRDD(sqlContext, logicalPlan)
-
- /** Returns the underlying Scala SchemaRDD. */
- val schemaRDD: SchemaRDD = baseSchemaRDD
-
- override val classTag = scala.reflect.classTag[Row]
-
- override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
-
- val rdd = baseSchemaRDD.map(new Row(_))
-
- override def toString: String = baseSchemaRDD.toString
-
- /** Returns the schema of this JavaSchemaRDD (represented by a StructType). */
- def schema: StructType = baseSchemaRDD.schema.asInstanceOf[StructType]
-
- // =======================================================================
- // Base RDD functions that do NOT change schema
- // =======================================================================
-
- // Common RDD functions
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- def cache(): JavaSchemaRDD = {
- baseSchemaRDD.cache()
- this
- }
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- def persist(): JavaSchemaRDD = {
- baseSchemaRDD.persist()
- this
- }
-
- /**
- * Set this RDD's storage level to persist its values across operations after the first time
- * it is computed. This can only be used to assign a new storage level if the RDD does not
- * have a storage level set yet..
- */
- def persist(newLevel: StorageLevel): JavaSchemaRDD = {
- baseSchemaRDD.persist(newLevel)
- this
- }
-
- /**
- * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
- *
- * @param blocking Whether to block until all blocks are deleted.
- * @return This RDD.
- */
- def unpersist(blocking: Boolean = true): JavaSchemaRDD = {
- baseSchemaRDD.unpersist(blocking)
- this
- }
-
- /** Assign a name to this RDD */
- def setName(name: String): JavaSchemaRDD = {
- baseSchemaRDD.setName(name)
- this
- }
-
- // Overridden actions from JavaRDDLike.
-
- override def collect(): JList[Row] = {
- import scala.collection.JavaConversions._
- val arr: java.util.Collection[Row] = baseSchemaRDD.collect().toSeq.map(new Row(_))
- new java.util.ArrayList(arr)
- }
-
- override def count(): Long = baseSchemaRDD.count
-
- override def take(num: Int): JList[Row] = {
- import scala.collection.JavaConversions._
- val arr: java.util.Collection[Row] = baseSchemaRDD.take(num).toSeq.map(new Row(_))
- new java.util.ArrayList(arr)
- }
-
- // Transformations (return a new RDD)
-
- /**
- * Returns a new RDD with each row transformed to a JSON string.
- */
- def toJSON(): JavaRDD[String] =
- baseSchemaRDD.toJSON.toJavaRDD
-
- /**
- * Return a new RDD that is reduced into `numPartitions` partitions.
- */
- def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD =
- baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD
-
- /**
- * Return a new RDD containing the distinct elements in this RDD.
- */
- def distinct(): JavaSchemaRDD =
- baseSchemaRDD.distinct().toJavaSchemaRDD
-
- /**
- * Return a new RDD containing the distinct elements in this RDD.
- */
- def distinct(numPartitions: Int): JavaSchemaRDD =
- baseSchemaRDD.distinct(numPartitions).toJavaSchemaRDD
-
- /**
- * Return a new RDD containing only the elements that satisfy a predicate.
- */
- def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
- baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue()).toJavaSchemaRDD
-
- /**
- * Return the intersection of this RDD and another one. The output will not contain any
- * duplicate elements, even if the input RDDs did.
- *
- * Note that this method performs a shuffle internally.
- */
- def intersection(other: JavaSchemaRDD): JavaSchemaRDD =
- this.baseSchemaRDD.intersection(other.baseSchemaRDD).toJavaSchemaRDD
-
- /**
- * Return the intersection of this RDD and another one. The output will not contain any
- * duplicate elements, even if the input RDDs did.
- *
- * Note that this method performs a shuffle internally.
- *
- * @param partitioner Partitioner to use for the resulting RDD
- */
- def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD =
- this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD
-
- /**
- * Return the intersection of this RDD and another one. The output will not contain any
- * duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster
- *
- * Note that this method performs a shuffle internally.
- *
- * @param numPartitions How many partitions to use in the resulting RDD
- */
- def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
- this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
-
- /**
- * Return a new RDD that has exactly `numPartitions` partitions.
- *
- * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
- * a shuffle to redistribute data.
- *
- * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
- * which can avoid performing a shuffle.
- */
- def repartition(numPartitions: Int): JavaSchemaRDD =
- baseSchemaRDD.repartition(numPartitions).toJavaSchemaRDD
-
- /**
- * Return an RDD with the elements from `this` that are not in `other`.
- *
- * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
- * RDD will be &lt;= us.
- */
- def subtract(other: JavaSchemaRDD): JavaSchemaRDD =
- this.baseSchemaRDD.subtract(other.baseSchemaRDD).toJavaSchemaRDD
-
- /**
- * Return an RDD with the elements from `this` that are not in `other`.
- */
- def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
- this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
-
- /**
- * Return an RDD with the elements from `this` that are not in `other`.
- */
- def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
- this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD
-
- /**
- * Return a SchemaRDD with a sampled version of the underlying dataset.
- */
- def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaSchemaRDD =
- this.baseSchemaRDD.sample(withReplacement, fraction, seed).toJavaSchemaRDD
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
deleted file mode 100644
index 4faa79af25..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.api.java
-
-import scala.annotation.varargs
-import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
-import scala.collection.JavaConversions
-import scala.math.BigDecimal
-
-import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
-import org.apache.spark.sql.{Row => ScalaRow}
-
-/**
- * A result row from a Spark SQL query.
- */
-class Row(private[spark] val row: ScalaRow) extends Serializable {
-
- /** Returns the number of columns present in this Row. */
- def length: Int = row.length
-
- /** Returns the value of column `i`. */
- def get(i: Int): Any =
- Row.toJavaValue(row(i))
-
- /** Returns true if the value at column `i` is NULL. */
- def isNullAt(i: Int) = get(i) == null
-
- /**
- * Returns the value of column `i` as an int. This function will throw an exception if the
- * value at `i` is not an integer, or if it is null.
- */
- def getInt(i: Int): Int =
- row.getInt(i)
-
- /**
- * Returns the value of column `i` as a long. This function will throw an exception if the
- * value at `i` is not a long, or if it is null.
- */
- def getLong(i: Int): Long =
- row.getLong(i)
-
- /**
- * Returns the value of column `i` as a double. This function will throw an exception if the
- * value at `i` is not a double, or if it is null.
- */
- def getDouble(i: Int): Double =
- row.getDouble(i)
-
- /**
- * Returns the value of column `i` as a boolean. This function will throw an exception if the
- * value at `i` is not a boolean, or if it is null.
- */
- def getBoolean(i: Int): Boolean =
- row.getBoolean(i)
-
- /**
- * Returns the value of column `i` as a short. This function will throw an exception if the
- * value at `i` is not a short, or if it is null.
- */
- def getShort(i: Int): Short =
- row.getShort(i)
-
- /**
- * Returns the value of column `i` as a byte. This function will throw an exception if the
- * value at `i` is not a byte, or if it is null.
- */
- def getByte(i: Int): Byte =
- row.getByte(i)
-
- /**
- * Returns the value of column `i` as a float. This function will throw an exception if the
- * value at `i` is not a float, or if it is null.
- */
- def getFloat(i: Int): Float =
- row.getFloat(i)
-
- /**
- * Returns the value of column `i` as a String. This function will throw an exception if the
- * value at `i` is not a String.
- */
- def getString(i: Int): String =
- row.getString(i)
-
- def canEqual(other: Any): Boolean = other.isInstanceOf[Row]
-
- override def equals(other: Any): Boolean = other match {
- case that: Row =>
- (that canEqual this) &&
- row == that.row
- case _ => false
- }
-
- override def hashCode(): Int = row.hashCode()
-
- override def toString: String = row.toString
-}
-
-object Row {
-
- private def toJavaValue(value: Any): Any = value match {
- // For a nested ScalaRow, values are converted lazily, when they
- // are actually accessed through the wrapping Row.
- case row: ScalaRow => new Row(row)
- case map: scala.collection.Map[_, _] =>
- mapAsSerializableJavaMap(
- map.map {
- case (key, value) => (toJavaValue(key), toJavaValue(value))
- }
- )
- case seq: scala.collection.Seq[_] =>
- JavaConversions.seqAsJavaList(seq.map(toJavaValue))
- case decimal: BigDecimal => decimal.underlying()
- case other => other
- }
-
- // TODO: Consolidate toScalaValue here with the scalafy in JsonRDD?
- private def toScalaValue(value: Any): Any = value match {
- // Values of this row have been converted to Scala values.
- case row: Row => row.row
- case map: java.util.Map[_, _] =>
- JMapWrapper(map).map {
- case (key, value) => (toScalaValue(key), toScalaValue(value))
- }
- case list: java.util.List[_] =>
- JListWrapper(list).map(toScalaValue)
- case decimal: java.math.BigDecimal => BigDecimal(decimal)
- case other => other
- }
-
- /**
- * Creates a Row with the given values.
- */
- @varargs def create(values: Any*): Row = {
- // Right now, we cannot use @varargs to annotate the constructor of
- // org.apache.spark.sql.api.java.Row. See https://issues.scala-lang.org/browse/SI-8383.
- new Row(ScalaRow(values.map(toScalaValue):_*))
- }
-}
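
The Java-side Row wrapper deleted above is superseded by the main org.apache.spark.sql.Row, with RowFactory.create standing in for Row.create, as the test updates later in this patch show. A small, self-contained sketch of the new construction and accessor calls; the values are illustrative only.

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    public class RowMigrationExample {
      public static void main(String[] args) {
        // RowFactory.create replaces the removed api.java.Row.create varargs factory.
        Row person = RowFactory.create("Michael", 29);

        // Typed and untyped accessors carry over from the removed wrapper.
        String name = person.getString(0);
        int age = person.getInt(1);
        Object raw = person.get(0);

        System.out.println(name + " is " + age + " (" + raw + ")");
      }
    }
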
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala
deleted file mode 100644
index 4186c27451..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.api.java
-
-import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUdf}
-import org.apache.spark.sql.types.DataType
-
-/**
- * A collection of functions that allow Java users to register UDFs. In order to handle functions
- * of varying arities with minimal boilerplate for our users, we generate classes and functions
- * for each arity up to 22. The code for this generation can be found in comments in this trait.
- */
-private[java] trait UDFRegistration {
- self: JavaSQLContext =>
-
- /* The following functions and required interfaces are generated with these code fragments:
-
- (1 to 22).foreach { i =>
- val extTypeArgs = (1 to i).map(_ => "_").mkString(", ")
- val anyTypeArgs = (1 to i).map(_ => "Any").mkString(", ")
- val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs, Any]]"
- val anyParams = (1 to i).map(_ => "_: Any").mkString(", ")
- println(s"""
- |def registerFunction(
- | name: String, f: UDF$i[$extTypeArgs, _], @transient dataType: DataType) = {
- | sqlContext.functionRegistry.registerFunction(
- | name,
- | (e: Seq[Expression]) => ScalaUdf(f$anyCast.call($anyParams), dataType, e))
- |}
- """.stripMargin)
- }
-
- import java.io.File
- import org.apache.spark.sql.catalyst.util.stringToFile
- val directory = new File("sql/core/src/main/java/org/apache/spark/sql/api/java/")
- (1 to 22).foreach { i =>
- val typeArgs = (1 to i).map(i => s"T$i").mkString(", ")
- val args = (1 to i).map(i => s"T$i t$i").mkString(", ")
-
- val contents =
- s"""/*
- | * Licensed to the Apache Software Foundation (ASF) under one or more
- | * contributor license agreements. See the NOTICE file distributed with
- | * this work for additional information regarding copyright ownership.
- | * The ASF licenses this file to You under the Apache License, Version 2.0
- | * (the "License"); you may not use this file except in compliance with
- | * the License. You may obtain a copy of the License at
- | *
- | * http://www.apache.org/licenses/LICENSE-2.0
- | *
- | * Unless required by applicable law or agreed to in writing, software
- | * distributed under the License is distributed on an "AS IS" BASIS,
- | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- | * See the License for the specific language governing permissions and
- | * limitations under the License.
- | */
- |
- |package org.apache.spark.sql.api.java;
- |
- |import java.io.Serializable;
- |
- |// **************************************************
- |// THIS FILE IS AUTOGENERATED BY CODE IN
- |// org.apache.spark.sql.api.java.FunctionRegistration
- |// **************************************************
- |
- |/**
- | * A Spark SQL UDF that has $i arguments.
- | */
- |public interface UDF$i<$typeArgs, R> extends Serializable {
- | public R call($args) throws Exception;
- |}
- |""".stripMargin
-
- stringToFile(new File(directory, s"UDF$i.java"), contents)
- }
-
- */
-
- // scalastyle:off
- def registerFunction(
- name: String, f: UDF1[_, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF1[Any, Any]].call(_: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF2[_, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF3[_, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF4[_, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF5[_, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF6[_, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF7[_, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF8[_, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
-
- def registerFunction(
- name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
- // scalastyle:on
-}
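
Only the JavaSQLContext-side registerFunction overloads disappear here; the UDF1 through UDF22 interfaces are still used by the updated JavaAPISuite below, and registration moves to SQLContext.udf().register. A condensed sketch of the new registration path; the function and query names are illustrative.

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;

    public class UdfRegistrationExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "UdfRegistrationExample");
        SQLContext sqlContext = new SQLContext(sc);

        // registerFunction(name, udf, returnType) becomes udf().register(name, udf, returnType).
        sqlContext.udf().register("strLen", new UDF1<String, Integer>() {
          @Override
          public Integer call(String s) throws Exception {
            return s.length();
          }
        }, DataTypes.IntegerType);

        Row result = sqlContext.sql("SELECT strLen('spark')").first();
        System.out.println(result.getInt(0));  // 5
        sc.stop();
      }
    }
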
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
index 88017eb47d..9ff40471a0 100644
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
@@ -24,6 +24,8 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -31,12 +33,12 @@ import org.apache.spark.sql.types.DataTypes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite implements Serializable {
private transient JavaSparkContext sc;
- private transient JavaSQLContext sqlContext;
+ private transient SQLContext sqlContext;
@Before
public void setUp() {
sc = new JavaSparkContext("local", "JavaAPISuite");
- sqlContext = new JavaSQLContext(sc);
+ sqlContext = new SQLContext(sc);
}
@After
@@ -52,15 +54,14 @@ public class JavaAPISuite implements Serializable {
// sqlContext.registerFunction(
// "stringLengthTest", (String str) -> str.length(), DataType.IntegerType);
- sqlContext.registerFunction("stringLengthTest", new UDF1<String, Integer>() {
+ sqlContext.udf().register("stringLengthTest", new UDF1<String, Integer>() {
@Override
public Integer call(String str) throws Exception {
return str.length();
}
}, DataTypes.IntegerType);
- // TODO: Why do we need this cast?
- Row result = (Row) sqlContext.sql("SELECT stringLengthTest('test')").first();
+ Row result = sqlContext.sql("SELECT stringLengthTest('test')").first();
assert(result.getInt(0) == 4);
}
@@ -73,15 +74,14 @@ public class JavaAPISuite implements Serializable {
// (String str1, String str2) -> str1.length() + str2.length,
// DataType.IntegerType);
- sqlContext.registerFunction("stringLengthTest", new UDF2<String, String, Integer>() {
+ sqlContext.udf().register("stringLengthTest", new UDF2<String, String, Integer>() {
@Override
public Integer call(String str1, String str2) throws Exception {
return str1.length() + str2.length();
}
}, DataTypes.IntegerType);
- // TODO: Why do we need this cast?
- Row result = (Row) sqlContext.sql("SELECT stringLengthTest('test', 'test2')").first();
+ Row result = sqlContext.sql("SELECT stringLengthTest('test', 'test2')").first();
assert(result.getInt(0) == 9);
}
}
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
index de586ba635..86d21f49fe 100644
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
@@ -18,7 +18,6 @@
package org.apache.spark.sql.api.java;
import java.io.Serializable;
-import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -31,6 +30,7 @@ import org.junit.Test;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -38,12 +38,12 @@ import org.apache.spark.sql.types.*;
// see http://stackoverflow.com/questions/758570/.
public class JavaApplySchemaSuite implements Serializable {
private transient JavaSparkContext javaCtx;
- private transient JavaSQLContext javaSqlCtx;
+ private transient SQLContext javaSqlCtx;
@Before
public void setUp() {
javaCtx = new JavaSparkContext("local", "JavaApplySchemaSuite");
- javaSqlCtx = new JavaSQLContext(javaCtx);
+ javaSqlCtx = new SQLContext(javaCtx);
}
@After
@@ -89,7 +89,7 @@ public class JavaApplySchemaSuite implements Serializable {
JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map(
new Function<Person, Row>() {
public Row call(Person person) throws Exception {
- return Row.create(person.getName(), person.getAge());
+ return RowFactory.create(person.getName(), person.getAge());
}
});
@@ -98,15 +98,15 @@ public class JavaApplySchemaSuite implements Serializable {
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
StructType schema = DataTypes.createStructType(fields);
- JavaSchemaRDD schemaRDD = javaSqlCtx.applySchema(rowRDD, schema);
+ SchemaRDD schemaRDD = javaSqlCtx.applySchema(rowRDD.rdd(), schema);
schemaRDD.registerTempTable("people");
- List<Row> actual = javaSqlCtx.sql("SELECT * FROM people").collect();
+ Row[] actual = javaSqlCtx.sql("SELECT * FROM people").collect();
List<Row> expected = new ArrayList<Row>(2);
- expected.add(Row.create("Michael", 29));
- expected.add(Row.create("Yin", 28));
+ expected.add(RowFactory.create("Michael", 29));
+ expected.add(RowFactory.create("Yin", 28));
- Assert.assertEquals(expected, actual);
+ Assert.assertEquals(expected, Arrays.asList(actual));
}
@Test
@@ -129,8 +129,8 @@ public class JavaApplySchemaSuite implements Serializable {
StructType expectedSchema = DataTypes.createStructType(fields);
List<Row> expectedResult = new ArrayList<Row>(2);
expectedResult.add(
- Row.create(
- new BigDecimal("92233720368547758070"),
+ RowFactory.create(
+ scala.math.BigDecimal$.MODULE$.apply("92233720368547758070"),
true,
1.7976931348623157E308,
10,
@@ -138,8 +138,8 @@ public class JavaApplySchemaSuite implements Serializable {
null,
"this is a simple string."));
expectedResult.add(
- Row.create(
- new BigDecimal("92233720368547758069"),
+ RowFactory.create(
+ scala.math.BigDecimal$.MODULE$.apply("92233720368547758069"),
false,
1.7976931348623157E305,
11,
@@ -147,18 +147,18 @@ public class JavaApplySchemaSuite implements Serializable {
null,
"this is another simple string."));
- JavaSchemaRDD schemaRDD1 = javaSqlCtx.jsonRDD(jsonRDD);
+ SchemaRDD schemaRDD1 = javaSqlCtx.jsonRDD(jsonRDD.rdd());
StructType actualSchema1 = schemaRDD1.schema();
Assert.assertEquals(expectedSchema, actualSchema1);
schemaRDD1.registerTempTable("jsonTable1");
- List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collect();
+ List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collectAsList();
Assert.assertEquals(expectedResult, actual1);
- JavaSchemaRDD schemaRDD2 = javaSqlCtx.jsonRDD(jsonRDD, expectedSchema);
+ SchemaRDD schemaRDD2 = javaSqlCtx.jsonRDD(jsonRDD.rdd(), expectedSchema);
StructType actualSchema2 = schemaRDD2.schema();
Assert.assertEquals(expectedSchema, actualSchema2);
schemaRDD2.registerTempTable("jsonTable2");
- List<Row> actual2 = javaSqlCtx.sql("select * from jsonTable2").collect();
+ List<Row> actual2 = javaSqlCtx.sql("select * from jsonTable2").collectAsList();
Assert.assertEquals(expectedResult, actual2);
}
}
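
For reference outside the test harness, the same migration in one place: rows built with RowFactory.create, a schema applied through SQLContext.applySchema on the underlying Scala RDD, and results fetched with collectAsList(). Table and column names below are illustrative.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SchemaRDD;
    import org.apache.spark.sql.types.*;

    public class ApplySchemaExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "ApplySchemaExample");
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<Row> rows = sc.parallelize(Arrays.asList(
            RowFactory.create("Michael", 29),
            RowFactory.create("Yin", 28)));

        StructType schema = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("name", DataTypes.StringType, false),
            DataTypes.createStructField("age", DataTypes.IntegerType, false)));

        // applySchema takes the underlying Scala RDD, hence the .rdd() call.
        SchemaRDD people = sqlContext.applySchema(rows.rdd(), schema);
        people.registerTempTable("people");

        // collect() now yields Row[]; collectAsList() gives a java.util.List<Row>.
        List<Row> result = sqlContext.sql("SELECT name FROM people").collectAsList();
        System.out.println(result);
        sc.stop();
      }
    }
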
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java
index 2b5812159d..fbfcd3f59d 100644
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java
@@ -29,6 +29,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+
public class JavaRowSuite {
private byte byteValue;
private short shortValue;
@@ -61,7 +64,7 @@ public class JavaRowSuite {
@Test
public void constructSimpleRow() {
- Row simpleRow = Row.create(
+ Row simpleRow = RowFactory.create(
byteValue, // ByteType
new Byte(byteValue),
shortValue, // ShortType
@@ -137,7 +140,7 @@ public class JavaRowSuite {
simpleMap.put(stringValue + " (3)", longValue - 2);
// Simple struct
- Row simpleStruct = Row.create(
+ Row simpleStruct = RowFactory.create(
doubleValue, stringValue, timestampValue, null);
// Complex array
@@ -150,7 +153,7 @@ public class JavaRowSuite {
complexMap.put(arrayOfRows, simpleStruct);
// Complex struct
- Row complexStruct = Row.create(
+ Row complexStruct = RowFactory.create(
simpleStringArray,
simpleMap,
simpleStruct,
@@ -167,7 +170,7 @@ public class JavaRowSuite {
Assert.assertEquals(null, complexStruct.get(6));
// A very complex row
- Row complexRow = Row.create(arrayOfMaps, arrayOfRows, complexMap, complexStruct);
+ Row complexRow = RowFactory.create(arrayOfMaps, arrayOfRows, complexMap, complexStruct);
Assert.assertEquals(arrayOfMaps, complexRow.get(0));
Assert.assertEquals(arrayOfRows, complexRow.get(1));
Assert.assertEquals(complexMap, complexRow.get(2));
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java
deleted file mode 100644
index 0caa8219a6..0000000000
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.api.java;
-
-import java.io.Serializable;
-import java.util.*;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.MyDenseVector;
-import org.apache.spark.sql.MyLabeledPoint;
-
-public class JavaUserDefinedTypeSuite implements Serializable {
- private transient JavaSparkContext javaCtx;
- private transient JavaSQLContext javaSqlCtx;
-
- @Before
- public void setUp() {
- javaCtx = new JavaSparkContext("local", "JavaUserDefinedTypeSuite");
- javaSqlCtx = new JavaSQLContext(javaCtx);
- }
-
- @After
- public void tearDown() {
- javaCtx.stop();
- javaCtx = null;
- javaSqlCtx = null;
- }
-
- @Test
- public void useScalaUDT() {
- List<MyLabeledPoint> points = Arrays.asList(
- new MyLabeledPoint(1.0, new MyDenseVector(new double[]{0.1, 1.0})),
- new MyLabeledPoint(0.0, new MyDenseVector(new double[]{0.2, 2.0})));
- JavaRDD<MyLabeledPoint> pointsRDD = javaCtx.parallelize(points);
-
- JavaSchemaRDD schemaRDD = javaSqlCtx.applySchema(pointsRDD, MyLabeledPoint.class);
- schemaRDD.registerTempTable("points");
-
- List<Row> actualLabelRows = javaSqlCtx.sql("SELECT label FROM points").collect();
- List<Double> actualLabels = new LinkedList<Double>();
- for (Row r : actualLabelRows) {
- actualLabels.add(r.getDouble(0));
- }
- for (MyLabeledPoint lp : points) {
- Assert.assertTrue(actualLabels.contains(lp.label()));
- }
-
- List<Row> actualFeatureRows = javaSqlCtx.sql("SELECT features FROM points").collect();
- List<MyDenseVector> actualFeatures = new LinkedList<MyDenseVector>();
- for (Row r : actualFeatureRows) {
- actualFeatures.add((MyDenseVector)r.get(0));
- }
- for (MyLabeledPoint lp : points) {
- Assert.assertTrue(actualFeatures.contains(lp.features()));
- }
-
- List<Row> actual = javaSqlCtx.sql("SELECT label, features FROM points").collect();
- List<MyLabeledPoint> actualPoints =
- new LinkedList<MyLabeledPoint>();
- for (Row r : actual) {
- actualPoints.add(new MyLabeledPoint(r.getDouble(0), (MyDenseVector)r.get(1)));
- }
- for (MyLabeledPoint lp : points) {
- Assert.assertTrue(actualPoints.contains(lp));
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
deleted file mode 100644
index fdbb4282ba..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.api.java
-
-import scala.beans.BeanProperty
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.types.NullType
-
-// Implicits
-import scala.collection.JavaConversions._
-
-class PersonBean extends Serializable {
- @BeanProperty
- var name: String = _
-
- @BeanProperty
- var age: Int = _
-}
-
-class AllTypesBean extends Serializable {
- @BeanProperty var stringField: String = _
- @BeanProperty var intField: java.lang.Integer = _
- @BeanProperty var longField: java.lang.Long = _
- @BeanProperty var floatField: java.lang.Float = _
- @BeanProperty var doubleField: java.lang.Double = _
- @BeanProperty var shortField: java.lang.Short = _
- @BeanProperty var byteField: java.lang.Byte = _
- @BeanProperty var booleanField: java.lang.Boolean = _
- @BeanProperty var dateField: java.sql.Date = _
- @BeanProperty var timestampField: java.sql.Timestamp = _
- @BeanProperty var bigDecimalField: java.math.BigDecimal = _
-}
-
-class JavaSQLSuite extends FunSuite {
- val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)
- val javaSqlCtx = new JavaSQLContext(javaCtx)
-
- test("schema from JavaBeans") {
- val person = new PersonBean
- person.setName("Michael")
- person.setAge(29)
-
- val rdd = javaCtx.parallelize(person :: Nil)
- val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[PersonBean])
-
- schemaRDD.registerTempTable("people")
- javaSqlCtx.sql("SELECT * FROM people").collect()
- }
-
- test("schema with null from JavaBeans") {
- val person = new PersonBean
- person.setName("Michael")
- person.setAge(29)
-
- val rdd = javaCtx.parallelize(person :: Nil)
- val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[PersonBean])
-
- schemaRDD.registerTempTable("people")
- val nullRDD = javaSqlCtx.sql("SELECT null FROM people")
- val structFields = nullRDD.schema.fields
- assert(structFields.size == 1)
- assert(structFields(0).dataType === NullType)
- assert(nullRDD.collect().head.row === Seq(null))
- }
-
- test("all types in JavaBeans") {
- val bean = new AllTypesBean
- bean.setStringField("")
- bean.setIntField(0)
- bean.setLongField(0)
- bean.setFloatField(0.0F)
- bean.setDoubleField(0.0)
- bean.setShortField(0.toShort)
- bean.setByteField(0.toByte)
- bean.setBooleanField(false)
- bean.setDateField(java.sql.Date.valueOf("2014-10-10"))
- bean.setTimestampField(java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0"))
- bean.setBigDecimalField(new java.math.BigDecimal(0))
-
- val rdd = javaCtx.parallelize(bean :: Nil)
- val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[AllTypesBean])
- schemaRDD.registerTempTable("allTypes")
-
- assert(
- javaSqlCtx.sql(
- """
- |SELECT stringField, intField, longField, floatField, doubleField, shortField, byteField,
- | booleanField, dateField, timestampField, bigDecimalField
- |FROM allTypes
- """.stripMargin).collect.head.row ===
- Seq("", 0, 0L, 0F, 0.0, 0.toShort, 0.toByte, false, java.sql.Date.valueOf("2014-10-10"),
- java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0"), scala.math.BigDecimal(0)))
- }
-
- test("decimal types in JavaBeans") {
- val bean = new AllTypesBean
- bean.setStringField("")
- bean.setIntField(0)
- bean.setLongField(0)
- bean.setFloatField(0.0F)
- bean.setDoubleField(0.0)
- bean.setShortField(0.toShort)
- bean.setByteField(0.toByte)
- bean.setBooleanField(false)
- bean.setDateField(java.sql.Date.valueOf("2014-10-10"))
- bean.setTimestampField(java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0"))
- bean.setBigDecimalField(new java.math.BigDecimal(0))
-
- val rdd = javaCtx.parallelize(bean :: Nil)
- val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[AllTypesBean])
- schemaRDD.registerTempTable("decimalTypes")
-
- assert(javaSqlCtx.sql(
- "select bigDecimalField + bigDecimalField from decimalTypes"
- ).collect.head.row === Seq(scala.math.BigDecimal(0)))
- }
-
- test("all types null in JavaBeans") {
- val bean = new AllTypesBean
- bean.setStringField(null)
- bean.setIntField(null)
- bean.setLongField(null)
- bean.setFloatField(null)
- bean.setDoubleField(null)
- bean.setShortField(null)
- bean.setByteField(null)
- bean.setBooleanField(null)
- bean.setDateField(null)
- bean.setTimestampField(null)
- bean.setBigDecimalField(null)
-
- val rdd = javaCtx.parallelize(bean :: Nil)
- val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[AllTypesBean])
- schemaRDD.registerTempTable("allTypes")
-
- assert(
- javaSqlCtx.sql(
- """
- |SELECT stringField, intField, longField, floatField, doubleField, shortField, byteField,
- | booleanField, dateField, timestampField, bigDecimalField
- |FROM allTypes
- """.stripMargin).collect.head.row ===
- Seq.fill(11)(null))
- }
-
- test("loads JSON datasets") {
- val jsonString =
- """{"string":"this is a simple string.",
- "integer":10,
- "long":21474836470,
- "bigInteger":92233720368547758070,
- "double":1.7976931348623157E308,
- "boolean":true,
- "null":null
- }""".replaceAll("\n", " ")
- val rdd = javaCtx.parallelize(jsonString :: Nil)
-
- var schemaRDD = javaSqlCtx.jsonRDD(rdd)
-
- schemaRDD.registerTempTable("jsonTable1")
-
- assert(
- javaSqlCtx.sql("select * from jsonTable1").collect.head.row ===
- Seq(BigDecimal("92233720368547758070"),
- true,
- 1.7976931348623157E308,
- 10,
- 21474836470L,
- null,
- "this is a simple string."))
-
- val file = getTempFilePath("json")
- val path = file.toString
- rdd.saveAsTextFile(path)
- schemaRDD = javaSqlCtx.jsonFile(path)
-
- schemaRDD.registerTempTable("jsonTable2")
-
- assert(
- javaSqlCtx.sql("select * from jsonTable2").collect.head.row ===
- Seq(BigDecimal("92233720368547758070"),
- true,
- 1.7976931348623157E308,
- 10,
- 21474836470L,
- null,
- "this is a simple string."))
- }
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
deleted file mode 100644
index 038f63f6c7..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.api.java
-
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD}
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.{HiveContext, HiveQl}
-
-/**
- * The entry point for executing Spark SQL queries from a Java program.
- */
-class JavaHiveContext(sqlContext: SQLContext) extends JavaSQLContext(sqlContext) {
-
- def this(sparkContext: JavaSparkContext) = this(new HiveContext(sparkContext))
-
- override def sql(sqlText: String): JavaSchemaRDD = {
- // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
- if (sqlContext.conf.dialect == "sql") {
- super.sql(sqlText)
- } else if (sqlContext.conf.dialect == "hiveql") {
- new JavaSchemaRDD(sqlContext, HiveQl.parseSql(sqlText))
- } else {
- sys.error(s"Unsupported SQL dialect: ${sqlContext.conf.dialect}. Try 'sql' or 'hiveql'")
- }
- }
-
- /**
- * DEPRECATED: Use sql(...) Instead
- */
- @Deprecated
- def hql(hqlQuery: String): JavaSchemaRDD =
- new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
-}
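
With JavaHiveContext deleted, a Java program constructs HiveContext directly; the dialect dispatch duplicated above is presumably handled inside HiveContext.sql itself. A minimal sketch, assuming HiveContext is handed the underlying SparkContext via JavaSparkContext.sc(); that constructor call is an assumption, not shown in this patch.

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.hive.HiveContext;

    public class HiveContextFromJava {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local", "HiveContextFromJava");

        // Assumption: pass the wrapped SparkContext; there is no JavaSparkContext constructor.
        HiveContext hiveContext = new HiveContext(jsc.sc());

        Row[] tables = hiveContext.sql("SHOW TABLES").collect();
        for (Row t : tables) {
          System.out.println(t.getString(0));
        }
        jsc.stop();
      }
    }
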
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala
deleted file mode 100644
index ca78dfba4f..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.api.java
-
-import scala.util.Try
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD}
-import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.hive.test.TestHive
-
-// Implicits
-import scala.collection.JavaConversions._
-
-class JavaHiveQLSuite extends FunSuite {
- lazy val javaCtx = new JavaSparkContext(TestHive.sparkContext)
-
- // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
- lazy val javaHiveCtx = new JavaHiveContext(TestHive)
-
- test("SELECT * FROM src") {
- assert(
- javaHiveCtx.sql("SELECT * FROM src").collect().map(_.getInt(0)) ===
- TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
- }
-
- def isExplanation(result: JavaSchemaRDD) = {
- val explanation = result.collect().map(_.getString(0))
- explanation.size > 1 && explanation.head.startsWith("== Physical Plan ==")
- }
-
- test("Query Hive native command execution result") {
- val tableName = "test_native_commands"
-
- assertResult(0) {
- javaHiveCtx.sql(s"DROP TABLE IF EXISTS $tableName").count()
- }
-
- assertResult(0) {
- javaHiveCtx.sql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
- }
-
- assert(
- javaHiveCtx
- .sql("SHOW TABLES")
- .collect()
- .map(_.getString(0))
- .contains(tableName))
-
- assertResult(Array(Array("key", "int"), Array("value", "string"))) {
- javaHiveCtx
- .sql(s"describe $tableName")
- .collect()
- .map(row => Array(row.get(0).asInstanceOf[String], row.get(1).asInstanceOf[String]))
- .toArray
- }
-
- assert(isExplanation(javaHiveCtx.sql(
- s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))
-
- TestHive.reset()
- }
-
- test("Exactly once semantics for DDL and command statements") {
- val tableName = "test_exactly_once"
- val q0 = javaHiveCtx.sql(s"CREATE TABLE $tableName(key INT, value STRING)")
-
- // If the table was not created, the following assertion would fail
- assert(Try(TestHive.table(tableName)).isSuccess)
-
- // If the CREATE TABLE command got executed again, the following assertion would fail
- assert(Try(q0.count()).isSuccess)
- }
-}