diff options
author | Hung Lin <hung@zoomdata.com> | 2015-02-08 22:36:42 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-02-08 22:36:51 -0800 |
commit | 955f2863e39a96c0b00ad7d3eac972bb1cfcb594 (patch) | |
tree | 7170ca5ac1eff61a9be2e02c2c10117ab1f0b43d | |
parent | fa8ea48f2d693b1e9db7a7138c23075748b3c0f5 (diff) | |
download | spark-955f2863e39a96c0b00ad7d3eac972bb1cfcb594.tar.gz spark-955f2863e39a96c0b00ad7d3eac972bb1cfcb594.tar.bz2 spark-955f2863e39a96c0b00ad7d3eac972bb1cfcb594.zip |
[SPARK-5472][SQL] Fix Scala code style
Fix Scala code style.
Author: Hung Lin <hung@zoomdata.com>
Closes #4464 from hunglin/SPARK-5472 and squashes the following commits:
ef7a3b3 [Hung Lin] SPARK-5472: fix scala style
(cherry picked from commit 4575c5643a82818bf64f9648314bdc2fdc12febb)
Signed-off-by: Reynold Xin <rxin@databricks.com>
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 42 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala | 35 |
2 files changed, 41 insertions, 36 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index a2f94675fb..0bec32cca1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -17,13 +17,10 @@ package org.apache.spark.sql.jdbc -import java.sql.{Connection, DatabaseMetaData, DriverManager, ResultSet, ResultSetMetaData, SQLException} -import scala.collection.mutable.ArrayBuffer +import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException} import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.spark.util.NextIterator -import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow} import org.apache.spark.sql.types._ import org.apache.spark.sql.sources._ @@ -100,7 +97,7 @@ private[sql] object JDBCRDD extends Logging { try { val rsmd = rs.getMetaData val ncols = rsmd.getColumnCount - var fields = new Array[StructField](ncols); + val fields = new Array[StructField](ncols) var i = 0 while (i < ncols) { val columnName = rsmd.getColumnName(i + 1) @@ -176,23 +173,27 @@ private[sql] object JDBCRDD extends Logging { * * @return An RDD representing "SELECT requiredColumns FROM fqTable". */ - def scanTable(sc: SparkContext, - schema: StructType, - driver: String, - url: String, - fqTable: String, - requiredColumns: Array[String], - filters: Array[Filter], - parts: Array[Partition]): RDD[Row] = { + def scanTable( + sc: SparkContext, + schema: StructType, + driver: String, + url: String, + fqTable: String, + requiredColumns: Array[String], + filters: Array[Filter], + parts: Array[Partition]): RDD[Row] = { + val prunedSchema = pruneSchema(schema, requiredColumns) - return new JDBCRDD(sc, - getConnector(driver, url), - prunedSchema, - fqTable, - requiredColumns, - filters, - parts) + return new + JDBCRDD( + sc, + getConnector(driver, url), + prunedSchema, + fqTable, + requiredColumns, + filters, + parts) } } @@ -412,6 +413,5 @@ private[sql] class JDBCRDD( gotNext = false nextValue } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala index e09125e406..66ad38eb7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala @@ -96,7 +96,8 @@ private[sql] class DefaultSource extends RelationProvider { if (driver != null) Class.forName(driver) - if ( partitionColumn != null + if ( + partitionColumn != null && (lowerBound == null || upperBound == null || numPartitions == null)) { sys.error("Partitioning incompletely specified") } @@ -104,30 +105,34 @@ private[sql] class DefaultSource extends RelationProvider { val partitionInfo = if (partitionColumn == null) { null } else { - JDBCPartitioningInfo(partitionColumn, - lowerBound.toLong, upperBound.toLong, - numPartitions.toInt) + JDBCPartitioningInfo( + partitionColumn, + lowerBound.toLong, + upperBound.toLong, + numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) JDBCRelation(url, table, parts)(sqlContext) } } -private[sql] case class JDBCRelation(url: String, - table: String, - parts: Array[Partition])( - @transient val sqlContext: SQLContext) - extends PrunedFilteredScan { +private[sql] case class JDBCRelation( + url: String, + table: String, + parts: Array[Partition])(@transient val sqlContext: SQLContext) extends PrunedFilteredScan { override val schema = JDBCRDD.resolveTable(url, table) override def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = { val driver: String = DriverManager.getDriver(url).getClass.getCanonicalName - JDBCRDD.scanTable(sqlContext.sparkContext, - schema, - driver, url, - table, - requiredColumns, filters, - parts) + JDBCRDD.scanTable( + sqlContext.sparkContext, + schema, + driver, + url, + table, + requiredColumns, + filters, + parts) } } |