author    windpiger <songjun@outlook.com>    2017-03-08 10:48:53 -0800
committer Wenchen Fan <wenchen@databricks.com>    2017-03-08 10:48:53 -0800
commit    f3387d97487cbef894b6963bc008f6a5c4294a85 (patch)
tree      91b7de02b0cfcd37c07907feef58320c912a09e1 /sql
parent    e420fd4592615d91cdcbca674ac58bcca6ab2ff3 (diff)
[SPARK-19864][SQL][TEST] provide a makeQualifiedPath function to simplify some code

## What changes were proposed in this pull request?
Currently there are many places that qualify a path inline; it is better to provide a single helper function for this, so the code becomes simpler.

## How was this patch tested?
N/A

Author: windpiger <songjun@outlook.com>

Closes #17204 from windpiger/addQualifiledPathUtil.
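For orientation before the diff, here is a minimal, self-contained sketch of the helper this patch adds to `SQLTestUtils` and the assertion pattern the test suites switch to. The real method takes no `Configuration` parameter and reads it from `spark.sessionState.newHadoopConf()`; the explicit parameter and the standalone object exist only so this sketch compiles on its own.

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifiedPathSketch {
  // Qualify a path against its FileSystem, e.g. "/tmp/db" -> "file:/tmp/db".
  // The body mirrors the method added to SQLTestUtils in this patch.
  def makeQualifiedPath(path: String, hadoopConf: Configuration): URI = {
    val hadoopPath = new Path(path)
    val fs = hadoopPath.getFileSystem(hadoopConf)
    fs.makeQualified(hadoopPath).toUri
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Before this patch each suite repeated the three lines above inline; after it,
    // assertions can simply compare against the returned URI, for example:
    //   assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
    println(makeQualifiedPath("/tmp/spark-test", conf)) // prints something like file:/tmp/spark-test
  }
}
```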
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala              | 11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala                       | 14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala             | 18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala       | 11
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala       |  9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala    |  9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala |  9
7 files changed, 32 insertions(+), 49 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b2199fdf90..c1f8b2b3d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -132,13 +132,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- private def makeQualifiedPath(path: String): URI = {
- // copy-paste from SessionCatalog
- val hadoopPath = new Path(path)
- val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration)
- fs.makeQualified(hadoopPath).toUri
- }
-
test("Create Database using Default Warehouse Path") {
val catalog = spark.sessionState.catalog
val dbName = "db1"
@@ -2086,9 +2079,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
Seq(1).toDF("a").write.saveAsTable("t")
val tblloc = new File(loc, "t")
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- val tblPath = new Path(tblloc.getAbsolutePath)
- val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
- assert(table.location == fs.makeQualified(tblPath).toUri)
+ assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
assert(tblloc.listFiles().nonEmpty)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index d4afb9d8af..9201954b66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -18,13 +18,14 @@
package org.apache.spark.sql.test
import java.io.File
+import java.net.URI
import java.util.UUID
import scala.language.implicitConversions
import scala.util.Try
import scala.util.control.NonFatal
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
@@ -294,6 +295,17 @@ private[sql] trait SQLTestUtils
test(name) { runOnThread() }
}
}
+
+ /**
+ * Makes the given path qualified. Qualifying a path that has no scheme up front
+ * ensures it keeps resolving to the same location even if the default
+ * FileSystem is changed later.
+ */
+ def makeQualifiedPath(path: String): URI = {
+ val hadoopPath = new Path(path)
+ val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
+ fs.makeQualified(hadoopPath).toUri
+ }
}
private[sql] object SQLTestUtils {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index df2c1cee94..10d929a4a0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1654,10 +1654,8 @@ class HiveDDLSuite
|LOCATION '$dir'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
- val dirPath = new Path(dir.getAbsolutePath)
- val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(new Path(table.location) == fs.makeQualified(dirPath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
@@ -1675,10 +1673,8 @@ class HiveDDLSuite
|LOCATION '$dir'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
- val dirPath = new Path(dir.getAbsolutePath)
- val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(new Path(table.location) == fs.makeQualified(dirPath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
val partDir = new File(dir, "a=3")
assert(partDir.exists())
@@ -1792,9 +1788,7 @@ class HiveDDLSuite
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- val path = new Path(loc.getAbsolutePath)
- val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
- assert(table.location == fs.makeQualified(path).toUri)
+ assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
assert(new Path(table.location).toString.contains(specialChars))
assert(loc.listFiles().isEmpty)
@@ -1822,9 +1816,7 @@ class HiveDDLSuite
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- val path = new Path(loc.getAbsolutePath)
- val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
- assert(table.location == fs.makeQualified(path).toUri)
+ assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
assert(new Path(table.location).toString.contains(specialChars))
assert(loc.listFiles().isEmpty)
@@ -1871,7 +1863,7 @@ class HiveDDLSuite
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val tblPath = new Path(tblloc.getAbsolutePath)
val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
- assert(table.location == fs.makeQualified(tblPath).toUri)
+ assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
assert(tblloc.listFiles().nonEmpty)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 4f771caa1d..ba0a7605da 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.hive.orc
import java.io.File
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.HadoopFsRelationTest
import org.apache.spark.sql.types._
@@ -42,12 +42,9 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
test("save()/load() - partitioned table - simple queries - partition columns in data") {
withTempDir { file =>
- val basePath = new Path(file.getCanonicalPath)
- val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
- val qualifiedBasePath = fs.makeQualified(basePath)
-
for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
- val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+ val partitionDir = new Path(
+ CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
sparkContext
.parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
.toDF("a", "b", "p1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index d79edee5b1..49be30435a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -21,8 +21,8 @@ import java.math.BigDecimal
import org.apache.hadoop.fs.Path
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.types._
class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -38,12 +38,9 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
test("save()/load() - partitioned table - simple queries - partition columns in data") {
withTempDir { file =>
- val basePath = new Path(file.getCanonicalPath)
- val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
- val qualifiedBasePath = fs.makeQualified(basePath)
-
for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
- val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+ val partitionDir = new Path(
+ CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
sparkContext
.parallelize(for (i <- 1 to 3) yield s"""{"a":$i,"b":"val_$i"}""")
.saveAsTextFile(partitionDir.toString)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 03207ab869..dce5bb7ddb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -23,8 +23,8 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -44,12 +44,9 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
test("save()/load() - partitioned table - simple queries - partition columns in data") {
withTempDir { file =>
- val basePath = new Path(file.getCanonicalPath)
- val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
- val qualifiedBasePath = fs.makeQualified(basePath)
-
for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
- val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+ val partitionDir = new Path(
+ CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
sparkContext
.parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
.toDF("a", "b", "p1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index a47a2246dd..2ec593b95c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.sources
import org.apache.hadoop.fs.Path
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.types._
@@ -45,12 +45,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper {
test("save()/load() - partitioned table - simple queries - partition columns in data") {
withTempDir { file =>
- val basePath = new Path(file.getCanonicalPath)
- val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
- val qualifiedBasePath = fs.makeQualified(basePath)
-
for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
- val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+ val partitionDir = new Path(
+ CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
sparkContext
.parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
.saveAsTextFile(partitionDir.toString)