author     Hung Lin <hung@zoomdata.com>        2015-02-08 22:36:42 -0800
committer  Reynold Xin <rxin@databricks.com>   2015-02-08 22:36:42 -0800
commit     4575c5643a82818bf64f9648314bdc2fdc12febb (patch)
tree       695c06b02ca3b575c22bf7c1e107dabb35e5681c
parent     4396dfb37f433ef186e3e0a09db9906986ec940b (diff)
[SPARK-5472][SQL] Fix Scala code style
Fix Scala code style.

Author: Hung Lin <hung@zoomdata.com>

Closes #4464 from hunglin/SPARK-5472 and squashes the following commits:

ef7a3b3 [Hung Lin] SPARK-5472: fix scala style
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala       | 42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala  | 35
2 files changed, 41 insertions(+), 36 deletions(-)
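
The changes below mostly reflow multi-parameter method definitions and call sites: when a signature or call does not fit on one line, each parameter or argument moves to its own line with a consistent indent instead of being aligned under the opening parenthesis. A minimal, self-contained sketch of that convention; the object and method names here are hypothetical illustrations, not Spark APIs:

    // Hypothetical example of the parameter-list style applied in this commit.
    object ParameterStyleExample {

      // When a signature does not fit on one line, each parameter goes on its
      // own line with a fixed indent, rather than being aligned under the
      // opening parenthesis as in the removed code.
      def buildQuery(
          table: String,
          columns: Seq[String],
          filter: String,
          limit: Int): String = {
        s"SELECT ${columns.mkString(", ")} FROM $table WHERE $filter LIMIT $limit"
      }

      def main(args: Array[String]): Unit = {
        // Call sites are reflowed the same way: one argument per line.
        val query = buildQuery(
          "people",
          Seq("id", "name"),
          "id > 0",
          10)
        println(query)
      }
    }
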
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index a2f94675fb..0bec32cca1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -17,13 +17,10 @@
package org.apache.spark.sql.jdbc
-import java.sql.{Connection, DatabaseMetaData, DriverManager, ResultSet, ResultSetMetaData, SQLException}
-import scala.collection.mutable.ArrayBuffer
+import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException}
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.NextIterator
-import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources._
@@ -100,7 +97,7 @@ private[sql] object JDBCRDD extends Logging {
try {
val rsmd = rs.getMetaData
val ncols = rsmd.getColumnCount
- var fields = new Array[StructField](ncols);
+ val fields = new Array[StructField](ncols)
var i = 0
while (i < ncols) {
val columnName = rsmd.getColumnName(i + 1)
@@ -176,23 +173,27 @@ private[sql] object JDBCRDD extends Logging {
*
* @return An RDD representing "SELECT requiredColumns FROM fqTable".
*/
- def scanTable(sc: SparkContext,
- schema: StructType,
- driver: String,
- url: String,
- fqTable: String,
- requiredColumns: Array[String],
- filters: Array[Filter],
- parts: Array[Partition]): RDD[Row] = {
+ def scanTable(
+ sc: SparkContext,
+ schema: StructType,
+ driver: String,
+ url: String,
+ fqTable: String,
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ parts: Array[Partition]): RDD[Row] = {
+
val prunedSchema = pruneSchema(schema, requiredColumns)
- return new JDBCRDD(sc,
- getConnector(driver, url),
- prunedSchema,
- fqTable,
- requiredColumns,
- filters,
- parts)
+ return new
+ JDBCRDD(
+ sc,
+ getConnector(driver, url),
+ prunedSchema,
+ fqTable,
+ requiredColumns,
+ filters,
+ parts)
}
}
@@ -412,6 +413,5 @@ private[sql] class JDBCRDD(
gotNext = false
nextValue
}
-
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index e09125e406..66ad38eb7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -96,7 +96,8 @@ private[sql] class DefaultSource extends RelationProvider {
if (driver != null) Class.forName(driver)
- if ( partitionColumn != null
+ if (
+ partitionColumn != null
&& (lowerBound == null || upperBound == null || numPartitions == null)) {
sys.error("Partitioning incompletely specified")
}
@@ -104,30 +105,34 @@ private[sql] class DefaultSource extends RelationProvider {
val partitionInfo = if (partitionColumn == null) {
null
} else {
- JDBCPartitioningInfo(partitionColumn,
- lowerBound.toLong, upperBound.toLong,
- numPartitions.toInt)
+ JDBCPartitioningInfo(
+ partitionColumn,
+ lowerBound.toLong,
+ upperBound.toLong,
+ numPartitions.toInt)
}
val parts = JDBCRelation.columnPartition(partitionInfo)
JDBCRelation(url, table, parts)(sqlContext)
}
}
-private[sql] case class JDBCRelation(url: String,
- table: String,
- parts: Array[Partition])(
- @transient val sqlContext: SQLContext)
- extends PrunedFilteredScan {
+private[sql] case class JDBCRelation(
+ url: String,
+ table: String,
+ parts: Array[Partition])(@transient val sqlContext: SQLContext) extends PrunedFilteredScan {
override val schema = JDBCRDD.resolveTable(url, table)
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = {
val driver: String = DriverManager.getDriver(url).getClass.getCanonicalName
- JDBCRDD.scanTable(sqlContext.sparkContext,
- schema,
- driver, url,
- table,
- requiredColumns, filters,
- parts)
+ JDBCRDD.scanTable(
+ sqlContext.sparkContext,
+ schema,
+ driver,
+ url,
+ table,
+ requiredColumns,
+ filters,
+ parts)
}
}
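
For context on how the reformatted relation is exercised, here is a minimal usage sketch of the JDBC data source implemented in this file. It assumes a Spark build of this era, a reachable PostgreSQL database at a hypothetical URL, and the option keys (url, dbtable, partitionColumn, lowerBound, upperBound, numPartitions) read by DefaultSource above; it is illustrative, not a verified example.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object JdbcSourceSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("jdbc-sketch"))
        val sqlContext = new SQLContext(sc)

        // Register the JDBC source as a temporary table. The option keys below
        // are assumed from the parameters handled by DefaultSource; the database
        // URL and table are placeholders.
        sqlContext.sql("""
          CREATE TEMPORARY TABLE people
          USING org.apache.spark.sql.jdbc
          OPTIONS (
            url 'jdbc:postgresql://dbhost/testdb',
            dbtable 'public.people',
            partitionColumn 'id',
            lowerBound '1',
            upperBound '1000000',
            numPartitions '10'
          )
        """)

        // Queries against the temporary table go through JDBCRelation.buildScan,
        // which calls the JDBCRDD.scanTable method reformatted above.
        sqlContext.sql("SELECT * FROM people WHERE id < 100").collect().foreach(println)

        sc.stop()
      }
    }
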