author    Josh Rosen <joshrosen@databricks.com>        2016-09-02 18:53:12 +0200
committer Herman van Hovell <hvanhovell@databricks.com>  2016-09-02 18:53:12 +0200
commit    6bcbf9b74351b5ac5221e3c309cb98e6f9cc7c5a
tree      364adc0465598e60b7d15e3e810fa3875bd98e6c /python
parent    806d8a8e980d8ba2f4261bceb393c40bafaa2f73
[SPARK-17351] Refactor JDBCRDD to expose ResultSet -> Seq[Row] utility methods
This patch refactors the internals of the JDBC data source in order to allow some of its code to be re-used in an automated comparison testing harness. Here are the key changes:
- Move the JDBC `ResultSetMetaData` to `StructType` conversion logic from `JDBCRDD.resolveTable()` to the `JdbcUtils` object (as a new `getSchema(ResultSet, JdbcDialect)` method), allowing it to be applied to `ResultSet`s that are created elsewhere.
- Move the `ResultSet` to `InternalRow` conversion methods from `JDBCRDD` to `JdbcUtils`:
- It makes sense to move the `JDBCValueGetter` type and `makeGetter` functions here given that their write-path counterparts (`JDBCValueSetter`) are already in `JdbcUtils`.
- Add an internal `resultSetToSparkInternalRows` method which takes a `ResultSet` and schema and returns an `Iterator[InternalRow]`. This effectively extracts the main loop of `JDBCRDD` into its own method.
- Add a public `resultSetToRows` method to `JdbcUtils`, which wraps the minimal machinery around `resultSetToSparkInternalRows` in order to allow it to be called from outside of a Spark job.
- Make `JdbcDialects.get` into a `DeveloperApi` (`JdbcDialect` itself is already a `DeveloperApi`).
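To make the schema-conversion move concrete, here is a simplified sketch of the kind of `ResultSetMetaData` to `StructType` mapping that `getSchema` performs. This is illustrative only, not Spark's actual implementation: the real method also consults the `JdbcDialect` for type overrides and handles many more SQL types; `sketchGetSchema` and its type table are invented for this example.

```scala
import java.sql.{ResultSet, ResultSetMetaData, Types}
import org.apache.spark.sql.types._

// Illustrative sketch: walk the ResultSet's metadata column by column
// and build a Spark StructType. The real JdbcUtils.getSchema covers far
// more JDBC types and lets the JdbcDialect override the mapping.
def sketchGetSchema(rs: ResultSet): StructType = {
  val meta = rs.getMetaData
  val fields = (1 to meta.getColumnCount).map { i =>
    val dataType = meta.getColumnType(i) match {
      case Types.INTEGER => IntegerType
      case Types.BIGINT  => LongType
      case Types.DOUBLE  => DoubleType
      case Types.VARCHAR => StringType
      case _             => StringType // fallback, for brevity only
    }
    val nullable = meta.isNullable(i) != ResultSetMetaData.columnNoNulls
    StructField(meta.getColumnLabel(i), dataType, nullable)
  }
  StructType(fields)
}
```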
Put together, these changes enable the following testing pattern:
```scala
val jdbcResultSet: ResultSet = conn.prepareStatement(query).executeQuery()
val resultSchema: StructType = JdbcUtils.getSchema(jdbcResultSet, JdbcDialects.get("jdbc:postgresql"))
val jdbcRows: Seq[Row] = JdbcUtils.resultSetToRows(jdbcResultSet, resultSchema).toSeq
checkAnswer(sparkResult, jdbcRows) // in a test case
```
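The "main loop" that `resultSetToSparkInternalRows` extracts from `JDBCRDD` is essentially an adapter from the cursor-style `ResultSet` API to a Scala `Iterator`. The following sketch shows the general shape of such an adapter; the names and the `convert` parameter are invented here for illustration and are not Spark's actual code, which converts each row into an `InternalRow` via its `JDBCValueGetter`s.

```scala
import java.sql.ResultSet

// Illustrative sketch of driving a ResultSet as an Iterator. hasNext
// advances the cursor at most once per row; next() hands the positioned
// ResultSet to a caller-supplied conversion function.
def resultSetIterator[A](rs: ResultSet)(convert: ResultSet => A): Iterator[A] =
  new Iterator[A] {
    private var advanced = false
    private var hasRow = false

    def hasNext: Boolean = {
      if (!advanced) {
        hasRow = rs.next()
        advanced = true
      }
      hasRow
    }

    def next(): A = {
      if (!hasNext) throw new NoSuchElementException("end of ResultSet")
      advanced = false
      convert(rs)
    }
  }
```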
Author: Josh Rosen <joshrosen@databricks.com>
Closes #14907 from JoshRosen/modularize-jdbc-internals.
Diffstat (limited to 'python'): 0 files changed, 0 insertions, 0 deletions