author     Wenchen Fan <wenchen@databricks.com>       2015-11-17 11:29:02 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-11-17 11:29:02 -0800
commit  d9251496640a77568a1e9ed5045ce2dfba4b437b (patch)
tree    fb13126d187e531922b378d9bd528dd2652f2ab7 /docker-integration-tests
parent  0158ff7737d10e68be2e289533241da96b496e89 (diff)
[SPARK-10186][SQL] support postgre array type in JDBCRDD
Add ARRAY support to `PostgresDialect`. Nested ARRAY is not allowed for now because it's hard to get the array dimension info. See http://stackoverflow.com/questions/16619113/how-to-get-array-base-type-in-postgres-via-jdbc

Thanks for the initial work from mariusvniekerk!

Closes https://github.com/apache/spark/pull/9137

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9662 from cloud-fan/postgre.
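For context: Postgres reports an array column through JDBC as java.sql.Types.ARRAY, with a type name that is the element type prefixed by an underscore (e.g. `_int4` for `integer[]`). A dialect can therefore recover the element type but not the dimension count, which is why nested arrays stay unsupported. Below is a minimal sketch of that mapping against Spark's `JdbcDialect` hook; the object name, the `toElementType` helper, and its abbreviated type table are illustrative stand-ins, not the actual patch:

import java.sql.Types

import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types._

// Illustrative dialect: map Postgres array columns to Catalyst ArrayType.
object ArrayAwarePostgresDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    // Postgres prefixes the element type with '_' for arrays, e.g. "_int4".
    if (sqlType == Types.ARRAY && typeName.startsWith("_")) {
      toElementType(typeName.drop(1)).map(ArrayType(_))
    } else {
      None
    }
  }

  // Abbreviated element-type table, for illustration only.
  private def toElementType(pgType: String): Option[DataType] = pgType match {
    case "int4"             => Some(IntegerType)
    case "int8"             => Some(LongType)
    case "float8"           => Some(DoubleType)
    case "text" | "varchar" => Some(StringType)
    case "bool"             => Some(BooleanType)
    case _                  => None
  }
}

A custom dialect along these lines could be plugged in via JdbcDialects.registerDialect; in this PR the mapping is built into the stock PostgresDialect instead.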
Diffstat (limited to 'docker-integration-tests')
-rw-r--r--  docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 44
1 file changed, 28 insertions(+), 16 deletions(-)
diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 164a7f3962..2e18d0a2ba 100644
--- a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.jdbc
import java.sql.Connection
import java.util.Properties
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.expressions.{Literal, If}
import org.apache.spark.tags.DockerTest
@DockerTest
@@ -37,28 +39,32 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
override def dataPreparation(conn: Connection): Unit = {
conn.prepareStatement("CREATE DATABASE foo").executeUpdate()
conn.setCatalog("foo")
- conn.prepareStatement("CREATE TABLE bar (a text, b integer, c double precision, d bigint, "
- + "e bit(1), f bit(10), g bytea, h boolean, i inet, j cidr)").executeUpdate()
+ conn.prepareStatement("CREATE TABLE bar (c0 text, c1 integer, c2 double precision, c3 bigint, "
+ + "c4 bit(1), c5 bit(10), c6 bytea, c7 boolean, c8 inet, c9 cidr, "
+ + "c10 integer[], c11 text[])").executeUpdate()
conn.prepareStatement("INSERT INTO bar VALUES ('hello', 42, 1.25, 123456789012345, B'0', "
- + "B'1000100101', E'\\\\xDEADBEEF', true, '172.16.0.42', '192.168.0.0/16')").executeUpdate()
+ + "B'1000100101', E'\\\\xDEADBEEF', true, '172.16.0.42', '192.168.0.0/16', "
+ + """'{1, 2}', '{"a", null, "b"}')""").executeUpdate()
}
test("Type mapping for various types") {
val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
val rows = df.collect()
assert(rows.length == 1)
- val types = rows(0).toSeq.map(x => x.getClass.toString)
- assert(types.length == 10)
- assert(types(0).equals("class java.lang.String"))
- assert(types(1).equals("class java.lang.Integer"))
- assert(types(2).equals("class java.lang.Double"))
- assert(types(3).equals("class java.lang.Long"))
- assert(types(4).equals("class java.lang.Boolean"))
- assert(types(5).equals("class [B"))
- assert(types(6).equals("class [B"))
- assert(types(7).equals("class java.lang.Boolean"))
- assert(types(8).equals("class java.lang.String"))
- assert(types(9).equals("class java.lang.String"))
+ val types = rows(0).toSeq.map(x => x.getClass)
+ assert(types.length == 12)
+ assert(classOf[String].isAssignableFrom(types(0)))
+ assert(classOf[java.lang.Integer].isAssignableFrom(types(1)))
+ assert(classOf[java.lang.Double].isAssignableFrom(types(2)))
+ assert(classOf[java.lang.Long].isAssignableFrom(types(3)))
+ assert(classOf[java.lang.Boolean].isAssignableFrom(types(4)))
+ assert(classOf[Array[Byte]].isAssignableFrom(types(5)))
+ assert(classOf[Array[Byte]].isAssignableFrom(types(6)))
+ assert(classOf[java.lang.Boolean].isAssignableFrom(types(7)))
+ assert(classOf[String].isAssignableFrom(types(8)))
+ assert(classOf[String].isAssignableFrom(types(9)))
+ assert(classOf[Seq[Int]].isAssignableFrom(types(10)))
+ assert(classOf[Seq[String]].isAssignableFrom(types(11)))
assert(rows(0).getString(0).equals("hello"))
assert(rows(0).getInt(1) == 42)
assert(rows(0).getDouble(2) == 1.25)
@@ -72,11 +78,17 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(rows(0).getBoolean(7) == true)
assert(rows(0).getString(8) == "172.16.0.42")
assert(rows(0).getString(9) == "192.168.0.0/16")
+ assert(rows(0).getSeq(10) == Seq(1, 2))
+ assert(rows(0).getSeq(11) == Seq("a", null, "b"))
}
test("Basic write test") {
val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
- df.write.jdbc(jdbcUrl, "public.barcopy", new Properties)
// Test only that it doesn't crash.
+ df.write.jdbc(jdbcUrl, "public.barcopy", new Properties)
+ // Test write null values.
+ df.select(df.queryExecution.analyzed.output.map { a =>
+ Column(If(Literal(true), Literal(null), a)).as(a.name)
+ }: _*).write.jdbc(jdbcUrl, "public.barcopy2", new Properties)
}
}
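A note on the null-write test above: wrapping each output attribute in `If(Literal(true), Literal(null), a)` builds a projection that keeps the original schema but evaluates to NULL in every column, exercising the JDBC writer's null handling for each type, arrays included. The same trick as a standalone sketch, assuming the suite's `sqlContext` and `jdbcUrl` are in scope as in the test class:

import java.util.Properties

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{If, Literal}

// Read the table back, then null out every column while keeping its type.
val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
val allNulls = df.select(df.queryExecution.analyzed.output.map { attr =>
  // IF(TRUE, NULL, attr) always evaluates to NULL but is coerced to attr's type.
  Column(If(Literal(true), Literal(null), attr)).as(attr.name)
}: _*)
allNulls.write.jdbc(jdbcUrl, "public.barcopy2", new Properties)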