author     Reynold Xin <rxin@databricks.com>           2015-01-14 18:36:15 -0800
committer  Michael Armbrust <michael@databricks.com>   2015-01-14 18:36:15 -0800
commit     cfa397c126c857bfc9843d9e598a14b7c1e0457f (patch)
tree       26c014e2733b78214e7030e095521f4463b18881 /sql
parent     13d2406781714daea2bbf3bfb7fec0dead10760c (diff)
download   spark-cfa397c126c857bfc9843d9e598a14b7c1e0457f.tar.gz
           spark-cfa397c126c857bfc9843d9e598a14b7c1e0457f.tar.bz2
           spark-cfa397c126c857bfc9843d9e598a14b7c1e0457f.zip
[SPARK-5193][SQL] Tighten up SQLContext API
1. Removed 2 implicits (logicalPlanToSparkQuery and baseRelationToSchemaRDD).
2. Moved extraStrategies into ExperimentalMethods.
3. Made private methods protected[sql] so they don't show up in javadocs.
4. Removed createParquetFile.
5. Added Java version of applySchema to SQLContext.

Author: Reynold Xin <rxin@databricks.com>

Closes #4049 from rxin/sqlContext-refactor and squashes the following commits:

a326a1a [Reynold Xin] Remove createParquetFile and add applySchema for Java to SQLContext.
ecd6685 [Reynold Xin] Added baseRelationToSchemaRDD back.
4a38c9b [Reynold Xin] [SPARK-5193][SQL] Tighten up SQLContext API
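For reference, a minimal usage sketch of the tightened API described above. The Person bean, the table name, and the local SparkContext setup are illustrative assumptions, not part of this patch:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Hypothetical Java-bean-style class used only for this sketch.
    class Person(n: String, a: Int) extends Serializable {
      def getName: String = n  // exposed as bean property "name"
      def getAge: Int = a      // exposed as bean property "age"
    }

    object TightenedApiSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))
        val sqlContext = new SQLContext(sc)

        // Planner hooks now live under `experimental` instead of directly on SQLContext.
        sqlContext.experimental.extraStrategies = Nil

        // The new bean-based applySchema (an overload also accepts a JavaRDD).
        val people = sc.parallelize(Seq(new Person("michael", 29)))
        sqlContext.applySchema(people, classOf[Person]).registerTempTable("people")
        sqlContext.sql("SELECT name, age FROM people").collect().foreach(println)

        sc.stop()
      }
    }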
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala          36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                  152
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala           10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala                   5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala           16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala              160
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala     26
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala    22
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala               4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala                  2
10 files changed, 152 insertions, 281 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
new file mode 100644
index 0000000000..f0e6a8f332
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Holder for experimental methods for the bravest. We make NO guarantee about the stability
+ * regarding binary compatibility and source compatibility of methods here.
+ */
+@Experimental
+class ExperimentalMethods protected[sql](sqlContext: SQLContext) {
+
+ /**
+ * Allows extra strategies to be injected into the query planner at runtime. Note this API
+ * should be considered experimental and is not intended to be stable across releases.
+ */
+ @Experimental
+ var extraStrategies: Seq[Strategy] = Nil
+
+}
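As a usage note for the new holder: a hedged sketch of wiring a custom strategy through experimental.extraStrategies. The no-op strategy below is a made-up example; it assumes the Strategy alias visible in package org.apache.spark.sql, the same one extraStrategies uses above.

    import org.apache.spark.sql.Strategy
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // A do-nothing strategy: returning Nil tells the planner to fall through to the
    // built-in strategies for every plan it sees.
    object PassThroughStrategy extends Strategy {
      def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
    }

    // Registration goes through the new holder rather than a var on SQLContext:
    //   sqlContext.experimental.extraStrategies = Seq(PassThroughStrategy)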
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d9f3b3a53f..279671ced0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,15 +17,16 @@
package org.apache.spark.sql
+import java.beans.Introspector
import java.util.Properties
import scala.collection.immutable
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
-import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
+import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -36,9 +37,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.json._
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation}
+import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy}
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
/**
* :: AlphaComponent ::
@@ -59,7 +60,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
self =>
// Note that this is a lazy val so we can override the default value in subclasses.
- private[sql] lazy val conf: SQLConf = new SQLConf
+ protected[sql] lazy val conf: SQLConf = new SQLConf
/** Set Spark SQL configuration properties. */
def setConf(props: Properties): Unit = conf.setConf(props)
@@ -118,15 +119,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
- * :: DeveloperApi ::
- * Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
- * interface is considered internal, and thus not guaranteed to be stable. As a result, using
- * them directly is not recommended.
- */
- @DeveloperApi
- implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
-
- /**
* Creates a SchemaRDD from an RDD of case classes.
*
* @group userf
@@ -139,8 +131,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, LogicalRDD(attributeSeq, rowRDD)(self))
}
- implicit def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
- logicalPlanToSparkQuery(LogicalRelation(baseRelation))
+ /**
+ * Convert a [[BaseRelation]] created for external data sources into a [[SchemaRDD]].
+ */
+ def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
+ new SchemaRDD(this, LogicalRelation(baseRelation))
}
/**
@@ -182,6 +177,43 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
+ * Applies a schema to an RDD of Java Beans.
+ *
+ * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
+ * SELECT * queries will return the columns in an undefined order.
+ */
+ def applySchema(rdd: RDD[_], beanClass: Class[_]): SchemaRDD = {
+ val attributeSeq = getSchema(beanClass)
+ val className = beanClass.getName
+ val rowRdd = rdd.mapPartitions { iter =>
+ // BeanInfo is not serializable so we must rediscover it remotely for each partition.
+ val localBeanInfo = Introspector.getBeanInfo(
+ Class.forName(className, true, Utils.getContextOrSparkClassLoader))
+ val extractors =
+ localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
+
+ iter.map { row =>
+ new GenericRow(
+ extractors.zip(attributeSeq).map { case (e, attr) =>
+ DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
+ }.toArray[Any]
+ ) : Row
+ }
+ }
+ new SchemaRDD(this, LogicalRDD(attributeSeq, rowRdd)(this))
+ }
+
+ /**
+ * Applies a schema to an RDD of Java Beans.
+ *
+ * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
+ * SELECT * queries will return the columns in an undefined order.
+ */
+ def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): SchemaRDD = {
+ applySchema(rdd.rdd, beanClass)
+ }
+
+ /**
* Loads a Parquet file, returning the result as a [[SchemaRDD]].
*
* @group userf
@@ -260,41 +292,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
- * :: Experimental ::
- * Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
- * This registered table can be used as the target of future `insertInto` operations.
- *
- * {{{
- * val sqlContext = new SQLContext(...)
- * import sqlContext._
- *
- * case class Person(name: String, age: Int)
- * createParquetFile[Person]("path/to/file.parquet").registerTempTable("people")
- * sql("INSERT INTO people SELECT 'michael', 29")
- * }}}
- *
- * @tparam A A case class type that describes the desired schema of the parquet file to be
- * created.
- * @param path The path where the directory containing parquet metadata should be created.
- * Data inserted into this table will also be stored at this location.
- * @param allowExisting When false, an exception will be thrown if this directory already exists.
- * @param conf A Hadoop configuration object that can be used to specify options to the parquet
- * output format.
- *
- * @group userf
- */
- @Experimental
- def createParquetFile[A <: Product : TypeTag](
- path: String,
- allowExisting: Boolean = true,
- conf: Configuration = new Configuration()): SchemaRDD = {
- new SchemaRDD(
- this,
- ParquetRelation.createEmpty(
- path, ScalaReflection.attributesFor[A], allowExisting, conf, this))
- }
-
- /**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
*
@@ -336,12 +333,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))
/**
- * :: DeveloperApi ::
- * Allows extra strategies to be injected into the query planner at runtime. Note this API
- * should be consider experimental and is not intended to be stable across releases.
+ * A collection of methods that are considered experimental, but can be used to hook into
+ * the query planner for advanced functionalities.
*/
- @DeveloperApi
- var extraStrategies: Seq[Strategy] = Nil
+ val experimental: ExperimentalMethods = new ExperimentalMethods(this)
protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext: SparkContext = self.sparkContext
@@ -353,7 +348,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def numPartitions = self.conf.numShufflePartitions
def strategies: Seq[Strategy] =
- extraStrategies ++ (
+ experimental.extraStrategies ++ (
DataSourceStrategy ::
DDLStrategy ::
TakeOrdered ::
@@ -479,14 +474,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
* have the same format as the one generated by `toString` in scala.
* It is only used by PySpark.
*/
- private[sql] def parseDataType(dataTypeString: String): DataType = {
+ protected[sql] def parseDataType(dataTypeString: String): DataType = {
DataType.fromJson(dataTypeString)
}
/**
* Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
*/
- private[sql] def applySchemaToPythonRDD(
+ protected[sql] def applySchemaToPythonRDD(
rdd: RDD[Array[Any]],
schemaString: String): SchemaRDD = {
val schema = parseDataType(schemaString).asInstanceOf[StructType]
@@ -496,7 +491,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Apply a schema defined by the schema to an RDD. It is only used by PySpark.
*/
- private[sql] def applySchemaToPythonRDD(
+ protected[sql] def applySchemaToPythonRDD(
rdd: RDD[Array[Any]],
schema: StructType): SchemaRDD = {
@@ -527,4 +522,43 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
}
+
+ /**
+ * Returns a Catalyst Schema for the given java bean class.
+ */
+ protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
+ // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
+ val beanInfo = Introspector.getBeanInfo(beanClass)
+
+ // Note: The ordering of elements may differ from when the schema is inferred in Scala.
+ // This is because beanInfo.getPropertyDescriptors gives no guarantees about
+ // element ordering.
+ val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+ fields.map { property =>
+ val (dataType, nullable) = property.getPropertyType match {
+ case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
+ (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
+ case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
+ case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
+ case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
+ case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
+ case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
+ case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
+ case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
+ case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
+
+ case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
+ case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
+ case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
+ case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
+ case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
+ case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
+ case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
+ case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
+ case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
+ case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
+ }
+ AttributeReference(property.getName, dataType, nullable)()
+ }
+ }
}
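To make the bean introspection that getSchema relies on concrete, a small standalone sketch (the Employee class is hypothetical; none of the surrounding Spark code is needed to run it):

    import java.beans.Introspector

    // Scala getters compile to getName/getSalary, so they are picked up as bean properties.
    class Employee(name: String, salary: Double) {
      def getName: String = name
      def getSalary: Double = salary
    }

    object BeanIntrospectionSketch {
      def main(args: Array[String]): Unit = {
        val props = Introspector.getBeanInfo(classOf[Employee])
          .getPropertyDescriptors
          .filterNot(_.getName == "class") // every bean also exposes a synthetic "class" property
        // Descriptor ordering is not guaranteed, which is why SELECT * column order
        // is documented as undefined for bean-derived schemas.
        props.foreach(p => println(s"${p.getName}: ${p.getPropertyType.getName}"))
      }
    }

getSchema then maps each property's Class onto a Catalyst type (StringType, DoubleType, and so on) via the pattern match above.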
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index af6b07bd6c..52a31f01a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.execution
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{SQLConf, SQLContext}
/**
* A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -137,14 +137,12 @@ case class CacheTableCommand(
isLazy: Boolean) extends RunnableCommand {
override def run(sqlContext: SQLContext) = {
- import sqlContext._
-
- plan.foreach(_.registerTempTable(tableName))
- cacheTable(tableName)
+ plan.foreach(p => new SchemaRDD(sqlContext, p).registerTempTable(tableName))
+ sqlContext.cacheTable(tableName)
if (!isLazy) {
// Performs eager caching
- table(tableName).count()
+ sqlContext.table(tableName).count()
}
Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 4cc9641c4d..381298caba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -22,7 +22,7 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
import scala.util.parsing.combinator.PackratParsers
import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SchemaRDD, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.SqlLexical
import org.apache.spark.sql.execution.RunnableCommand
@@ -234,8 +234,7 @@ private [sql] case class CreateTempTableUsing(
def run(sqlContext: SQLContext) = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
-
- sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
+ new SchemaRDD(sqlContext, LogicalRelation(resolved.relation)).registerTempTable(tableName)
Seq.empty
}
}
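With the implicit conversion gone, callers outside the sql package go through the retained public helper instead; a minimal sketch with hypothetical names:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.sources.BaseRelation

    // Registers an externally created relation as a temp table without touching
    // the internal LogicalRelation node directly.
    def registerRelation(sqlContext: SQLContext, relation: BaseRelation, name: String): Unit = {
      sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(name)
    }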
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 8c80be106f..f9c0822160 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -17,8 +17,11 @@
package org.apache.spark.sql.test
+import scala.language.implicitConversions
+
import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{SQLConf, SQLContext}
+import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
/** A SQLContext that can be used for local testing. */
object TestSQLContext
@@ -29,7 +32,16 @@ object TestSQLContext
new SparkConf().set("spark.sql.testkey", "true"))) {
/** Fewer partitions to speed up testing. */
- private[sql] override lazy val conf: SQLConf = new SQLConf {
+ protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
}
+
+ /**
+ * Turn a logical plan into a SchemaRDD. This should be removed once we have an easier way to
+ * construct SchemaRDD directly out of local data without relying on implicits.
+ */
+ protected[sql] implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = {
+ new SchemaRDD(this, plan)
+ }
+
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
deleted file mode 100644
index c87d762751..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import _root_.java.io.File
-
-/* Implicits */
-import org.apache.spark.sql.test.TestSQLContext._
-
-class InsertIntoSuite extends QueryTest {
- TestData // Initialize TestData
- import TestData._
-
- test("insertInto() created parquet file") {
- val testFilePath = File.createTempFile("sparkSql", "pqt")
- testFilePath.delete()
- testFilePath.deleteOnExit()
- val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
- testFile.registerTempTable("createAndInsertTest")
-
- // Add some data.
- testData.insertInto("createAndInsertTest")
-
- // Make sure its there for a new instance of parquet file.
- checkAnswer(
- parquetFile(testFilePath.getCanonicalPath),
- testData.collect().toSeq
- )
-
- // Make sure the registered table has also been updated.
- checkAnswer(
- sql("SELECT * FROM createAndInsertTest"),
- testData.collect().toSeq
- )
-
- // Add more data.
- testData.insertInto("createAndInsertTest")
-
- // Make sure all data is there for a new instance of parquet file.
- checkAnswer(
- parquetFile(testFilePath.getCanonicalPath),
- testData.collect().toSeq ++ testData.collect().toSeq
- )
-
- // Make sure the registered table has also been updated.
- checkAnswer(
- sql("SELECT * FROM createAndInsertTest"),
- testData.collect().toSeq ++ testData.collect().toSeq
- )
-
- // Now overwrite.
- testData.insertInto("createAndInsertTest", overwrite = true)
-
- // Make sure its there for a new instance of parquet file.
- checkAnswer(
- parquetFile(testFilePath.getCanonicalPath),
- testData.collect().toSeq
- )
-
- // Make sure the registered table has also been updated.
- checkAnswer(
- sql("SELECT * FROM createAndInsertTest"),
- testData.collect().toSeq
- )
-
- testFilePath.delete()
- }
-
- test("INSERT INTO parquet table") {
- val testFilePath = File.createTempFile("sparkSql", "pqt")
- testFilePath.delete()
- testFilePath.deleteOnExit()
- val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
- testFile.registerTempTable("createAndInsertSQLTest")
-
- sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
-
- // Make sure its there for a new instance of parquet file.
- checkAnswer(
- parquetFile(testFilePath.getCanonicalPath),
- testData.collect().toSeq
- )
-
- // Make sure the registered table has also been updated.
- checkAnswer(
- sql("SELECT * FROM createAndInsertSQLTest"),
- testData.collect().toSeq
- )
-
- // Append more data.
- sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
-
- // Make sure all data is there for a new instance of parquet file.
- checkAnswer(
- parquetFile(testFilePath.getCanonicalPath),
- testData.collect().toSeq ++ testData.collect().toSeq
- )
-
- // Make sure the registered table has also been updated.
- checkAnswer(
- sql("SELECT * FROM createAndInsertSQLTest"),
- testData.collect().toSeq ++ testData.collect().toSeq
- )
-
- sql("INSERT OVERWRITE INTO createAndInsertSQLTest SELECT * FROM testData")
-
- // Make sure its there for a new instance of parquet file.
- checkAnswer(
- parquetFile(testFilePath.getCanonicalPath),
- testData.collect().toSeq
- )
-
- // Make sure the registered table has also been updated.
- checkAnswer(
- sql("SELECT * FROM createAndInsertSQLTest"),
- testData.collect().toSeq
- )
-
- testFilePath.delete()
- }
-
- test("Double create fails when allowExisting = false") {
- val testFilePath = File.createTempFile("sparkSql", "pqt")
- testFilePath.delete()
- testFilePath.deleteOnExit()
- val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-
- intercept[RuntimeException] {
- createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = false)
- }
-
- testFilePath.delete()
- }
-
- test("Double create does not fail when allowExisting = true") {
- val testFilePath = File.createTempFile("sparkSql", "pqt")
- testFilePath.delete()
- testFilePath.deleteOnExit()
- val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-
- createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = true)
-
- testFilePath.delete()
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index fe781ec05f..3a073a6b70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -402,23 +402,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
Utils.deleteRecursively(file)
}
- test("Insert (overwrite) via Scala API") {
- val dirname = Utils.createTempDir()
- val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
- .map(i => TestRDDEntry(i, s"val_$i"))
- source_rdd.registerTempTable("source")
- val dest_rdd = createParquetFile[TestRDDEntry](dirname.toString)
- dest_rdd.registerTempTable("dest")
- sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
- val rdd_copy1 = sql("SELECT * FROM dest").collect()
- assert(rdd_copy1.size === 100)
-
- sql("INSERT INTO dest SELECT * FROM source")
- val rdd_copy2 = sql("SELECT * FROM dest").collect().sortBy(_.getInt(0))
- assert(rdd_copy2.size === 200)
- Utils.deleteRecursively(dirname)
- }
-
test("Insert (appending) to same table via Scala API") {
sql("INSERT INTO testsource SELECT * FROM testsource")
val double_rdd = sql("SELECT * FROM testsource").collect()
@@ -902,15 +885,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
Utils.deleteRecursively(tmpdir)
}
- test("Querying on empty parquet throws exception (SPARK-3536)") {
- val tmpdir = Utils.createTempDir()
- Utils.deleteRecursively(tmpdir)
- createParquetFile[TestRDDEntry](tmpdir.toString()).registerTempTable("tmpemptytable")
- val result1 = sql("SELECT * FROM tmpemptytable").collect()
- assert(result1.size === 0)
- Utils.deleteRecursively(tmpdir)
- }
-
test("read/write fixed-length decimals") {
for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
index daa7ca65cd..4c081fb451 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
@@ -34,19 +34,6 @@ class ParquetQuerySuite2 extends QueryTest with ParquetTest {
}
}
- test("insertion") {
- withTempDir { dir =>
- val data = (0 until 10).map(i => (i, i.toString))
- withParquetTable(data, "t") {
- createParquetFile[(Int, String)](dir.toString).registerTempTable("dest")
- withTempTable("dest") {
- sql("INSERT OVERWRITE INTO dest SELECT * FROM t")
- checkAnswer(table("dest"), data)
- }
- }
- }
- }
-
test("appending") {
val data = (0 until 10).map(i => (i, i.toString))
withParquetTable(data, "t") {
@@ -98,13 +85,4 @@ class ParquetQuerySuite2 extends QueryTest with ParquetTest {
checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
}
}
-
- test("SPARK-3536 regression: query empty Parquet file shouldn't throw") {
- withTempDir { dir =>
- createParquetFile[(Int, String)](dir.toString).registerTempTable("t")
- withTempTable("t") {
- checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row])
- }
- }
- }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index bf56e60cf9..a9a20a54be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -71,7 +71,7 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {
class HiveContext(sc: SparkContext) extends SQLContext(sc) {
self =>
- private[sql] override lazy val conf: SQLConf = new SQLConf {
+ protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
}
@@ -348,7 +348,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val hivePlanner = new SparkPlanner with HiveStrategies {
val hiveContext = self
- override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
+ override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
DataSourceStrategy,
HiveCommandStrategy(self),
HiveDDLStrategy,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 52e1f0d94f..47431cef03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -102,7 +102,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
new this.QueryExecution { val logical = plan }
/** Fewer partitions to speed up testing. */
- private[sql] override lazy val conf: SQLConf = new SQLConf {
+ protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
}