author    Cheng Lian <lian@databricks.com>  2014-11-16 14:26:41 -0800
committer Michael Armbrust <michael@databricks.com>  2014-11-16 14:26:41 -0800
commit    cb6bd83a91d9b4a227dc6467255231869c1820e2 (patch)
tree      94b0475229b6db9d90c084543c62e1a83822a057 /sql/hive-thriftserver
parent    7850e0c707affd5eafd570fb43716753396cf479 (diff)
[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types
SPARK-4407 was detected while working on SPARK-4309. Merged these two into a single PR since the 1.2.0 RC is approaching.

Author: Cheng Lian <lian@databricks.com>

Closes #3178 from liancheng/date-for-thriftserver and squashes the following commits:

6f71d0b [Cheng Lian] Makes toHiveString static
26fa955 [Cheng Lian] Fixes complex type support in Hive 0.13.1 shim
a92882a [Cheng Lian] Updates HiveShim for 0.13.1
73f442b [Cheng Lian] Adds Date support for HiveThriftServer2 (Hive 0.12.0)
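For readers skimming the diff, here is a minimal sketch of what this patch enables from the client side: reading a DATE value through the Hive JDBC driver, mirroring the SPARK-4309 regression test added below. The connection URL, host, and port are illustrative assumptions, not part of this patch; test_date is the table the test suite creates and loads beforehand.

    import java.sql.{Date, DriverManager}

    // Same driver class the test suite loads; the URL below is an assumed
    // local HiveThriftServer2 endpoint, not taken from this patch.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val connection = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
    val statement = connection.createStatement()

    // Before this patch the 0.12.0 shim's type match had no DateType case,
    // so fetching a DATE column would fail; now it comes back as java.sql.Date.
    val resultSet = statement.executeQuery(
      "SELECT CAST('2011-01-01' AS date) FROM test_date LIMIT 1")
    resultSet.next()
    val date: Date = resultSet.getDate(1)
    assert(date == Date.valueOf("2011-01-01"))

    connection.close()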
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala | 90
-rw-r--r--  sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala         | 11
-rw-r--r--  sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala         | 29
3 files changed, 83 insertions, 47 deletions
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index bba29b2bdc..23d12cbff3 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.thriftserver
import java.io.File
import java.net.ServerSocket
-import java.sql.{DriverManager, Statement}
+import java.sql.{Date, DriverManager, Statement}
import java.util.concurrent.TimeoutException
+import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}
@@ -51,6 +52,15 @@ import org.apache.spark.sql.hive.HiveShim
class HiveThriftServer2Suite extends FunSuite with Logging {
Class.forName(classOf[HiveDriver].getCanonicalName)
+ object TestData {
+ def getTestDataFilePath(name: String) = {
+ Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name")
+ }
+
+ val smallKv = getTestDataFilePath("small_kv.txt")
+ val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
+ }
+
def randomListeningPort = {
// Let the system choose a random available port to avoid collisions with other parallel
// builds.
@@ -145,12 +155,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
}
}
- val env = Seq(
- // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
- "SPARK_TESTING" -> "0",
- // Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read
- // proper version information from the jar manifest.
- "SPARK_PREPEND_CLASSES" -> "")
+ // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+ val env = Seq("SPARK_TESTING" -> "0")
Process(command, None, env: _*).run(ProcessLogger(
captureThriftServerOutput("stdout"),
@@ -194,15 +200,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
test("Test JDBC query execution") {
withJdbcStatement() { statement =>
- val dataFilePath =
- Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
-
- val queries =
- s"""SET spark.sql.shuffle.partitions=3;
- |CREATE TABLE test(key INT, val STRING);
- |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
- |CACHE TABLE test;
- """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
+ val queries = Seq(
+ "SET spark.sql.shuffle.partitions=3",
+ "DROP TABLE IF EXISTS test",
+ "CREATE TABLE test(key INT, val STRING)",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
+ "CACHE TABLE test")
queries.foreach(statement.execute)
@@ -216,14 +219,10 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
test("SPARK-3004 regression: result set containing NULL") {
withJdbcStatement() { statement =>
- val dataFilePath =
- Thread.currentThread().getContextClassLoader.getResource(
- "data/files/small_kv_with_null.txt")
-
val queries = Seq(
"DROP TABLE IF EXISTS test_null",
"CREATE TABLE test_null(key INT, val STRING)",
- s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_null")
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null")
queries.foreach(statement.execute)
@@ -270,13 +269,10 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
test("SPARK-4292 regression: result set iterator issue") {
withJdbcStatement() { statement =>
- val dataFilePath =
- Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
-
val queries = Seq(
"DROP TABLE IF EXISTS test_4292",
"CREATE TABLE test_4292(key INT, val STRING)",
- s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292")
queries.foreach(statement.execute)
@@ -284,10 +280,52 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
Seq(238, 86, 311, 27, 165).foreach { key =>
resultSet.next()
- assert(resultSet.getInt(1) == key)
+ assert(resultSet.getInt(1) === key)
}
statement.executeQuery("DROP TABLE IF EXISTS test_4292")
}
}
+
+ test("SPARK-4309 regression: Date type support") {
+ withJdbcStatement() { statement =>
+ val queries = Seq(
+ "DROP TABLE IF EXISTS test_date",
+ "CREATE TABLE test_date(key INT, value STRING)",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date")
+
+ queries.foreach(statement.execute)
+
+ assertResult(Date.valueOf("2011-01-01")) {
+ val resultSet = statement.executeQuery(
+ "SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")
+ resultSet.next()
+ resultSet.getDate(1)
+ }
+ }
+ }
+
+ test("SPARK-4407 regression: Complex type support") {
+ withJdbcStatement() { statement =>
+ val queries = Seq(
+ "DROP TABLE IF EXISTS test_map",
+ "CREATE TABLE test_map(key INT, value STRING)",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+ queries.foreach(statement.execute)
+
+ assertResult("""{238:"val_238"}""") {
+ val resultSet = statement.executeQuery("SELECT MAP(key, value) FROM test_map LIMIT 1")
+ resultSet.next()
+ resultSet.getString(1)
+ }
+
+ assertResult("""["238","val_238"]""") {
+ val resultSet = statement.executeQuery(
+ "SELECT ARRAY(CAST(key AS STRING), value) FROM test_map LIMIT 1")
+ resultSet.next()
+ resultSet.getString(1)
+ }
+ }
+ }
}
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index aa2e3cab72..9258ad0cdf 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.thriftserver
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
import java.util.{ArrayList => JArrayList, Map => JMap}
import scala.collection.JavaConversions._
@@ -131,14 +131,13 @@ private[hive] class SparkExecuteStatementOperation(
to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
+ case DateType =>
+ to.addColumnValue(ColumnValue.dateValue(from(ordinal).asInstanceOf[Date]))
case TimestampType =>
to.addColumnValue(
ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
- val hiveString = result
- .queryExecution
- .asInstanceOf[HiveContext#QueryExecution]
- .toHiveString((from.get(ordinal), dataTypes(ordinal)))
+ val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to.addColumnValue(ColumnValue.stringValue(hiveString))
}
}
@@ -163,6 +162,8 @@ private[hive] class SparkExecuteStatementOperation(
to.addColumnValue(ColumnValue.byteValue(null))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(null))
+ case DateType =>
+ to.addColumnValue(ColumnValue.dateValue(null))
case TimestampType =>
to.addColumnValue(ColumnValue.timestampValue(null))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index a642478d08..3c7f62af45 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.thriftserver
import java.security.PrivilegedExceptionAction
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
import java.util.concurrent.Future
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
@@ -113,7 +113,7 @@ private[hive] class SparkExecuteStatementOperation(
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
- to += from.get(ordinal).asInstanceOf[String]
+ to += from.getString(ordinal)
case IntegerType =>
to += from.getInt(ordinal)
case BooleanType =>
@@ -123,23 +123,20 @@ private[hive] class SparkExecuteStatementOperation(
case FloatType =>
to += from.getFloat(ordinal)
case DecimalType() =>
- to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
+ to += from.getAs[BigDecimal](ordinal).bigDecimal
case LongType =>
to += from.getLong(ordinal)
case ByteType =>
to += from.getByte(ordinal)
case ShortType =>
to += from.getShort(ordinal)
+ case DateType =>
+ to += from.getAs[Date](ordinal)
case TimestampType =>
- to += from.get(ordinal).asInstanceOf[Timestamp]
- case BinaryType =>
- to += from.get(ordinal).asInstanceOf[String]
- case _: ArrayType =>
- to += from.get(ordinal).asInstanceOf[String]
- case _: StructType =>
- to += from.get(ordinal).asInstanceOf[String]
- case _: MapType =>
- to += from.get(ordinal).asInstanceOf[String]
+ to += from.getAs[Timestamp](ordinal)
+ case BinaryType | _: ArrayType | _: StructType | _: MapType =>
+ val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
+ to += hiveString
}
}
@@ -147,9 +144,9 @@ private[hive] class SparkExecuteStatementOperation(
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
- val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
+ val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
if (!iter.hasNext) {
- reultRowSet
+ resultRowSet
} else {
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
@@ -166,10 +163,10 @@ private[hive] class SparkExecuteStatementOperation(
}
curCol += 1
}
- reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+ resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
curRow += 1
}
- reultRowSet
+ resultRowSet
}
}
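One design note on the Shim13 change above: the four per-type cases that each cast complex values to String are collapsed into a single case delegating to the now-static HiveContext.toHiveString. The actual implementation lives outside this diff; the following is only an assumption-labeled sketch reproducing the output shapes the new SPARK-4407 test asserts, where maps render in Hive literal form {key:"value"} and arrays as ["elem",...].

    // Illustrative only -- NOT HiveContext.toHiveString itself, which is not
    // part of this diff. These helpers just reproduce the string shapes the
    // SPARK-4407 test asserts for MAP and ARRAY results.
    def renderMapEntry(key: Int, value: String): String =
      s"""{$key:"$value"}"""

    def renderStringArray(elems: Seq[String]): String =
      elems.map(e => s""""$e"""").mkString("[", ",", "]")

    assert(renderMapEntry(238, "val_238") == """{238:"val_238"}""")
    assert(renderStringArray(Seq("238", "val_238")) == """["238","val_238"]""")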