aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver/src/test
diff options
context:
space:
mode:
authorDong Wang <dong@databricks.com>2015-06-05 17:41:12 -0700
committerYin Huai <yhuai@databricks.com>2015-06-05 17:41:12 -0700
commiteb19d3f75cbd002f7e72ce02017a8de67f562792 (patch)
tree67abe7c0b12822d9bd0f199026a2d080d5387a84 /sql/hive-thriftserver/src/test
parent6ebe419f335fcfb66dd3da74baf35eb5b2fc061d (diff)
downloadspark-eb19d3f75cbd002f7e72ce02017a8de67f562792.tar.gz
spark-eb19d3f75cbd002f7e72ce02017a8de67f562792.tar.bz2
spark-eb19d3f75cbd002f7e72ce02017a8de67f562792.zip
[SPARK-6964] [SQL] Support Cancellation in the Thrift Server
Support runInBackground in SparkExecuteStatementOperation, and add cancellation Author: Dong Wang <dong@databricks.com> Closes #6207 from dongwang218/SPARK-6964-jdbc-cancel and squashes the following commits: 687c113 [Dong Wang] fix 100 characters 7bfa2a7 [Dong Wang] fix merge 380480f [Dong Wang] fix for liancheng's comments eb3e385 [Dong Wang] small nit 341885b [Dong Wang] small fix 3d8ebf8 [Dong Wang] add spark.sql.hive.thriftServer.async flag 04142c3 [Dong Wang] set SQLSession for async execution 184ec35 [Dong Wang] keep hive conf 819ae03 [Dong Wang] [SPARK-6964][SQL][WIP] Support Cancellation in the Thrift Server
Diffstat (limited to 'sql/hive-thriftserver/src/test')
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala42
1 file changed, 40 insertions(+), 2 deletions(-)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index f57c7083ea..178bd1f5cb 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hive.thriftserver
import java.io.File
import java.net.URL
-import java.sql.{Date, DriverManager, Statement}
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, DriverManager, SQLException, Statement}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import scala.concurrent.{Await, Promise}
+import scala.concurrent.{Await, Promise, future}
+import scala.concurrent.ExecutionContext.Implicits.global
import scala.sys.process.{Process, ProcessLogger}
import scala.util.{Random, Try}
@@ -338,6 +340,42 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
)
}
+
+ test("test jdbc cancel") {
+ withJdbcStatement { statement =>
+ val queries = Seq(
+ "DROP TABLE IF EXISTS test_map",
+ "CREATE TABLE test_map(key INT, value STRING)",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+ queries.foreach(statement.execute)
+
+ val largeJoin = "SELECT COUNT(*) FROM test_map " +
+ List.fill(10)("join test_map").mkString(" ")
+ val f = future { Thread.sleep(100); statement.cancel(); }
+ val e = intercept[SQLException] {
+ statement.executeQuery(largeJoin)
+ }
+ assert(e.getMessage contains "cancelled")
+ Await.result(f, 3.minute)
+
+ // cancel is a noop
+ statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
+ val sf = future { Thread.sleep(100); statement.cancel(); }
+ val smallJoin = "SELECT COUNT(*) FROM test_map " +
+ List.fill(4)("join test_map").mkString(" ")
+ val rs1 = statement.executeQuery(smallJoin)
+ Await.result(sf, 3.minute)
+ rs1.next()
+ assert(rs1.getInt(1) === math.pow(5, 5))
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map")
+ rs2.next()
+ assert(rs2.getInt(1) === 5)
+ rs2.close()
+ }
+ }
}
class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {