author     Reynold Xin <rxin@databricks.com>  2015-01-16 21:09:06 -0800
committer  Reynold Xin <rxin@databricks.com>  2015-01-16 21:09:06 -0800
commit     61b427d4b1c4934bd70ed4da844b64f0e9a377aa (patch)
tree       5068b31119fa7e2256422d4fdf18703ae64d7ab2 /sql
parent     ee1c1f3a04dfe80843432e349f01178e47f02443 (diff)
download   spark-61b427d4b1c4934bd70ed4da844b64f0e9a377aa.tar.gz
           spark-61b427d4b1c4934bd70ed4da844b64f0e9a377aa.tar.bz2
           spark-61b427d4b1c4934bd70ed4da844b64f0e9a377aa.zip
[SPARK-5193][SQL] Remove Spark SQL Java-specific API.
After the following patches, the main (Scala) API is now usable for Java users directly.

https://github.com/apache/spark/pull/4056
https://github.com/apache/spark/pull/4054
https://github.com/apache/spark/pull/4049
https://github.com/apache/spark/pull/4030
https://github.com/apache/spark/pull/3965
https://github.com/apache/spark/pull/3958

Author: Reynold Xin <rxin@databricks.com>

Closes #4065 from rxin/sql-java-api and squashes the following commits:

b1fd860 [Reynold Xin] Fix Mima
6d86578 [Reynold Xin] Ok one more attempt in fixing Python...
e8f1455 [Reynold Xin] Fix Python again...
3e53f91 [Reynold Xin] Fixed Python.
83735da [Reynold Xin] Fix BigDecimal test.
e9f1de3 [Reynold Xin] Use scala BigDecimal.
500d2c4 [Reynold Xin] Fix Decimal.
ba3bfa2 [Reynold Xin] Updated javadoc for RowFactory.
c4ae1c5 [Reynold Xin] [SPARK-5193][SQL] Remove Spark SQL Java-specific API.
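For Java users, the migration path after this patch is to use org.apache.spark.sql.SQLContext, Row, and RowFactory directly. Below is a minimal, hedged sketch of the post-patch Java entry point; the example class and the "people" table and column names are illustrative assumptions, not part of this patch.

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class JavaSqlExample {  // hypothetical example class
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "JavaSqlExample");

    // This patch adds a SQLContext(JavaSparkContext) constructor, so Java code
    // no longer needs the removed JavaSQLContext wrapper.
    SQLContext sqlContext = new SQLContext(sc);

    // Assumes a temporary table named "people" with a string "name" column
    // has already been registered (e.g. via applySchema + registerTempTable).
    Row first = sqlContext.sql("SELECT name FROM people").first();
    String name = first.getString(0);

    // collectAsList() is added to SchemaRDD in this patch so Java callers get
    // a java.util.List<Row> instead of a Scala Array[Row].
    java.util.List<Row> rows = sqlContext.sql("SELECT * FROM people").collectAsList();

    sc.stop();
  }
}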
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java                       6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala                  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                         4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala                         16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala          241
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala           225
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala                     153
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala         251
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java               16
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java       34
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java               11
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java   88
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala            209
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala     49
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala     91
15 files changed, 46 insertions, 1350 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java b/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java
index 62fcec824d..5ed60fe78d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java
@@ -25,10 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow;
public class RowFactory {
/**
- * Create a {@link Row} from an array of values. Position i in the array becomes position i
- * in the created {@link Row} object.
+ * Create a {@link Row} from the given arguments. Position i in the argument list becomes
+ * position i in the created {@link Row} object.
*/
- public static Row create(Object[] values) {
+ public static Row create(Object ... values) {
return new GenericRow(values);
}
}
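With the varargs signature above, Java callers can build a Row without allocating an Object[] explicitly. A small sketch under assumed values ("Michael", 29 are illustrative):

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

class RowFactoryExample {  // hypothetical example class
  static Row makePerson() {
    // Position i in the argument list becomes position i in the created Row.
    return RowFactory.create("Michael", 29);
  }

  static void readBack(Row person) {
    String name = person.getString(0);  // "Michael"
    int age = person.getInt(1);         // 29
  }
}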
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 3744d77c07..a85c4316e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -143,6 +143,8 @@ final class Decimal extends Ordered[Decimal] with Serializable {
}
}
+ def toJavaBigDecimal: java.math.BigDecimal = toBigDecimal.bigDecimal
+
def toUnscaledLong: Long = {
if (decimalVal.ne(null)) {
decimalVal.underlying().unscaledValue().longValue()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 8ad1753dab..f23cb18c92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -26,7 +26,7 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -58,6 +58,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
self =>
+ def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
+
// Note that this is a lazy val so we can override the default value in subclasses.
protected[sql] lazy val conf: SQLConf = new SQLConf
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 686bcdfbb4..ae4d8ba90c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -30,7 +30,6 @@ import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
@@ -409,13 +408,6 @@ class SchemaRDD(
def toSchemaRDD = this
/**
- * Returns this RDD as a JavaSchemaRDD.
- *
- * @group schema
- */
- def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
-
- /**
* Converts a JavaRDD to a PythonRDD. It is used by pyspark.
*/
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
@@ -470,6 +462,8 @@ class SchemaRDD(
override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
+ def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(collect() : _*)
+
override def take(num: Int): Array[Row] = limit(num).collect()
// =======================================================================
@@ -482,13 +476,15 @@ class SchemaRDD(
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.coalesce(numPartitions, shuffle)(ord))
- override def distinct(): SchemaRDD =
- applySchema(super.distinct())
+ override def distinct(): SchemaRDD = applySchema(super.distinct())
override def distinct(numPartitions: Int)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.distinct(numPartitions)(ord))
+ def distinct(numPartitions: Int): SchemaRDD =
+ applySchema(super.distinct(numPartitions)(null))
+
override def filter(f: Row => Boolean): SchemaRDD =
applySchema(super.filter(f))
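The two additions above exist mainly for Java callers: distinct(numPartitions) avoids the implicit Ordering parameter that Java cannot supply, and collectAsList() returns a java.util.List. A hedged Java sketch, assuming a SQLContext and a registered temporary table named "logs" (both illustrative):

import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SchemaRDD;

class SchemaRddJavaExample {  // hypothetical example class
  static List<Row> distinctLogs(SQLContext sqlContext) {
    // Assumes a temporary table named "logs" has been registered elsewhere.
    SchemaRDD result = sqlContext.sql("SELECT * FROM logs");

    // The new distinct(int) overload takes no implicit Ordering, so Java can call it.
    SchemaRDD deduped = result.distinct(8);

    // collectAsList() returns a java.util.List<Row> rather than a Scala Array[Row].
    return deduped.collectAsList();
  }
}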
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
deleted file mode 100644
index a75f559928..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.api.java
-
-import java.beans.Introspector
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
-import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation}
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-
-/**
- * The entry point for executing Spark SQL queries from a Java program.
- */
-class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
-
- def this(sparkContext: JavaSparkContext) = this(new SQLContext(sparkContext.sc))
-
- def baseRelationToSchemaRDD(baseRelation: BaseRelation): JavaSchemaRDD = {
- new JavaSchemaRDD(sqlContext, LogicalRelation(baseRelation))
- }
-
- /**
- * Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is
- * used for SQL parsing can be configured with 'spark.sql.dialect'.
- *
- * @group userf
- */
- def sql(sqlText: String): JavaSchemaRDD = {
- if (sqlContext.conf.dialect == "sql") {
- new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlText))
- } else {
- sys.error(s"Unsupported SQL dialect: $sqlContext.dialect")
- }
- }
-
- /**
- * :: Experimental ::
- * Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
- * a table. This registered table can be used as the target of future `insertInto` operations.
- *
- * {{{
- * JavaSQLContext sqlCtx = new JavaSQLContext(...)
- *
- * sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerTempTable("people")
- * sqlCtx.sql("INSERT INTO people SELECT 'michael', 29")
- * }}}
- *
- * @param beanClass A java bean class object that will be used to determine the schema of the
- * parquet file.
- * @param path The path where the directory containing parquet metadata should be created.
- * Data inserted into this table will also be stored at this location.
- * @param allowExisting When false, an exception will be thrown if this directory already exists.
- * @param conf A Hadoop configuration object that can be used to specific options to the parquet
- * output format.
- */
- @Experimental
- def createParquetFile(
- beanClass: Class[_],
- path: String,
- allowExisting: Boolean = true,
- conf: Configuration = new Configuration()): JavaSchemaRDD = {
- new JavaSchemaRDD(
- sqlContext,
- ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext))
- }
-
- /**
- * Applies a schema to an RDD of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
- */
- def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
- val attributeSeq = getSchema(beanClass)
- val className = beanClass.getName
- val rowRdd = rdd.rdd.mapPartitions { iter =>
- // BeanInfo is not serializable so we must rediscover it remotely for each partition.
- val localBeanInfo = Introspector.getBeanInfo(
- Class.forName(className, true, Utils.getContextOrSparkClassLoader))
- val extractors =
- localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
-
- iter.map { row =>
- new GenericRow(
- extractors.zip(attributeSeq).map { case (e, attr) =>
- DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
- }.toArray[Any]
- ): ScalaRow
- }
- }
- new JavaSchemaRDD(sqlContext, LogicalRDD(attributeSeq, rowRdd)(sqlContext))
- }
-
- /**
- * :: DeveloperApi ::
- * Creates a JavaSchemaRDD from an RDD containing Rows by applying a schema to this RDD.
- * It is important to make sure that the structure of every Row of the provided RDD matches the
- * provided schema. Otherwise, there will be runtime exception.
- */
- @DeveloperApi
- def applySchema(rowRDD: JavaRDD[Row], schema: StructType): JavaSchemaRDD = {
- val scalaRowRDD = rowRDD.rdd.map(r => r.row)
- val logicalPlan =
- LogicalRDD(schema.toAttributes, scalaRowRDD)(sqlContext)
- new JavaSchemaRDD(sqlContext, logicalPlan)
- }
-
- /**
- * Loads a parquet file from regular path or files that match file patterns in path,
- * returning the result as a [[JavaSchemaRDD]].
- * Supported glob file pattern information at ([[http://tinyurl.com/kcqrzn8]]).
- */
- def parquetFile(path: String): JavaSchemaRDD =
- new JavaSchemaRDD(
- sqlContext,
- ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext))
-
- /**
- * Loads a JSON file (one object per line), returning the result as a JavaSchemaRDD.
- * It goes through the entire dataset once to determine the schema.
- */
- def jsonFile(path: String): JavaSchemaRDD =
- jsonRDD(sqlContext.sparkContext.textFile(path))
-
- /**
- * :: Experimental ::
- * Loads a JSON file (one object per line) and applies the given schema,
- * returning the result as a JavaSchemaRDD.
- */
- @Experimental
- def jsonFile(path: String, schema: StructType): JavaSchemaRDD =
- jsonRDD(sqlContext.sparkContext.textFile(path), schema)
-
- /**
- * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
- * JavaSchemaRDD.
- * It goes through the entire dataset once to determine the schema.
- */
- def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
- val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
- val appliedScalaSchema =
- JsonRDD.nullTypeToStringType(
- JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord))
- val scalaRowRDD =
- JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
- val logicalPlan =
- LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
- new JavaSchemaRDD(sqlContext, logicalPlan)
- }
-
- /**
- * :: Experimental ::
- * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
- * returning the result as a JavaSchemaRDD.
- */
- @Experimental
- def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = {
- val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
- val appliedScalaSchema =
- Option(schema).getOrElse(
- JsonRDD.nullTypeToStringType(
- JsonRDD.inferSchema(
- json.rdd, 1.0, columnNameOfCorruptJsonRecord)))
- val scalaRowRDD = JsonRDD.jsonStringToRow(
- json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
- val logicalPlan =
- LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
- new JavaSchemaRDD(sqlContext, logicalPlan)
- }
-
- /**
- * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
- * during the lifetime of this instance of SQLContext.
- */
- def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = {
- sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
- }
-
- /**
- * Returns a Catalyst Schema for the given java bean class.
- */
- protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
- // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
- val beanInfo = Introspector.getBeanInfo(beanClass)
-
- // Note: The ordering of elements may differ from when the schema is inferred in Scala.
- // This is because beanInfo.getPropertyDescriptors gives no guarantees about
- // element ordering.
- val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
- fields.map { property =>
- val (dataType, nullable) = property.getPropertyType match {
- case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
- (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
- case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
- case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
- case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
- case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
- case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
- case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
- case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
- case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
-
- case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
- case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
- case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
- case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
- case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
- case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
- case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
- case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
- case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
- case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
- }
- AttributeReference(property.getName, dataType, nullable)()
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
deleted file mode 100644
index 9e10e532fb..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.api.java
-
-import java.util.{List => JList}
-
-import org.apache.spark.Partitioner
-import org.apache.spark.api.java.{JavaRDD, JavaRDDLike}
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.storage.StorageLevel
-
-/**
- * An RDD of [[Row]] objects that is returned as the result of a Spark SQL query. In addition to
- * standard RDD operations, a JavaSchemaRDD can also be registered as a table in the JavaSQLContext
- * that was used to create. Registering a JavaSchemaRDD allows its contents to be queried in
- * future SQL statement.
- *
- * @groupname schema SchemaRDD Functions
- * @groupprio schema -1
- * @groupname Ungrouped Base RDD Functions
- */
-class JavaSchemaRDD(
- @transient val sqlContext: SQLContext,
- @transient val baseLogicalPlan: LogicalPlan)
- extends JavaRDDLike[Row, JavaRDD[Row]]
- with SchemaRDDLike {
-
- private[sql] val baseSchemaRDD = new SchemaRDD(sqlContext, logicalPlan)
-
- /** Returns the underlying Scala SchemaRDD. */
- val schemaRDD: SchemaRDD = baseSchemaRDD
-
- override val classTag = scala.reflect.classTag[Row]
-
- override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
-
- val rdd = baseSchemaRDD.map(new Row(_))
-
- override def toString: String = baseSchemaRDD.toString
-
- /** Returns the schema of this JavaSchemaRDD (represented by a StructType). */
- def schema: StructType = baseSchemaRDD.schema.asInstanceOf[StructType]
-
- // =======================================================================
- // Base RDD functions that do NOT change schema
- // =======================================================================
-
- // Common RDD functions
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- def cache(): JavaSchemaRDD = {
- baseSchemaRDD.cache()
- this
- }
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- def persist(): JavaSchemaRDD = {
- baseSchemaRDD.persist()
- this
- }
-
- /**
- * Set this RDD's storage level to persist its values across operations after the first time
- * it is computed. This can only be used to assign a new storage level if the RDD does not
- * have a storage level set yet..
- */
- def persist(newLevel: StorageLevel): JavaSchemaRDD = {
- baseSchemaRDD.persist(newLevel)
- this
- }
-
- /**
- * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
- *
- * @param blocking Whether to block until all blocks are deleted.
- * @return This RDD.
- */
- def unpersist(blocking: Boolean = true): JavaSchemaRDD = {
- baseSchemaRDD.unpersist(blocking)
- this
- }
-
- /** Assign a name to this RDD */
- def setName(name: String): JavaSchemaRDD = {
- baseSchemaRDD.setName(name)
- this
- }
-
- // Overridden actions from JavaRDDLike.
-
- override def collect(): JList[Row] = {
- import scala.collection.JavaConversions._
- val arr: java.util.Collection[Row] = baseSchemaRDD.collect().toSeq.map(new Row(_))
- new java.util.ArrayList(arr)
- }
-
- override def count(): Long = baseSchemaRDD.count
-
- override def take(num: Int): JList[Row] = {
- import scala.collection.JavaConversions._
- val arr: java.util.Collection[Row] = baseSchemaRDD.take(num).toSeq.map(new Row(_))
- new java.util.ArrayList(arr)
- }
-
- // Transformations (return a new RDD)
-
- /**
- * Returns a new RDD with each row transformed to a JSON string.
- */
- def toJSON(): JavaRDD[String] =
- baseSchemaRDD.toJSON.toJavaRDD
-
- /**
- * Return a new RDD that is reduced into `numPartitions` partitions.
- */
- def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD =
- baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD
-
- /**
- * Return a new RDD containing the distinct elements in this RDD.
- */
- def distinct(): JavaSchemaRDD =
- baseSchemaRDD.distinct().toJavaSchemaRDD
-
- /**
- * Return a new RDD containing the distinct elements in this RDD.
- */
- def distinct(numPartitions: Int): JavaSchemaRDD =
- baseSchemaRDD.distinct(numPartitions).toJavaSchemaRDD
-
- /**
- * Return a new RDD containing only the elements that satisfy a predicate.
- */
- def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
- baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue()).toJavaSchemaRDD
-
- /**
- * Return the intersection of this RDD and another one. The output will not contain any
- * duplicate elements, even if the input RDDs did.
- *
- * Note that this method performs a shuffle internally.
- */
- def intersection(other: JavaSchemaRDD): JavaSchemaRDD =
- this.baseSchemaRDD.intersection(other.baseSchemaRDD).toJavaSchemaRDD
-
- /**
- * Return the intersection of this RDD and another one. The output will not contain any
- * duplicate elements, even if the input RDDs did.
- *
- * Note that this method performs a shuffle internally.
- *
- * @param partitioner Partitioner to use for the resulting RDD
- */
- def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD =
- this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD
-
- /**
- * Return the intersection of this RDD and another one. The output will not contain any
- * duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster
- *
- * Note that this method performs a shuffle internally.
- *
- * @param numPartitions How many partitions to use in the resulting RDD
- */
- def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
- this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
-
- /**
- * Return a new RDD that has exactly `numPartitions` partitions.
- *
- * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
- * a shuffle to redistribute data.
- *
- * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
- * which can avoid performing a shuffle.
- */
- def repartition(numPartitions: Int): JavaSchemaRDD =
- baseSchemaRDD.repartition(numPartitions).toJavaSchemaRDD
-
- /**
- * Return an RDD with the elements from `this` that are not in `other`.
- *
- * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
- * RDD will be &lt;= us.
- */
- def subtract(other: JavaSchemaRDD): JavaSchemaRDD =
- this.baseSchemaRDD.subtract(other.baseSchemaRDD).toJavaSchemaRDD
-
- /**
- * Return an RDD with the elements from `this` that are not in `other`.
- */
- def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
- this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
-
- /**
- * Return an RDD with the elements from `this` that are not in `other`.
- */
- def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
- this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD
-
- /**
- * Return a SchemaRDD with a sampled version of the underlying dataset.
- */
- def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaSchemaRDD =
- this.baseSchemaRDD.sample(withReplacement, fraction, seed).toJavaSchemaRDD
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
deleted file mode 100644
index 4faa79af25..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.api.java
-
-import scala.annotation.varargs
-import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
-import scala.collection.JavaConversions
-import scala.math.BigDecimal
-
-import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
-import org.apache.spark.sql.{Row => ScalaRow}
-
-/**
- * A result row from a Spark SQL query.
- */
-class Row(private[spark] val row: ScalaRow) extends Serializable {
-
- /** Returns the number of columns present in this Row. */
- def length: Int = row.length
-
- /** Returns the value of column `i`. */
- def get(i: Int): Any =
- Row.toJavaValue(row(i))
-
- /** Returns true if value at column `i` is NULL. */
- def isNullAt(i: Int) = get(i) == null
-
- /**
- * Returns the value of column `i` as an int. This function will throw an exception if the value
- * is at `i` is not an integer, or if it is null.
- */
- def getInt(i: Int): Int =
- row.getInt(i)
-
- /**
- * Returns the value of column `i` as a long. This function will throw an exception if the value
- * is at `i` is not a long, or if it is null.
- */
- def getLong(i: Int): Long =
- row.getLong(i)
-
- /**
- * Returns the value of column `i` as a double. This function will throw an exception if the
- * value is at `i` is not a double, or if it is null.
- */
- def getDouble(i: Int): Double =
- row.getDouble(i)
-
- /**
- * Returns the value of column `i` as a bool. This function will throw an exception if the value
- * is at `i` is not a boolean, or if it is null.
- */
- def getBoolean(i: Int): Boolean =
- row.getBoolean(i)
-
- /**
- * Returns the value of column `i` as a short. This function will throw an exception if the value
- * is at `i` is not a short, or if it is null.
- */
- def getShort(i: Int): Short =
- row.getShort(i)
-
- /**
- * Returns the value of column `i` as a byte. This function will throw an exception if the value
- * is at `i` is not a byte, or if it is null.
- */
- def getByte(i: Int): Byte =
- row.getByte(i)
-
- /**
- * Returns the value of column `i` as a float. This function will throw an exception if the value
- * is at `i` is not a float, or if it is null.
- */
- def getFloat(i: Int): Float =
- row.getFloat(i)
-
- /**
- * Returns the value of column `i` as a String. This function will throw an exception if the
- * value is at `i` is not a String.
- */
- def getString(i: Int): String =
- row.getString(i)
-
- def canEqual(other: Any): Boolean = other.isInstanceOf[Row]
-
- override def equals(other: Any): Boolean = other match {
- case that: Row =>
- (that canEqual this) &&
- row == that.row
- case _ => false
- }
-
- override def hashCode(): Int = row.hashCode()
-
- override def toString: String = row.toString
-}
-
-object Row {
-
- private def toJavaValue(value: Any): Any = value match {
- // For values of this ScalaRow, we will do the conversion when
- // they are actually accessed.
- case row: ScalaRow => new Row(row)
- case map: scala.collection.Map[_, _] =>
- mapAsSerializableJavaMap(
- map.map {
- case (key, value) => (toJavaValue(key), toJavaValue(value))
- }
- )
- case seq: scala.collection.Seq[_] =>
- JavaConversions.seqAsJavaList(seq.map(toJavaValue))
- case decimal: BigDecimal => decimal.underlying()
- case other => other
- }
-
- // TODO: Consolidate the toScalaValue at here with the scalafy in JsonRDD?
- private def toScalaValue(value: Any): Any = value match {
- // Values of this row have been converted to Scala values.
- case row: Row => row.row
- case map: java.util.Map[_, _] =>
- JMapWrapper(map).map {
- case (key, value) => (toScalaValue(key), toScalaValue(value))
- }
- case list: java.util.List[_] =>
- JListWrapper(list).map(toScalaValue)
- case decimal: java.math.BigDecimal => BigDecimal(decimal)
- case other => other
- }
-
- /**
- * Creates a Row with the given values.
- */
- @varargs def create(values: Any*): Row = {
- // Right now, we cannot use @varargs to annotate the constructor of
- // org.apache.spark.sql.api.java.Row. See https://issues.scala-lang.org/browse/SI-8383.
- new Row(ScalaRow(values.map(toScalaValue):_*))
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala
deleted file mode 100644
index 4186c27451..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.api.java
-
-import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUdf}
-import org.apache.spark.sql.types.DataType
-
-/**
- * A collection of functions that allow Java users to register UDFs. In order to handle functions
- * of varying airities with minimal boilerplate for our users, we generate classes and functions
- * for each airity up to 22. The code for this generation can be found in comments in this trait.
- */
-private[java] trait UDFRegistration {
- self: JavaSQLContext =>
-
- /* The following functions and required interfaces are generated with these code fragments:
-
- (1 to 22).foreach { i =>
- val extTypeArgs = (1 to i).map(_ => "_").mkString(", ")
- val anyTypeArgs = (1 to i).map(_ => "Any").mkString(", ")
- val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs, Any]]"
- val anyParams = (1 to i).map(_ => "_: Any").mkString(", ")
- println(s"""
- |def registerFunction(
- | name: String, f: UDF$i[$extTypeArgs, _], @transient dataType: DataType) = {
- | sqlContext.functionRegistry.registerFunction(
- | name,
- | (e: Seq[Expression]) => ScalaUdf(f$anyCast.call($anyParams), dataType, e))
- |}
- """.stripMargin)
- }
-
- import java.io.File
- import org.apache.spark.sql.catalyst.util.stringToFile
- val directory = new File("sql/core/src/main/java/org/apache/spark/sql/api/java/")
- (1 to 22).foreach { i =>
- val typeArgs = (1 to i).map(i => s"T$i").mkString(", ")
- val args = (1 to i).map(i => s"T$i t$i").mkString(", ")
-
- val contents =
- s"""/*
- | * Licensed to the Apache Software Foundation (ASF) under one or more
- | * contributor license agreements. See the NOTICE file distributed with
- | * this work for additional information regarding copyright ownership.
- | * The ASF licenses this file to You under the Apache License, Version 2.0
- | * (the "License"); you may not use this file except in compliance with
- | * the License. You may obtain a copy of the License at
- | *
- | * http://www.apache.org/licenses/LICENSE-2.0
- | *
- | * Unless required by applicable law or agreed to in writing, software
- | * distributed under the License is distributed on an "AS IS" BASIS,
- | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- | * See the License for the specific language governing permissions and
- | * limitations under the License.
- | */
- |
- |package org.apache.spark.sql.api.java;
- |
- |import java.io.Serializable;
- |
- |// **************************************************
- |// THIS FILE IS AUTOGENERATED BY CODE IN
- |// org.apache.spark.sql.api.java.FunctionRegistration
- |// **************************************************
- |
- |/**
- | * A Spark SQL UDF that has $i arguments.
- | */
- |public interface UDF$i<$typeArgs, R> extends Serializable {
- | public R call($args) throws Exception;
- |}
- |""".stripMargin
-
- stringToFile(new File(directory, s"UDF$i.java"), contents)
- }
-
- */
-
- // scalastyle:off
- def registerFunction(
- name: String, f: UDF1[_, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF1[Any, Any]].call(_: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF2[_, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF3[_, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF4[_, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF5[_, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF6[_, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF7[_, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF8[_, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
-
- def registerFunction(
- name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
-
- def registerFunction(
- name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = {
- sqlContext.functionRegistry.registerFunction(
- name,
- (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e))
- }
- // scalastyle:on
-}
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
index 88017eb47d..9ff40471a0 100644
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
@@ -24,6 +24,8 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -31,12 +33,12 @@ import org.apache.spark.sql.types.DataTypes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite implements Serializable {
private transient JavaSparkContext sc;
- private transient JavaSQLContext sqlContext;
+ private transient SQLContext sqlContext;
@Before
public void setUp() {
sc = new JavaSparkContext("local", "JavaAPISuite");
- sqlContext = new JavaSQLContext(sc);
+ sqlContext = new SQLContext(sc);
}
@After
@@ -52,15 +54,14 @@ public class JavaAPISuite implements Serializable {
// sqlContext.registerFunction(
// "stringLengthTest", (String str) -> str.length(), DataType.IntegerType);
- sqlContext.registerFunction("stringLengthTest", new UDF1<String, Integer>() {
+ sqlContext.udf().register("stringLengthTest", new UDF1<String, Integer>() {
@Override
public Integer call(String str) throws Exception {
return str.length();
}
}, DataTypes.IntegerType);
- // TODO: Why do we need this cast?
- Row result = (Row) sqlContext.sql("SELECT stringLengthTest('test')").first();
+ Row result = sqlContext.sql("SELECT stringLengthTest('test')").first();
assert(result.getInt(0) == 4);
}
@@ -73,15 +74,14 @@ public class JavaAPISuite implements Serializable {
// (String str1, String str2) -> str1.length() + str2.length,
// DataType.IntegerType);
- sqlContext.registerFunction("stringLengthTest", new UDF2<String, String, Integer>() {
+ sqlContext.udf().register("stringLengthTest", new UDF2<String, String, Integer>() {
@Override
public Integer call(String str1, String str2) throws Exception {
return str1.length() + str2.length();
}
}, DataTypes.IntegerType);
- // TODO: Why do we need this cast?
- Row result = (Row) sqlContext.sql("SELECT stringLengthTest('test', 'test2')").first();
+ Row result = sqlContext.sql("SELECT stringLengthTest('test', 'test2')").first();
assert(result.getInt(0) == 9);
}
}
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
index de586ba635..86d21f49fe 100644
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
@@ -18,7 +18,6 @@
package org.apache.spark.sql.api.java;
import java.io.Serializable;
-import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -31,6 +30,7 @@ import org.junit.Test;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -38,12 +38,12 @@ import org.apache.spark.sql.types.*;
// see http://stackoverflow.com/questions/758570/.
public class JavaApplySchemaSuite implements Serializable {
private transient JavaSparkContext javaCtx;
- private transient JavaSQLContext javaSqlCtx;
+ private transient SQLContext javaSqlCtx;
@Before
public void setUp() {
javaCtx = new JavaSparkContext("local", "JavaApplySchemaSuite");
- javaSqlCtx = new JavaSQLContext(javaCtx);
+ javaSqlCtx = new SQLContext(javaCtx);
}
@After
@@ -89,7 +89,7 @@ public class JavaApplySchemaSuite implements Serializable {
JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map(
new Function<Person, Row>() {
public Row call(Person person) throws Exception {
- return Row.create(person.getName(), person.getAge());
+ return RowFactory.create(person.getName(), person.getAge());
}
});
@@ -98,15 +98,15 @@ public class JavaApplySchemaSuite implements Serializable {
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
StructType schema = DataTypes.createStructType(fields);
- JavaSchemaRDD schemaRDD = javaSqlCtx.applySchema(rowRDD, schema);
+ SchemaRDD schemaRDD = javaSqlCtx.applySchema(rowRDD.rdd(), schema);
schemaRDD.registerTempTable("people");
- List<Row> actual = javaSqlCtx.sql("SELECT * FROM people").collect();
+ Row[] actual = javaSqlCtx.sql("SELECT * FROM people").collect();
List<Row> expected = new ArrayList<Row>(2);
- expected.add(Row.create("Michael", 29));
- expected.add(Row.create("Yin", 28));
+ expected.add(RowFactory.create("Michael", 29));
+ expected.add(RowFactory.create("Yin", 28));
- Assert.assertEquals(expected, actual);
+ Assert.assertEquals(expected, Arrays.asList(actual));
}
@Test
@@ -129,8 +129,8 @@ public class JavaApplySchemaSuite implements Serializable {
StructType expectedSchema = DataTypes.createStructType(fields);
List<Row> expectedResult = new ArrayList<Row>(2);
expectedResult.add(
- Row.create(
- new BigDecimal("92233720368547758070"),
+ RowFactory.create(
+ scala.math.BigDecimal$.MODULE$.apply("92233720368547758070"),
true,
1.7976931348623157E308,
10,
@@ -138,8 +138,8 @@ public class JavaApplySchemaSuite implements Serializable {
null,
"this is a simple string."));
expectedResult.add(
- Row.create(
- new BigDecimal("92233720368547758069"),
+ RowFactory.create(
+ scala.math.BigDecimal$.MODULE$.apply("92233720368547758069"),
false,
1.7976931348623157E305,
11,
@@ -147,18 +147,18 @@ public class JavaApplySchemaSuite implements Serializable {
null,
"this is another simple string."));
- JavaSchemaRDD schemaRDD1 = javaSqlCtx.jsonRDD(jsonRDD);
+ SchemaRDD schemaRDD1 = javaSqlCtx.jsonRDD(jsonRDD.rdd());
StructType actualSchema1 = schemaRDD1.schema();
Assert.assertEquals(expectedSchema, actualSchema1);
schemaRDD1.registerTempTable("jsonTable1");
- List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collect();
+ List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collectAsList();
Assert.assertEquals(expectedResult, actual1);
- JavaSchemaRDD schemaRDD2 = javaSqlCtx.jsonRDD(jsonRDD, expectedSchema);
+ SchemaRDD schemaRDD2 = javaSqlCtx.jsonRDD(jsonRDD.rdd(), expectedSchema);
StructType actualSchema2 = schemaRDD2.schema();
Assert.assertEquals(expectedSchema, actualSchema2);
schemaRDD2.registerTempTable("jsonTable2");
- List<Row> actual2 = javaSqlCtx.sql("select * from jsonTable2").collect();
+ List<Row> actual2 = javaSqlCtx.sql("select * from jsonTable2").collectAsList();
Assert.assertEquals(expectedResult, actual2);
}
}
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java
index 2b5812159d..fbfcd3f59d 100644
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java
@@ -29,6 +29,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+
public class JavaRowSuite {
private byte byteValue;
private short shortValue;
@@ -61,7 +64,7 @@ public class JavaRowSuite {
@Test
public void constructSimpleRow() {
- Row simpleRow = Row.create(
+ Row simpleRow = RowFactory.create(
byteValue, // ByteType
new Byte(byteValue),
shortValue, // ShortType
@@ -137,7 +140,7 @@ public class JavaRowSuite {
simpleMap.put(stringValue + " (3)", longValue - 2);
// Simple struct
- Row simpleStruct = Row.create(
+ Row simpleStruct = RowFactory.create(
doubleValue, stringValue, timestampValue, null);
// Complex array
@@ -150,7 +153,7 @@ public class JavaRowSuite {
complexMap.put(arrayOfRows, simpleStruct);
// Complex struct
- Row complexStruct = Row.create(
+ Row complexStruct = RowFactory.create(
simpleStringArray,
simpleMap,
simpleStruct,
@@ -167,7 +170,7 @@ public class JavaRowSuite {
Assert.assertEquals(null, complexStruct.get(6));
// A very complex row
- Row complexRow = Row.create(arrayOfMaps, arrayOfRows, complexMap, complexStruct);
+ Row complexRow = RowFactory.create(arrayOfMaps, arrayOfRows, complexMap, complexStruct);
Assert.assertEquals(arrayOfMaps, complexRow.get(0));
Assert.assertEquals(arrayOfRows, complexRow.get(1));
Assert.assertEquals(complexMap, complexRow.get(2));
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java
deleted file mode 100644
index 0caa8219a6..0000000000
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.api.java;
-
-import java.io.Serializable;
-import java.util.*;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.MyDenseVector;
-import org.apache.spark.sql.MyLabeledPoint;
-
-public class JavaUserDefinedTypeSuite implements Serializable {
- private transient JavaSparkContext javaCtx;
- private transient JavaSQLContext javaSqlCtx;
-
- @Before
- public void setUp() {
- javaCtx = new JavaSparkContext("local", "JavaUserDefinedTypeSuite");
- javaSqlCtx = new JavaSQLContext(javaCtx);
- }
-
- @After
- public void tearDown() {
- javaCtx.stop();
- javaCtx = null;
- javaSqlCtx = null;
- }
-
- @Test
- public void useScalaUDT() {
- List<MyLabeledPoint> points = Arrays.asList(
- new MyLabeledPoint(1.0, new MyDenseVector(new double[]{0.1, 1.0})),
- new MyLabeledPoint(0.0, new MyDenseVector(new double[]{0.2, 2.0})));
- JavaRDD<MyLabeledPoint> pointsRDD = javaCtx.parallelize(points);
-
- JavaSchemaRDD schemaRDD = javaSqlCtx.applySchema(pointsRDD, MyLabeledPoint.class);
- schemaRDD.registerTempTable("points");
-
- List<Row> actualLabelRows = javaSqlCtx.sql("SELECT label FROM points").collect();
- List<Double> actualLabels = new LinkedList<Double>();
- for (Row r : actualLabelRows) {
- actualLabels.add(r.getDouble(0));
- }
- for (MyLabeledPoint lp : points) {
- Assert.assertTrue(actualLabels.contains(lp.label()));
- }
-
- List<Row> actualFeatureRows = javaSqlCtx.sql("SELECT features FROM points").collect();
- List<MyDenseVector> actualFeatures = new LinkedList<MyDenseVector>();
- for (Row r : actualFeatureRows) {
- actualFeatures.add((MyDenseVector)r.get(0));
- }
- for (MyLabeledPoint lp : points) {
- Assert.assertTrue(actualFeatures.contains(lp.features()));
- }
-
- List<Row> actual = javaSqlCtx.sql("SELECT label, features FROM points").collect();
- List<MyLabeledPoint> actualPoints =
- new LinkedList<MyLabeledPoint>();
- for (Row r : actual) {
- actualPoints.add(new MyLabeledPoint(r.getDouble(0), (MyDenseVector)r.get(1)));
- }
- for (MyLabeledPoint lp : points) {
- Assert.assertTrue(actualPoints.contains(lp));
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
deleted file mode 100644
index fdbb4282ba..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.api.java
-
-import scala.beans.BeanProperty
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.types.NullType
-
-// Implicits
-import scala.collection.JavaConversions._
-
-class PersonBean extends Serializable {
- @BeanProperty
- var name: String = _
-
- @BeanProperty
- var age: Int = _
-}
-
-class AllTypesBean extends Serializable {
- @BeanProperty var stringField: String = _
- @BeanProperty var intField: java.lang.Integer = _
- @BeanProperty var longField: java.lang.Long = _
- @BeanProperty var floatField: java.lang.Float = _
- @BeanProperty var doubleField: java.lang.Double = _
- @BeanProperty var shortField: java.lang.Short = _
- @BeanProperty var byteField: java.lang.Byte = _
- @BeanProperty var booleanField: java.lang.Boolean = _
- @BeanProperty var dateField: java.sql.Date = _
- @BeanProperty var timestampField: java.sql.Timestamp = _
- @BeanProperty var bigDecimalField: java.math.BigDecimal = _
-}
-
-class JavaSQLSuite extends FunSuite {
- val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)
- val javaSqlCtx = new JavaSQLContext(javaCtx)
-
- test("schema from JavaBeans") {
- val person = new PersonBean
- person.setName("Michael")
- person.setAge(29)
-
- val rdd = javaCtx.parallelize(person :: Nil)
- val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[PersonBean])
-
- schemaRDD.registerTempTable("people")
- javaSqlCtx.sql("SELECT * FROM people").collect()
- }
-
- test("schema with null from JavaBeans") {
- val person = new PersonBean
- person.setName("Michael")
- person.setAge(29)
-
- val rdd = javaCtx.parallelize(person :: Nil)
- val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[PersonBean])
-
- schemaRDD.registerTempTable("people")
- val nullRDD = javaSqlCtx.sql("SELECT null FROM people")
- val structFields = nullRDD.schema.fields
- assert(structFields.size == 1)
- assert(structFields(0).dataType === NullType)
- assert(nullRDD.collect().head.row === Seq(null))
- }
-
- test("all types in JavaBeans") {
- val bean = new AllTypesBean
- bean.setStringField("")
- bean.setIntField(0)
- bean.setLongField(0)
- bean.setFloatField(0.0F)
- bean.setDoubleField(0.0)
- bean.setShortField(0.toShort)
- bean.setByteField(0.toByte)
- bean.setBooleanField(false)
- bean.setDateField(java.sql.Date.valueOf("2014-10-10"))
- bean.setTimestampField(java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0"))
- bean.setBigDecimalField(new java.math.BigDecimal(0))
-
- val rdd = javaCtx.parallelize(bean :: Nil)
- val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[AllTypesBean])
- schemaRDD.registerTempTable("allTypes")
-
- assert(
- javaSqlCtx.sql(
- """
- |SELECT stringField, intField, longField, floatField, doubleField, shortField, byteField,
- | booleanField, dateField, timestampField, bigDecimalField
- |FROM allTypes
- """.stripMargin).collect.head.row ===
- Seq("", 0, 0L, 0F, 0.0, 0.toShort, 0.toByte, false, java.sql.Date.valueOf("2014-10-10"),
- java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0"), scala.math.BigDecimal(0)))
- }
-
- test("decimal types in JavaBeans") {
- val bean = new AllTypesBean
- bean.setStringField("")
- bean.setIntField(0)
- bean.setLongField(0)
- bean.setFloatField(0.0F)
- bean.setDoubleField(0.0)
- bean.setShortField(0.toShort)
- bean.setByteField(0.toByte)
- bean.setBooleanField(false)
- bean.setDateField(java.sql.Date.valueOf("2014-10-10"))
- bean.setTimestampField(java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0"))
- bean.setBigDecimalField(new java.math.BigDecimal(0))
-
- val rdd = javaCtx.parallelize(bean :: Nil)
- val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[AllTypesBean])
- schemaRDD.registerTempTable("decimalTypes")
-
- assert(javaSqlCtx.sql(
- "select bigDecimalField + bigDecimalField from decimalTypes"
- ).collect.head.row === Seq(scala.math.BigDecimal(0)))
- }
-
- test("all types null in JavaBeans") {
- val bean = new AllTypesBean
- bean.setStringField(null)
- bean.setIntField(null)
- bean.setLongField(null)
- bean.setFloatField(null)
- bean.setDoubleField(null)
- bean.setShortField(null)
- bean.setByteField(null)
- bean.setBooleanField(null)
- bean.setDateField(null)
- bean.setTimestampField(null)
- bean.setBigDecimalField(null)
-
- val rdd = javaCtx.parallelize(bean :: Nil)
- val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[AllTypesBean])
- schemaRDD.registerTempTable("allTypes")
-
- assert(
- javaSqlCtx.sql(
- """
- |SELECT stringField, intField, longField, floatField, doubleField, shortField, byteField,
- | booleanField, dateField, timestampField, bigDecimalField
- |FROM allTypes
- """.stripMargin).collect.head.row ===
- Seq.fill(11)(null))
- }
-
- test("loads JSON datasets") {
- val jsonString =
- """{"string":"this is a simple string.",
- "integer":10,
- "long":21474836470,
- "bigInteger":92233720368547758070,
- "double":1.7976931348623157E308,
- "boolean":true,
- "null":null
- }""".replaceAll("\n", " ")
- val rdd = javaCtx.parallelize(jsonString :: Nil)
-
- var schemaRDD = javaSqlCtx.jsonRDD(rdd)
-
- schemaRDD.registerTempTable("jsonTable1")
-
- assert(
- javaSqlCtx.sql("select * from jsonTable1").collect.head.row ===
- Seq(BigDecimal("92233720368547758070"),
- true,
- 1.7976931348623157E308,
- 10,
- 21474836470L,
- null,
- "this is a simple string."))
-
- val file = getTempFilePath("json")
- val path = file.toString
- rdd.saveAsTextFile(path)
- schemaRDD = javaSqlCtx.jsonFile(path)
-
- schemaRDD.registerTempTable("jsonTable2")
-
- assert(
- javaSqlCtx.sql("select * from jsonTable2").collect.head.row ===
- Seq(BigDecimal("92233720368547758070"),
- true,
- 1.7976931348623157E308,
- 10,
- 21474836470L,
- null,
- "this is a simple string."))
- }
-}
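
The bean and JSON coverage deleted here moves to SQLContext as well. Below is a hedged sketch of loading JSON from Java without the wrapper; jsonRDD on SQLContext takes a Scala RDD[String], so the JavaRDD is unwrapped with rdd(). JsonFromJava and the single sample record are made up for the example.

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SchemaRDD;

public class JsonFromJava {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local", "JsonFromJava");
    SQLContext sqlCtx = new SQLContext(jsc.sc());

    String json = "{\"string\":\"this is a simple string.\",\"integer\":10,\"boolean\":true}";
    JavaRDD<String> lines = jsc.parallelize(Arrays.asList(json));

    // jsonRDD infers the schema by sampling the input records.
    SchemaRDD table = sqlCtx.jsonRDD(lines.rdd());
    table.registerTempTable("jsonTable");

    // Field access goes through the unified Row getters rather than api.java.Row.
    Row first = sqlCtx.sql("SELECT string, boolean FROM jsonTable").collect()[0];
    System.out.println(first.getString(0) + " / " + first.getBoolean(1));

    jsc.stop();
  }
}
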
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
deleted file mode 100644
index 038f63f6c7..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.api.java
-
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD}
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.{HiveContext, HiveQl}
-
-/**
- * The entry point for executing Spark SQL queries from a Java program.
- */
-class JavaHiveContext(sqlContext: SQLContext) extends JavaSQLContext(sqlContext) {
-
- def this(sparkContext: JavaSparkContext) = this(new HiveContext(sparkContext))
-
- override def sql(sqlText: String): JavaSchemaRDD = {
- // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
- if (sqlContext.conf.dialect == "sql") {
- super.sql(sqlText)
- } else if (sqlContext.conf.dialect == "hiveql") {
- new JavaSchemaRDD(sqlContext, HiveQl.parseSql(sqlText))
- } else {
- sys.error(s"Unsupported SQL dialect: ${sqlContext.conf.dialect}. Try 'sql' or 'hiveql'")
- }
- }
-
- /**
- * DEPRECATED: Use sql(...) Instead
- */
- @Deprecated
- def hql(hqlQuery: String): JavaSchemaRDD =
- new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
-}
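
With the Hive wrapper removed, Java programs construct HiveContext themselves; sql() already parses HiveQL there because HiveContext defaults spark.sql.dialect to hiveql, which is what made the hql() shim and the dialect dispatch above redundant. A rough sketch under those assumptions, with HiveFromJava and the src table as placeholders; the spark-hive module must be on the classpath.

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

public class HiveFromJava {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local", "HiveFromJava");
    // HiveContext is built from the underlying SparkContext; no Java wrapper is required.
    HiveContext hiveCtx = new HiveContext(jsc.sc());

    // DDL and other commands run eagerly when sql() is called.
    hiveCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");

    for (Row row : hiveCtx.sql("SELECT key, value FROM src LIMIT 10").collect()) {
      System.out.println(row.getInt(0) + "\t" + row.getString(1));
    }
    jsc.stop();
  }
}
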
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala
deleted file mode 100644
index ca78dfba4f..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.api.java
-
-import scala.util.Try
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD}
-import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.hive.test.TestHive
-
-// Implicits
-import scala.collection.JavaConversions._
-
-class JavaHiveQLSuite extends FunSuite {
- lazy val javaCtx = new JavaSparkContext(TestHive.sparkContext)
-
- // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
- lazy val javaHiveCtx = new JavaHiveContext(TestHive)
-
- test("SELECT * FROM src") {
- assert(
- javaHiveCtx.sql("SELECT * FROM src").collect().map(_.getInt(0)) ===
- TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
- }
-
- def isExplanation(result: JavaSchemaRDD) = {
- val explanation = result.collect().map(_.getString(0))
- explanation.size > 1 && explanation.head.startsWith("== Physical Plan ==")
- }
-
- test("Query Hive native command execution result") {
- val tableName = "test_native_commands"
-
- assertResult(0) {
- javaHiveCtx.sql(s"DROP TABLE IF EXISTS $tableName").count()
- }
-
- assertResult(0) {
- javaHiveCtx.sql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
- }
-
- assert(
- javaHiveCtx
- .sql("SHOW TABLES")
- .collect()
- .map(_.getString(0))
- .contains(tableName))
-
- assertResult(Array(Array("key", "int"), Array("value", "string"))) {
- javaHiveCtx
- .sql(s"describe $tableName")
- .collect()
- .map(row => Array(row.get(0).asInstanceOf[String], row.get(1).asInstanceOf[String]))
- .toArray
- }
-
- assert(isExplanation(javaHiveCtx.sql(
- s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))
-
- TestHive.reset()
- }
-
- test("Exactly once semantics for DDL and command statements") {
- val tableName = "test_exactly_once"
- val q0 = javaHiveCtx.sql(s"CREATE TABLE $tableName(key INT, value STRING)")
-
- // If the table was not created, the following assertion would fail
- assert(Try(TestHive.table(tableName)).isSuccess)
-
- // If the CREATE TABLE command got executed again, the following assertion would fail
- assert(Try(q0.count()).isSuccess)
- }
-}