From eb19d3f75cbd002f7e72ce02017a8de67f562792 Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Fri, 5 Jun 2015 17:41:12 -0700 Subject: [SPARK-6964] [SQL] Support Cancellation in the Thrift Server Support runInBackground in SparkExecuteStatementOperation, and add cancellation Author: Dong Wang Closes #6207 from dongwang218/SPARK-6964-jdbc-cancel and squashes the following commits: 687c113 [Dong Wang] fix 100 characters 7bfa2a7 [Dong Wang] fix merge 380480f [Dong Wang] fix for liancheng's comments eb3e385 [Dong Wang] small nit 341885b [Dong Wang] small fix 3d8ebf8 [Dong Wang] add spark.sql.hive.thriftServer.async flag 04142c3 [Dong Wang] set SQLSession for async execution 184ec35 [Dong Wang] keep hive conf 819ae03 [Dong Wang] [SPARK-6964][SQL][WIP] Support Cancellation in the Thrift Server --- .../thriftserver/HiveThriftServer2Suites.scala | 42 ++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) (limited to 'sql/hive-thriftserver/src/test') diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index f57c7083ea..178bd1f5cb 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -19,11 +19,13 @@ package org.apache.spark.sql.hive.thriftserver import java.io.File import java.net.URL -import java.sql.{Date, DriverManager, Statement} +import java.nio.charset.StandardCharsets +import java.sql.{Date, DriverManager, SQLException, Statement} import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ -import scala.concurrent.{Await, Promise} +import scala.concurrent.{Await, Promise, future} +import scala.concurrent.ExecutionContext.Implicits.global import scala.sys.process.{Process, ProcessLogger} import scala.util.{Random, Try} @@ -338,6 +340,42 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } ) } + + test("test jdbc cancel") { + withJdbcStatement { statement => + val queries = Seq( + "DROP TABLE IF EXISTS test_map", + "CREATE TABLE test_map(key INT, value STRING)", + s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map") + + queries.foreach(statement.execute) + + val largeJoin = "SELECT COUNT(*) FROM test_map " + + List.fill(10)("join test_map").mkString(" ") + val f = future { Thread.sleep(100); statement.cancel(); } + val e = intercept[SQLException] { + statement.executeQuery(largeJoin) + } + assert(e.getMessage contains "cancelled") + Await.result(f, 3.minute) + + // cancel is a noop + statement.executeQuery("SET spark.sql.hive.thriftServer.async=false") + val sf = future { Thread.sleep(100); statement.cancel(); } + val smallJoin = "SELECT COUNT(*) FROM test_map " + + List.fill(4)("join test_map").mkString(" ") + val rs1 = statement.executeQuery(smallJoin) + Await.result(sf, 3.minute) + rs1.next() + assert(rs1.getInt(1) === math.pow(5, 5)) + rs1.close() + + val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map") + rs2.next() + assert(rs2.getInt(1) === 5) + rs2.close() + } + } } class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { -- cgit v1.2.3