diff options
Diffstat (limited to 'sql')
46 files changed, 282 insertions, 265 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index ec895af9c3..cfd9cb0e62 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import scala.collection.JavaConverters._ + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types.StructType @@ -280,9 +282,8 @@ trait Row extends Serializable { * * @throws ClassCastException when data type does not match. */ - def getList[T](i: Int): java.util.List[T] = { - scala.collection.JavaConversions.seqAsJavaList(getSeq[T](i)) - } + def getList[T](i: Int): java.util.List[T] = + getSeq[T](i).asJava /** * Returns the value at position i of map type as a Scala Map. @@ -296,9 +297,8 @@ trait Row extends Serializable { * * @throws ClassCastException when data type does not match. */ - def getJavaMap[K, V](i: Int): java.util.Map[K, V] = { - scala.collection.JavaConversions.mapAsJavaMap(getMap[K, V](i)) - } + def getJavaMap[K, V](i: Int): java.util.Map[K, V] = + getMap[K, V](i).asJava /** * Returns the value at position i of struct type as an [[Row]] object. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 503c4f4b20..4cc9a5520a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -147,7 +147,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog { override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { val result = ArrayBuffer.empty[(String, Boolean)] - for (name <- tables.keySet()) { + for (name <- tables.keySet().asScala) { result += ((name, true)) } result diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala index a4fd4cf3b3..77a42c0873 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.{lang => jl} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.expressions._ @@ -209,7 +209,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * * @since 1.3.1 */ - def fill(valueMap: java.util.Map[String, Any]): DataFrame = fill0(valueMap.toSeq) + def fill(valueMap: java.util.Map[String, Any]): DataFrame = fill0(valueMap.asScala.toSeq) /** * (Scala-specific) Returns a new [[DataFrame]] that replaces null values. @@ -254,7 +254,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * @since 1.3.1 */ def replace[T](col: String, replacement: java.util.Map[T, T]): DataFrame = { - replace[T](col, replacement.toMap : Map[T, T]) + replace[T](col, replacement.asScala.toMap) } /** @@ -277,7 +277,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * @since 1.3.1 */ def replace[T](cols: Array[String], replacement: java.util.Map[T, T]): DataFrame = { - replace(cols.toSeq, replacement.toMap) + replace(cols.toSeq, replacement.asScala.toMap) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 6dc7bfe333..97a8b6518a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql import java.util.Properties +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.Path import org.apache.spark.annotation.Experimental @@ -90,7 +92,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * @since 1.4.0 */ def options(options: java.util.Map[String, String]): DataFrameReader = { - this.options(scala.collection.JavaConversions.mapAsScalaMap(options)) + this.options(options.asScala) this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ce8744b531..b2a66dd417 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql import java.util.Properties +import scala.collection.JavaConverters._ + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -109,7 +111,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * @since 1.4.0 */ def options(options: java.util.Map[String, String]): DataFrameWriter = { - this.options(scala.collection.JavaConversions.mapAsScalaMap(options)) + this.options(options.asScala) this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala index 99d557b03a..ee31d83cce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.language.implicitConversions import org.apache.spark.annotation.Experimental @@ -188,7 +188,7 @@ class GroupedData protected[sql]( * @since 1.3.0 */ def agg(exprs: java.util.Map[String, String]): DataFrame = { - agg(exprs.toMap) + agg(exprs.asScala.toMap) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index e9de14f025..e6f7619519 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import java.util.Properties import scala.collection.immutable -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.parquet.hadoop.ParquetOutputCommitter @@ -531,7 +531,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf { /** Set Spark SQL configuration properties. */ def setConf(props: Properties): Unit = settings.synchronized { - props.foreach { case (k, v) => setConfString(k, v) } + props.asScala.foreach { case (k, v) => setConfString(k, v) } } /** Set the given Spark SQL configuration property using a `string` value. */ @@ -601,24 +601,25 @@ private[sql] class SQLConf extends Serializable with CatalystConf { * Return all the configuration properties that have been set (i.e. not the default). * This creates a new copy of the config properties in the form of a Map. */ - def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap } + def getAllConfs: immutable.Map[String, String] = + settings.synchronized { settings.asScala.toMap } /** * Return all the configuration definitions that have been defined in [[SQLConf]]. Each * definition contains key, defaultValue and doc. */ def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized { - sqlConfEntries.values.filter(_.isPublic).map { entry => + sqlConfEntries.values.asScala.filter(_.isPublic).map { entry => (entry.key, entry.defaultValueString, entry.doc) }.toSeq } private[spark] def unsetConf(key: String): Unit = { - settings -= key + settings.remove(key) } private[spark] def unsetConf(entry: SQLConfEntry[_]): Unit = { - settings -= entry.key + settings.remove(entry.key) } private[spark] def clear(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a1eea09e04..4e8414af50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -21,7 +21,7 @@ import java.beans.Introspector import java.util.Properties import java.util.concurrent.atomic.AtomicReference -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.immutable import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal @@ -225,7 +225,7 @@ class SQLContext(@transient val sparkContext: SparkContext) conf.setConf(properties) // After we have populated SQLConf, we call setConf to populate other confs in the subclass // (e.g. hiveconf in HiveContext). - properties.foreach { + properties.asScala.foreach { case (key, value) => setConf(key, value) } } @@ -567,7 +567,7 @@ class SQLContext(@transient val sparkContext: SparkContext) tableName: String, source: String, options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, options.toMap) + createExternalTable(tableName, source, options.asScala.toMap) } /** @@ -612,7 +612,7 @@ class SQLContext(@transient val sparkContext: SparkContext) source: String, schema: StructType, options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, schema, options.toMap) + createExternalTable(tableName, source, schema, options.asScala.toMap) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index 8fbaf3a305..0117244366 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.util.ServiceLoader -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} import scala.util.{Success, Failure, Try} @@ -55,7 +55,7 @@ object ResolvedDataSource extends Logging { val loader = Utils.getContextOrSparkClassLoader val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader) - serviceLoader.iterator().filter(_.shortName().equalsIgnoreCase(provider)).toList match { + serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match { /** the provider format did not match any given registered aliases */ case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match { case Success(dataSource) => dataSource diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 3f8353af6e..0a6bb44445 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.{Map => JMap} -import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap} +import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.ReadSupport.ReadContext @@ -44,7 +44,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with val parquetRequestedSchema = readContext.getRequestedSchema val catalystRequestedSchema = - Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata => + Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata => metadata // First tries to read requested schema, which may result from projections .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA) @@ -123,7 +123,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with maybeRequestedSchema.fold(context.getFileSchema) { schemaString => val toParquet = new CatalystSchemaConverter(conf) val fileSchema = context.getFileSchema.asGroupType() - val fileFieldNames = fileSchema.getFields.map(_.getName).toSet + val fileFieldNames = fileSchema.getFields.asScala.map(_.getName).toSet StructType // Deserializes the Catalyst schema of requested columns @@ -152,7 +152,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _) - new ReadContext(parquetRequestedSchema, metadata) + new ReadContext(parquetRequestedSchema, metadata.asJava) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index cbf0704c4a..f682ca0d8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.math.{BigDecimal, BigInteger} import java.nio.ByteOrder -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.parquet.column.Dictionary @@ -183,7 +183,7 @@ private[parquet] class CatalystRowConverter( // those missing fields and create converters for them, although values of these fields are // always null. val paddedParquetFields = { - val parquetFields = parquetType.getFields + val parquetFields = parquetType.getFields.asScala val parquetFieldNames = parquetFields.map(_.getName).toSet val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index 535f0684e9..be6c0545f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.parquet.schema.OriginalType._ @@ -82,7 +82,7 @@ private[parquet] class CatalystSchemaConverter( def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType()) private def convert(parquetSchema: GroupType): StructType = { - val fields = parquetSchema.getFields.map { field => + val fields = parquetSchema.getFields.asScala.map { field => field.getRepetition match { case OPTIONAL => StructField(field.getName, convertField(field), nullable = true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index bbf682aec0..64982f37cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -21,7 +21,7 @@ import java.net.URI import java.util.logging.{Logger => JLogger} import java.util.{List => JList} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Try} @@ -336,7 +336,7 @@ private[sql] class ParquetRelation( override def getPartitions: Array[SparkPartition] = { val inputFormat = new ParquetInputFormat[InternalRow] { override def listStatus(jobContext: JobContext): JList[FileStatus] = { - if (cacheMetadata) cachedStatuses else super.listStatus(jobContext) + if (cacheMetadata) cachedStatuses.asJava else super.listStatus(jobContext) } } @@ -344,7 +344,8 @@ private[sql] class ParquetRelation( val rawSplits = inputFormat.getSplits(jobContext) Array.tabulate[SparkPartition](rawSplits.size) { i => - new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + new SqlNewHadoopPartition( + id, i, rawSplits.get(i).asInstanceOf[InputSplit with Writable]) } } }.asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row] @@ -588,7 +589,7 @@ private[sql] object ParquetRelation extends Logging { val metadata = footer.getParquetMetadata.getFileMetaData val serializedSchema = metadata .getKeyValueMetaData - .toMap + .asScala.toMap .get(CatalystReadSupport.SPARK_METADATA_KEY) if (serializedSchema.isEmpty) { // Falls back to Parquet schema if no Spark SQL schema found. @@ -745,7 +746,7 @@ private[sql] object ParquetRelation extends Logging { // Reads footers in multi-threaded manner within each task val footers = ParquetFileReader.readAllFootersInParallel( - serializedConf.value, fakeFileStatuses, skipRowGroups) + serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` val converter = @@ -772,7 +773,7 @@ private[sql] object ParquetRelation extends Logging { val fileMetaData = footer.getParquetMetadata.getFileMetaData fileMetaData .getKeyValueMetaData - .toMap + .asScala.toMap .get(CatalystReadSupport.SPARK_METADATA_KEY) .flatMap(deserializeSchemaString) .getOrElse(converter.convert(fileMetaData.getSchema)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala index 42376ef7a9..142301fe87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.IOException +import java.util.{Collections, Arrays} -import scala.collection.JavaConversions._ import scala.util.Try import org.apache.hadoop.conf.Configuration @@ -107,7 +107,7 @@ private[parquet] object ParquetTypesConverter extends Logging { ParquetFileWriter.writeMetadataFile( conf, path, - new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil) + Arrays.asList(new Footer(path, new ParquetMetadata(metaData, Collections.emptyList())))) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala index ed282f98b7..d800c7456b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.joins
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
@@ -92,9 +92,9 @@ case class ShuffledHashOuterJoin( case FullOuter =>
// TODO(davies): use UnsafeRow
val leftHashTable =
- buildHashTable(leftIter, numLeftRows, newProjection(leftKeys, left.output))
+ buildHashTable(leftIter, numLeftRows, newProjection(leftKeys, left.output)).asScala
val rightHashTable =
- buildHashTable(rightIter, numRightRows, newProjection(rightKeys, right.output))
+ buildHashTable(rightIter, numRightRows, newProjection(rightKeys, right.output)).asScala
(leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
fullOuterIterator(key,
leftHashTable.getOrElse(key, EMPTY_LIST),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala index 59f8b079ab..5a58d846ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import java.io.OutputStream import java.util.{List => JList, Map => JMap} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import net.razorvine.pickle._ @@ -196,14 +196,15 @@ object EvaluatePython { case (c, BinaryType) if c.getClass.isArray && c.getClass.getComponentType.getName == "byte" => c case (c: java.util.List[_], ArrayType(elementType, _)) => - new GenericArrayData(c.map { e => fromJava(e, elementType)}.toArray) + new GenericArrayData(c.asScala.map { e => fromJava(e, elementType)}.toArray) case (c, ArrayType(elementType, _)) if c.getClass.isArray => new GenericArrayData(c.asInstanceOf[Array[_]].map(e => fromJava(e, elementType))) case (c: java.util.Map[_, _], MapType(keyType, valueType, _)) => - val keys = c.keysIterator.map(fromJava(_, keyType)).toArray - val values = c.valuesIterator.map(fromJava(_, valueType)).toArray + val keyValues = c.asScala.toSeq + val keys = keyValues.map(kv => fromJava(kv._1, keyType)).toArray + val values = keyValues.map(kv => fromJava(kv._2, valueType)).toArray ArrayBasedMapData(keys, values) case (c, StructType(fields)) if c.getClass.isArray => @@ -367,7 +368,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: val pickle = new Unpickler iter.flatMap { pickedResult => val unpickledBatch = pickle.loads(pickedResult) - unpickledBatch.asInstanceOf[java.util.ArrayList[Any]] + unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala } }.mapPartitions { iter => val row = new GenericMutableRow(1) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 7abdd3db80..4867cebf53 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -23,7 +23,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import scala.collection.JavaConversions; +import scala.collection.JavaConverters; import scala.collection.Seq; import com.google.common.collect.ImmutableMap; @@ -96,7 +96,7 @@ public class JavaDataFrameSuite { df.groupBy().agg(countDistinct("key", "value")); df.groupBy().agg(countDistinct(col("key"), col("value"))); df.select(coalesce(col("key"))); - + // Varargs with mathfunctions DataFrame df2 = context.table("testData2"); df2.select(exp("a"), exp("b")); @@ -172,7 +172,7 @@ public class JavaDataFrameSuite { Seq<Integer> outputBuffer = (Seq<Integer>) first.getJavaMap(2).get("hello"); Assert.assertArrayEquals( bean.getC().get("hello"), - Ints.toArray(JavaConversions.seqAsJavaList(outputBuffer))); + Ints.toArray(JavaConverters.seqAsJavaListConverter(outputBuffer).asJava())); Seq<String> d = first.getAs(3); Assert.assertEquals(bean.getD().size(), d.length()); for (int i = 0; i < d.length(); i++) { @@ -206,7 +206,7 @@ public class JavaDataFrameSuite { count++; } } - + @Test public void testFrequentItems() { DataFrame df = context.table("testData2"); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala index cdaa14ac80..329ffb6608 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.sql.test.SharedSQLContext @@ -153,11 +153,11 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext { // Test Java version checkAnswer( - df.na.fill(mapAsJavaMap(Map( + df.na.fill(Map( "a" -> "test", "c" -> 1, "d" -> 2.2 - ))), + ).asJava), Row("test", null, 1, 2.2)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 4adcefb7dc..3649c2a97b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.util.{Locale, TimeZone} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ @@ -145,7 +145,7 @@ object QueryTest { } def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): String = { - checkAnswer(df, expectedAnswer.toSeq) match { + checkAnswer(df, expectedAnswer.asScala) match { case Some(errorMessage) => errorMessage case None => null } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala index 45db619567..bd7cf8c10a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.ByteBuffer import java.util.{List => JList, Map => JMap} -import scala.collection.JavaConverters.seqAsJavaListConverter -import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.JavaConverters._ import org.apache.avro.Schema import org.apache.avro.generic.IndexedRecord diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala index d85c564e3e..df68432fae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.parquet.hadoop.ParquetFileReader @@ -40,8 +40,9 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq override def accept(path: Path): Boolean = pathFilter(path) }).toSeq - val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true) - footers.head.getParquetMetadata.getFileMetaData.getSchema + val footers = + ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles.asJava, true) + footers.iterator().next().getParquetMetadata.getFileMetaData.getSchema } protected def logParquetSchema(path: String): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index e6b0a2ea95..08d2b9dee9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.execution.datasources.parquet -import scala.collection.JavaConversions._ +import java.util.Collections + +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -28,7 +30,7 @@ import org.apache.parquet.example.data.simple.SimpleGroup import org.apache.parquet.example.data.{Group, GroupWriter} import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop.metadata.{BlockMetaData, CompressionCodecName, FileMetaData, ParquetMetadata} import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter} import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -205,9 +207,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { test("compression codec") { def compressionCodecFor(path: String): String = { val codecs = ParquetTypesConverter - .readMetaData(new Path(path), Some(configuration)) - .getBlocks - .flatMap(_.getColumns) + .readMetaData(new Path(path), Some(configuration)).getBlocks.asScala + .flatMap(_.getColumns.asScala) .map(_.getCodec.name()) .distinct @@ -348,14 +349,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { """.stripMargin) withTempPath { location => - val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) + val extraMetadata = Collections.singletonMap( + CatalystReadSupport.SPARK_METADATA_KEY, sparkSchema.toString) val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") val path = new Path(location.getCanonicalPath) ParquetFileWriter.writeMetadataFile( sqlContext.sparkContext.hadoopConfiguration, path, - new Footer(path, new ParquetMetadata(fileMetadata, Nil)) :: Nil) + Collections.singletonList( + new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList())))) assertResult(sqlContext.read.parquet(path.toString).schema) { StructType( @@ -386,7 +389,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } finally { // Hadoop 1 doesn't have `Configuration.unset` configuration.clear() - clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } @@ -410,7 +413,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } finally { // Hadoop 1 doesn't have `Configuration.unset` configuration.clear() - clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } @@ -434,7 +437,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } finally { // Hadoop 1 doesn't have `Configuration.unset` configuration.clear() - clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } } @@ -481,7 +484,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } finally { // Hadoop 1 doesn't have `Configuration.unset` configuration.clear() - clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 02cc7e5efa..306f98bcb5 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -20,9 +20,9 @@ package org.apache.spark.sql.hive.thriftserver import java.security.PrivilegedExceptionAction import java.sql.{Date, Timestamp} import java.util.concurrent.RejectedExecutionException -import java.util.{Map => JMap, UUID} +import java.util.{Arrays, Map => JMap, UUID} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} import scala.util.control.NonFatal @@ -126,13 +126,13 @@ private[hive] class SparkExecuteStatementOperation( def getResultSetSchema: TableSchema = { if (result == null || result.queryExecution.analyzed.output.size == 0) { - new TableSchema(new FieldSchema("Result", "string", "") :: Nil) + new TableSchema(Arrays.asList(new FieldSchema("Result", "string", ""))) } else { logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") val schema = result.queryExecution.analyzed.output.map { attr => new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") } - new TableSchema(schema) + new TableSchema(schema.asJava) } } @@ -298,7 +298,7 @@ private[hive] class SparkExecuteStatementOperation( sqlOperationConf = new HiveConf(sqlOperationConf) // apply overlay query specific settings, if any - getConfOverlay().foreach { case (k, v) => + getConfOverlay().asScala.foreach { case (k, v) => try { sqlOperationConf.verifyAndSet(k, v) } catch { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 7799704c81..a29df56798 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.hive.thriftserver -import scala.collection.JavaConversions._ - import java.io._ import java.util.{ArrayList => JArrayList, Locale} +import scala.collection.JavaConverters._ + import jline.console.ConsoleReader import jline.console.history.FileHistory @@ -101,9 +101,9 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Set all properties specified via command line. val conf: HiveConf = sessionState.getConf - sessionState.cmdProperties.entrySet().foreach { item => - val key = item.getKey.asInstanceOf[String] - val value = item.getValue.asInstanceOf[String] + sessionState.cmdProperties.entrySet().asScala.foreach { item => + val key = item.getKey.toString + val value = item.getValue.toString // We do not propagate metastore options to the execution copy of hive. if (key != "javax.jdo.option.ConnectionURL") { conf.set(key, value) @@ -316,15 +316,15 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)) { // Print the column names. - Option(driver.getSchema.getFieldSchemas).map { fields => - out.println(fields.map(_.getName).mkString("\t")) + Option(driver.getSchema.getFieldSchemas).foreach { fields => + out.println(fields.asScala.map(_.getName).mkString("\t")) } } var counter = 0 try { while (!out.checkError() && driver.getResults(res)) { - res.foreach{ l => + res.asScala.foreach { l => counter += 1 out.println(l) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 644165acf7..5ad8c54f29 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -21,6 +21,8 @@ import java.io.IOException import java.util.{List => JList} import javax.security.auth.login.LoginException +import scala.collection.JavaConverters._ + import org.apache.commons.logging.Log import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.shims.Utils @@ -34,8 +36,6 @@ import org.apache.hive.service.{AbstractService, Service, ServiceException} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -import scala.collection.JavaConversions._ - private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: HiveContext) extends CLIService(hiveServer) with ReflectedCompositeService { @@ -76,7 +76,7 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService => def initCompositeService(hiveConf: HiveConf) { // Emulating `CompositeService.init(hiveConf)` val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList") - serviceList.foreach(_.init(hiveConf)) + serviceList.asScala.foreach(_.init(hiveConf)) // Emulating `AbstractService.init(hiveConf)` invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 77272aecf2..2619286afc 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.hive.thriftserver -import java.util.{ArrayList => JArrayList, List => JList} +import java.util.{Arrays, ArrayList => JArrayList, List => JList} + +import scala.collection.JavaConverters._ import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema} @@ -27,8 +29,6 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.Logging import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} -import scala.collection.JavaConversions._ - private[hive] class SparkSQLDriver( val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver @@ -43,14 +43,14 @@ private[hive] class SparkSQLDriver( private def getResultSetSchema(query: context.QueryExecution): Schema = { val analyzed = query.analyzed logDebug(s"Result Schema: ${analyzed.output}") - if (analyzed.output.size == 0) { - new Schema(new FieldSchema("Response code", "string", "") :: Nil, null) + if (analyzed.output.isEmpty) { + new Schema(Arrays.asList(new FieldSchema("Response code", "string", "")), null) } else { val fieldSchemas = analyzed.output.map { attr => new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") } - new Schema(fieldSchemas, null) + new Schema(fieldSchemas.asJava, null) } } @@ -79,7 +79,7 @@ private[hive] class SparkSQLDriver( if (hiveResponse == null) { false } else { - res.asInstanceOf[JArrayList[String]].addAll(hiveResponse) + res.asInstanceOf[JArrayList[String]].addAll(hiveResponse.asJava) hiveResponse = null true } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 1d41c46131..bacf6cc458 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.io.PrintStream -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.scheduler.StatsReportListener import org.apache.spark.sql.hive.HiveContext @@ -64,7 +64,7 @@ private[hive] object SparkSQLEnv extends Logging { hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) if (log.isDebugEnabled) { - hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) => + hiveContext.hiveconf.getAllProperties.asScala.toSeq.sorted.foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 17cc83087f..c0a458fa9a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -22,7 +22,7 @@ import java.net.{URL, URLClassLoader} import java.sql.Timestamp import java.util.concurrent.TimeUnit -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.language.implicitConversions import scala.concurrent.duration._ @@ -194,7 +194,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging { logInfo("defalt warehouse location is " + defaltWarehouseLocation) // `configure` goes second to override other settings. - val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap ++ configure + val allConfig = metadataConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configure val isolatedLoader = if (hiveMetastoreJars == "builtin") { if (hiveExecutionVersion != hiveMetastoreVersion) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 64fffdbf9b..cfe2bb05ad 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import scala.collection.JavaConverters._ + import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} import org.apache.hadoop.hive.serde2.objectinspector.primitive._ import org.apache.hadoop.hive.serde2.objectinspector.{StructField => HiveStructField, _} @@ -31,9 +33,6 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, types} import org.apache.spark.unsafe.types.UTF8String -/* Implicit conversions */ -import scala.collection.JavaConversions._ - /** * 1. The Underlying data type in catalyst and in Hive * In catalyst: @@ -290,13 +289,13 @@ private[hive] trait HiveInspectors { DateTimeUtils.fromJavaDate(poi.getWritableConstantValue.get()) case mi: StandardConstantMapObjectInspector => // take the value from the map inspector object, rather than the input data - val map = mi.getWritableConstantValue - val keys = map.keysIterator.map(unwrap(_, mi.getMapKeyObjectInspector)).toArray - val values = map.valuesIterator.map(unwrap(_, mi.getMapValueObjectInspector)).toArray + val keyValues = mi.getWritableConstantValue.asScala.toSeq + val keys = keyValues.map(kv => unwrap(kv._1, mi.getMapKeyObjectInspector)).toArray + val values = keyValues.map(kv => unwrap(kv._2, mi.getMapValueObjectInspector)).toArray ArrayBasedMapData(keys, values) case li: StandardConstantListObjectInspector => // take the value from the list inspector object, rather than the input data - val values = li.getWritableConstantValue + val values = li.getWritableConstantValue.asScala .map(unwrap(_, li.getListElementObjectInspector)) .toArray new GenericArrayData(values) @@ -342,7 +341,7 @@ private[hive] trait HiveInspectors { case li: ListObjectInspector => Option(li.getList(data)) .map { l => - val values = l.map(unwrap(_, li.getListElementObjectInspector)).toArray + val values = l.asScala.map(unwrap(_, li.getListElementObjectInspector)).toArray new GenericArrayData(values) } .orNull @@ -351,15 +350,16 @@ private[hive] trait HiveInspectors { if (map == null) { null } else { - val keys = map.keysIterator.map(unwrap(_, mi.getMapKeyObjectInspector)).toArray - val values = map.valuesIterator.map(unwrap(_, mi.getMapValueObjectInspector)).toArray + val keyValues = map.asScala.toSeq + val keys = keyValues.map(kv => unwrap(kv._1, mi.getMapKeyObjectInspector)).toArray + val values = keyValues.map(kv => unwrap(kv._2, mi.getMapValueObjectInspector)).toArray ArrayBasedMapData(keys, values) } // currently, hive doesn't provide the ConstantStructObjectInspector case si: StructObjectInspector => val allRefs = si.getAllStructFieldRefs - InternalRow.fromSeq( - allRefs.map(r => unwrap(si.getStructFieldData(data, r), r.getFieldObjectInspector))) + InternalRow.fromSeq(allRefs.asScala.map( + r => unwrap(si.getStructFieldData(data, r), r.getFieldObjectInspector))) } @@ -403,14 +403,14 @@ private[hive] trait HiveInspectors { case soi: StandardStructObjectInspector => val schema = dataType.asInstanceOf[StructType] - val wrappers = soi.getAllStructFieldRefs.zip(schema.fields).map { case (ref, field) => - wrapperFor(ref.getFieldObjectInspector, field.dataType) + val wrappers = soi.getAllStructFieldRefs.asScala.zip(schema.fields).map { + case (ref, field) => wrapperFor(ref.getFieldObjectInspector, field.dataType) } (o: Any) => { if (o != null) { val struct = soi.create() val row = o.asInstanceOf[InternalRow] - soi.getAllStructFieldRefs.zip(wrappers).zipWithIndex.foreach { + soi.getAllStructFieldRefs.asScala.zip(wrappers).zipWithIndex.foreach { case ((field, wrapper), i) => soi.setStructFieldData(struct, field, wrapper(row.get(i, schema(i).dataType))) } @@ -537,7 +537,7 @@ private[hive] trait HiveInspectors { // 1. create the pojo (most likely) object val result = x.create() var i = 0 - while (i < fieldRefs.length) { + while (i < fieldRefs.size) { // 2. set the property for the pojo val tpe = structType(i).dataType x.setStructFieldData( @@ -552,9 +552,9 @@ private[hive] trait HiveInspectors { val fieldRefs = x.getAllStructFieldRefs val structType = dataType.asInstanceOf[StructType] val row = a.asInstanceOf[InternalRow] - val result = new java.util.ArrayList[AnyRef](fieldRefs.length) + val result = new java.util.ArrayList[AnyRef](fieldRefs.size) var i = 0 - while (i < fieldRefs.length) { + while (i < fieldRefs.size) { val tpe = structType(i).dataType result.add(wrap(row.get(i, tpe), fieldRefs.get(i).getFieldObjectInspector, tpe)) i += 1 @@ -712,10 +712,10 @@ private[hive] trait HiveInspectors { def inspectorToDataType(inspector: ObjectInspector): DataType = inspector match { case s: StructObjectInspector => - StructType(s.getAllStructFieldRefs.map(f => { + StructType(s.getAllStructFieldRefs.asScala.map(f => types.StructField( f.getFieldName, inspectorToDataType(f.getFieldObjectInspector), nullable = true) - })) + )) case l: ListObjectInspector => ArrayType(inspectorToDataType(l.getListElementObjectInspector)) case m: MapObjectInspector => MapType( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 98d21aa76d..b8da0840ae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable import com.google.common.base.Objects @@ -483,7 +483,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive // are empty. val partitions = metastoreRelation.getHiveQlPartitions().map { p => val location = p.getLocation - val values = InternalRow.fromSeq(p.getValues.zip(partitionColumnDataTypes).map { + val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map { case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null) }) ParquetPartition(values, location) @@ -798,9 +798,9 @@ private[hive] case class MetastoreRelation val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() tTable.setSd(sd) - sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment))) + sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) tTable.setPartitionKeys( - table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment))) + table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) table.location.foreach(sd.setLocation) table.inputFormat.foreach(sd.setInputFormat) @@ -852,11 +852,11 @@ private[hive] case class MetastoreRelation val tPartition = new org.apache.hadoop.hive.metastore.api.Partition tPartition.setDbName(databaseName) tPartition.setTableName(tableName) - tPartition.setValues(p.values) + tPartition.setValues(p.values.asJava) val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() tPartition.setSd(sd) - sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment))) + sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) sd.setLocation(p.storage.location) sd.setInputFormat(p.storage.inputFormat) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index ad33dee555..d5cd7e98b5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.hive import java.sql.Date import java.util.Locale +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.serde.serdeConstants @@ -48,10 +51,6 @@ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.util.random.RandomSampler -/* Implicit conversions */ -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer - /** * Used when we need to start parsing the AST before deciding that we are going to pass the command * back for Hive to execute natively. Will be replaced with a native command that contains the @@ -202,7 +201,7 @@ private[hive] object HiveQl extends Logging { * Returns a scala.Seq equivalent to [s] or Nil if [s] is null. */ private def nilIfEmpty[A](s: java.util.List[A]): Seq[A] = - Option(s).map(_.toSeq).getOrElse(Nil) + Option(s).map(_.asScala).getOrElse(Nil) /** * Returns this ASTNode with the text changed to `newText`. @@ -217,7 +216,7 @@ private[hive] object HiveQl extends Logging { */ def withChildren(newChildren: Seq[ASTNode]): ASTNode = { (1 to n.getChildCount).foreach(_ => n.deleteChild(0)) - n.addChildren(newChildren) + n.addChildren(newChildren.asJava) n } @@ -323,11 +322,11 @@ private[hive] object HiveQl extends Logging { assert(tree.asInstanceOf[ASTNode].getText == "TOK_CREATETABLE", "Only CREATE TABLE supported.") val tableOps = tree.getChildren val colList = - tableOps + tableOps.asScala .find(_.asInstanceOf[ASTNode].getText == "TOK_TABCOLLIST") .getOrElse(sys.error("No columnList!")).getChildren - colList.map(nodeToAttribute) + colList.asScala.map(nodeToAttribute) } /** Extractor for matching Hive's AST Tokens. */ @@ -337,7 +336,7 @@ private[hive] object HiveQl extends Logging { case t: ASTNode => CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine) Some((t.getText, - Option(t.getChildren).map(_.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]])) + Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]])) case _ => None } } @@ -424,7 +423,9 @@ private[hive] object HiveQl extends Logging { protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = { val (db, tableName) = - tableNameParts.getChildren.map { case Token(part, Nil) => cleanIdentifier(part) } match { + tableNameParts.getChildren.asScala.map { + case Token(part, Nil) => cleanIdentifier(part) + } match { case Seq(tableOnly) => (None, tableOnly) case Seq(databaseName, table) => (Some(databaseName), table) } @@ -433,7 +434,9 @@ private[hive] object HiveQl extends Logging { } protected def extractTableIdent(tableNameParts: Node): Seq[String] = { - tableNameParts.getChildren.map { case Token(part, Nil) => cleanIdentifier(part) } match { + tableNameParts.getChildren.asScala.map { + case Token(part, Nil) => cleanIdentifier(part) + } match { case Seq(tableOnly) => Seq(tableOnly) case Seq(databaseName, table) => Seq(databaseName, table) case other => sys.error("Hive only supports tables names like 'tableName' " + @@ -624,7 +627,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val cols = BaseSemanticAnalyzer.getColumns(list, true) if (cols != null) { tableDesc = tableDesc.copy( - schema = cols.map { field => + schema = cols.asScala.map { field => HiveColumn(field.getName, field.getType, field.getComment) }) } @@ -636,7 +639,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val cols = BaseSemanticAnalyzer.getColumns(list(0), false) if (cols != null) { tableDesc = tableDesc.copy( - partitionColumns = cols.map { field => + partitionColumns = cols.asScala.map { field => HiveColumn(field.getName, field.getType, field.getComment) }) } @@ -672,7 +675,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case _ => assert(false) } tableDesc = tableDesc.copy( - serdeProperties = tableDesc.serdeProperties ++ serdeParams) + serdeProperties = tableDesc.serdeProperties ++ serdeParams.asScala) case Token("TOK_TABLELOCATION", child :: Nil) => var location = BaseSemanticAnalyzer.unescapeSQLString(child.getText) location = EximUtil.relativeToAbsolutePath(hiveConf, location) @@ -684,7 +687,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val serdeParams = new java.util.HashMap[String, String]() BaseSemanticAnalyzer.readProps( (child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams) - tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams) + tableDesc = tableDesc.copy( + serdeProperties = tableDesc.serdeProperties ++ serdeParams.asScala) } case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) => child.getText().toLowerCase(Locale.ENGLISH) match { @@ -847,7 +851,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } val withWhere = whereClause.map { whereNode => - val Seq(whereExpr) = whereNode.getChildren.toSeq + val Seq(whereExpr) = whereNode.getChildren.asScala Filter(nodeToExpr(whereExpr), relations) }.getOrElse(relations) @@ -856,7 +860,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C // Script transformations are expressed as a select clause with a single expression of type // TOK_TRANSFORM - val transformation = select.getChildren.head match { + val transformation = select.getChildren.iterator().next() match { case Token("TOK_SELEXPR", Token("TOK_TRANSFORM", Token("TOK_EXPLIST", inputExprs) :: @@ -925,10 +929,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val withLateralView = lateralViewClause.map { lv => val Token("TOK_SELECT", - Token("TOK_SELEXPR", clauses) :: Nil) = lv.getChildren.head + Token("TOK_SELEXPR", clauses) :: Nil) = lv.getChildren.iterator().next() - val alias = - getClause("TOK_TABALIAS", clauses).getChildren.head.asInstanceOf[ASTNode].getText + val alias = getClause("TOK_TABALIAS", clauses).getChildren.iterator().next() + .asInstanceOf[ASTNode].getText val (generator, attributes) = nodesToGenerator(clauses) Generate( @@ -944,7 +948,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C // (if there is a group by) or a script transformation. val withProject: LogicalPlan = transformation.getOrElse { val selectExpressions = - select.getChildren.flatMap(selExprNodeToExpr).map(UnresolvedAlias(_)).toSeq + select.getChildren.asScala.flatMap(selExprNodeToExpr).map(UnresolvedAlias) Seq( groupByClause.map(e => e match { case Token("TOK_GROUPBY", children) => @@ -973,7 +977,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C // Handle HAVING clause. val withHaving = havingClause.map { h => - val havingExpr = h.getChildren.toSeq match { case Seq(hexpr) => nodeToExpr(hexpr) } + val havingExpr = h.getChildren.asScala match { case Seq(hexpr) => nodeToExpr(hexpr) } // Note that we added a cast to boolean. If the expression itself is already boolean, // the optimizer will get rid of the unnecessary cast. Filter(Cast(havingExpr, BooleanType), withProject) @@ -983,32 +987,42 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val withDistinct = if (selectDistinctClause.isDefined) Distinct(withHaving) else withHaving - // Handle ORDER BY, SORT BY, DISTRIBETU BY, and CLUSTER BY clause. + // Handle ORDER BY, SORT BY, DISTRIBUTE BY, and CLUSTER BY clause. val withSort = (orderByClause, sortByClause, distributeByClause, clusterByClause) match { case (Some(totalOrdering), None, None, None) => - Sort(totalOrdering.getChildren.map(nodeToSortOrder), true, withDistinct) + Sort(totalOrdering.getChildren.asScala.map(nodeToSortOrder), true, withDistinct) case (None, Some(perPartitionOrdering), None, None) => - Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withDistinct) + Sort( + perPartitionOrdering.getChildren.asScala.map(nodeToSortOrder), + false, withDistinct) case (None, None, Some(partitionExprs), None) => - RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withDistinct) + RepartitionByExpression( + partitionExprs.getChildren.asScala.map(nodeToExpr), withDistinct) case (None, Some(perPartitionOrdering), Some(partitionExprs), None) => - Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, - RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withDistinct)) + Sort( + perPartitionOrdering.getChildren.asScala.map(nodeToSortOrder), false, + RepartitionByExpression( + partitionExprs.getChildren.asScala.map(nodeToExpr), + withDistinct)) case (None, None, None, Some(clusterExprs)) => - Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false, - RepartitionByExpression(clusterExprs.getChildren.map(nodeToExpr), withDistinct)) + Sort( + clusterExprs.getChildren.asScala.map(nodeToExpr).map(SortOrder(_, Ascending)), + false, + RepartitionByExpression( + clusterExprs.getChildren.asScala.map(nodeToExpr), + withDistinct)) case (None, None, None, None) => withDistinct case _ => sys.error("Unsupported set of ordering / distribution clauses.") } val withLimit = - limitClause.map(l => nodeToExpr(l.getChildren.head)) + limitClause.map(l => nodeToExpr(l.getChildren.iterator().next())) .map(Limit(_, withSort)) .getOrElse(withSort) // Collect all window specifications defined in the WINDOW clause. - val windowDefinitions = windowClause.map(_.getChildren.toSeq.collect { + val windowDefinitions = windowClause.map(_.getChildren.asScala.collect { case Token("TOK_WINDOWDEF", Token(windowName, Nil) :: Token("TOK_WINDOWSPEC", spec) :: Nil) => windowName -> nodesToWindowSpecification(spec) @@ -1063,7 +1077,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val Token("TOK_SELECT", Token("TOK_SELEXPR", clauses) :: Nil) = selectClause - val alias = getClause("TOK_TABALIAS", clauses).getChildren.head.asInstanceOf[ASTNode].getText + val alias = getClause("TOK_TABALIAS", clauses).getChildren.iterator().next() + .asInstanceOf[ASTNode].getText val (generator, attributes) = nodesToGenerator(clauses) Generate( @@ -1092,7 +1107,9 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } val tableIdent = - tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match { + tableNameParts.getChildren.asScala.map { + case Token(part, Nil) => cleanIdentifier(part) + } match { case Seq(tableOnly) => Seq(tableOnly) case Seq(databaseName, table) => Seq(databaseName, table) case other => sys.error("Hive only supports tables names like 'tableName' " + @@ -1139,7 +1156,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val isPreserved = tableOrdinals.map(i => (i - 1 < 0) || joinArgs(i - 1).getText == "PRESERVE") val tables = tableOrdinals.map(i => nodeToRelation(joinArgs(i))) - val joinExpressions = tableOrdinals.map(i => joinArgs(i + 1).getChildren.map(nodeToExpr)) + val joinExpressions = + tableOrdinals.map(i => joinArgs(i + 1).getChildren.asScala.map(nodeToExpr)) val joinConditions = joinExpressions.sliding(2).map { case Seq(c1, c2) => @@ -1164,7 +1182,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C joinType = joinType.remove(joinType.length - 1)) } - val groups = (0 until joinExpressions.head.size).map(i => Coalesce(joinExpressions.map(_(i)))) + val groups = joinExpressions.head.indices.map(i => Coalesce(joinExpressions.map(_(i)))) // Unique join is not really the same as an outer join so we must group together results where // the joinExpressions are the same, taking the First of each value is only okay because the @@ -1229,7 +1247,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val tableIdent = extractTableIdent(tableNameParts) - val partitionKeys = partitionClause.map(_.getChildren.map { + val partitionKeys = partitionClause.map(_.getChildren.asScala.map { // Parse partitions. We also make keys case insensitive. case Token("TOK_PARTVAL", Token(key, Nil) :: Token(value, Nil) :: Nil) => cleanIdentifier(key.toLowerCase) -> Some(PlanUtils.stripQuotes(value)) @@ -1249,7 +1267,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val tableIdent = extractTableIdent(tableNameParts) - val partitionKeys = partitionClause.map(_.getChildren.map { + val partitionKeys = partitionClause.map(_.getChildren.asScala.map { // Parse partitions. We also make keys case insensitive. case Token("TOK_PARTVAL", Token(key, Nil) :: Token(value, Nil) :: Nil) => cleanIdentifier(key.toLowerCase) -> Some(PlanUtils.stripQuotes(value)) @@ -1590,18 +1608,18 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val (partitionByClause :: orderByClause :: sortByClause :: clusterByClause :: Nil) = getClauses( Seq("TOK_DISTRIBUTEBY", "TOK_ORDERBY", "TOK_SORTBY", "TOK_CLUSTERBY"), - partitionAndOrdering.getChildren.toSeq.asInstanceOf[Seq[ASTNode]]) + partitionAndOrdering.getChildren.asScala.asInstanceOf[Seq[ASTNode]]) (partitionByClause, orderByClause.orElse(sortByClause), clusterByClause) match { case (Some(partitionByExpr), Some(orderByExpr), None) => - (partitionByExpr.getChildren.map(nodeToExpr), - orderByExpr.getChildren.map(nodeToSortOrder)) + (partitionByExpr.getChildren.asScala.map(nodeToExpr), + orderByExpr.getChildren.asScala.map(nodeToSortOrder)) case (Some(partitionByExpr), None, None) => - (partitionByExpr.getChildren.map(nodeToExpr), Nil) + (partitionByExpr.getChildren.asScala.map(nodeToExpr), Nil) case (None, Some(orderByExpr), None) => - (Nil, orderByExpr.getChildren.map(nodeToSortOrder)) + (Nil, orderByExpr.getChildren.asScala.map(nodeToSortOrder)) case (None, None, Some(clusterByExpr)) => - val expressions = clusterByExpr.getChildren.map(nodeToExpr) + val expressions = clusterByExpr.getChildren.asScala.map(nodeToExpr) (expressions, expressions.map(SortOrder(_, Ascending))) case _ => throw new NotImplementedError( @@ -1639,7 +1657,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } rowFrame.orElse(rangeFrame).map { frame => - frame.getChildren.toList match { + frame.getChildren.asScala.toList match { case precedingNode :: followingNode :: Nil => SpecifiedWindowFrame( frameType, @@ -1701,7 +1719,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case other => sys.error(s"Non ASTNode encountered: $other") } - Option(node.getChildren).map(_.toList).getOrElse(Nil).foreach(dumpTree(_, builder, indent + 1)) + Option(node.getChildren).map(_.asScala).getOrElse(Nil).foreach(dumpTree(_, builder, indent + 1)) builder } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala index 267074f3ad..004805f3ae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala @@ -22,8 +22,7 @@ import java.rmi.server.UID import org.apache.avro.Schema -/* Implicit conversions */ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.language.implicitConversions import scala.reflect.ClassTag @@ -73,7 +72,7 @@ private[hive] object HiveShim { */ def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) { if (ids != null && ids.nonEmpty) { - ColumnProjectionUtils.appendReadColumns(conf, ids) + ColumnProjectionUtils.appendReadColumns(conf, ids.asJava) } if (names != null && names.nonEmpty) { appendReadColumnNames(conf, names) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index f49c97de8f..4d1e3ed919 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -21,7 +21,7 @@ import java.io.{File, PrintStream} import java.util.{Map => JMap} import javax.annotation.concurrent.GuardedBy -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.language.reflectiveCalls import org.apache.hadoop.fs.Path @@ -305,10 +305,11 @@ private[hive] class ClientWrapper( HiveTable( name = h.getTableName, specifiedDatabase = Option(h.getDbName), - schema = h.getCols.map(f => HiveColumn(f.getName, f.getType, f.getComment)), - partitionColumns = h.getPartCols.map(f => HiveColumn(f.getName, f.getType, f.getComment)), - properties = h.getParameters.toMap, - serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.toMap, + schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)), + partitionColumns = h.getPartCols.asScala.map(f => + HiveColumn(f.getName, f.getType, f.getComment)), + properties = h.getParameters.asScala.toMap, + serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap, tableType = h.getTableType match { case HTableType.MANAGED_TABLE => ManagedTable case HTableType.EXTERNAL_TABLE => ExternalTable @@ -334,9 +335,9 @@ private[hive] class ClientWrapper( private def toQlTable(table: HiveTable): metadata.Table = { val qlTable = new metadata.Table(table.database, table.name) - qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment))) + qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) qlTable.setPartCols( - table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment))) + table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) } table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) } @@ -366,13 +367,13 @@ private[hive] class ClientWrapper( private def toHivePartition(partition: metadata.Partition): HivePartition = { val apiPartition = partition.getTPartition HivePartition( - values = Option(apiPartition.getValues).map(_.toSeq).getOrElse(Seq.empty), + values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty), storage = HiveStorageDescriptor( location = apiPartition.getSd.getLocation, inputFormat = apiPartition.getSd.getInputFormat, outputFormat = apiPartition.getSd.getOutputFormat, serde = apiPartition.getSd.getSerdeInfo.getSerializationLib, - serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.toMap)) + serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap)) } override def getPartitionOption( @@ -397,7 +398,7 @@ private[hive] class ClientWrapper( } override def listTables(dbName: String): Seq[String] = withHiveState { - client.getAllTables(dbName) + client.getAllTables(dbName).asScala } /** @@ -514,17 +515,17 @@ private[hive] class ClientWrapper( } def reset(): Unit = withHiveState { - client.getAllTables("default").foreach { t => + client.getAllTables("default").asScala.foreach { t => logDebug(s"Deleting table $t") val table = client.getTable("default", t) - client.getIndexes("default", t, 255).foreach { index => + client.getIndexes("default", t, 255).asScala.foreach { index => shim.dropIndex(client, "default", t, index.getIndexName) } if (!table.isIndexTable) { client.dropTable("default", t) } } - client.getAllDatabases.filterNot(_ == "default").foreach { db => + client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db => logDebug(s"Dropping Database: $db") client.dropDatabase(db, true, false, true) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 8fc8935b1d..48bbb21e6c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -23,7 +23,7 @@ import java.net.URI import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet} import java.util.concurrent.TimeUnit -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.conf.HiveConf @@ -201,7 +201,7 @@ private[client] class Shim_v0_12 extends Shim with Logging { setDataLocationMethod.invoke(table, new URI(loc)) override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = - getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq + getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq override def getPartitionsByFilter( hive: Hive, @@ -220,7 +220,7 @@ private[client] class Shim_v0_12 extends Shim with Logging { override def getDriverResults(driver: Driver): Seq[String] = { val res = new JArrayList[String]() getDriverResultsMethod.invoke(driver, res) - res.toSeq + res.asScala } override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = { @@ -310,7 +310,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { setDataLocationMethod.invoke(table, new Path(loc)) override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = - getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq + getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq /** * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e. @@ -320,7 +320,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { */ def convertFilters(table: Table, filters: Seq[Expression]): String = { // hive varchar is treated as catalyst string, but hive varchar can't be pushed down. - val varcharKeys = table.getPartitionKeys + val varcharKeys = table.getPartitionKeys.asScala .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME)) .map(col => col.getName).toSet @@ -354,7 +354,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]] } - partitions.toSeq + partitions.asScala.toSeq } override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor = @@ -363,7 +363,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { override def getDriverResults(driver: Driver): Seq[String] = { val res = new JArrayList[Object]() getDriverResultsMethod.invoke(driver, res) - res.map { r => + res.asScala.map { r => r match { case s: String => s case a: Array[Object] => a(0).asInstanceOf[String] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index 5f0ed5393d..441b6b6033 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.execution -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.hive.metastore.api.FieldSchema @@ -39,8 +39,8 @@ case class DescribeHiveTableCommand( // Trying to mimic the format of Hive's output. But not exactly the same. var results: Seq[(String, String, String)] = Nil - val columns: Seq[FieldSchema] = table.hiveQlTable.getCols - val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols + val columns: Seq[FieldSchema] = table.hiveQlTable.getCols.asScala + val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols.asScala results ++= columns.map(field => (field.getName, field.getType, field.getComment)) if (partitionColumns.nonEmpty) { val partColumnInfo = @@ -48,7 +48,7 @@ case class DescribeHiveTableCommand( results ++= partColumnInfo ++ Seq(("# Partition Information", "", "")) ++ - Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++ + Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++ partColumnInfo } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index ba7eb15a1c..806d2b9b0b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.execution -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} @@ -98,7 +98,7 @@ case class HiveTableScan( .asInstanceOf[StructObjectInspector] val columnTypeNames = structOI - .getAllStructFieldRefs + .getAllStructFieldRefs.asScala .map(_.getFieldObjectInspector) .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) .mkString(",") @@ -118,9 +118,8 @@ case class HiveTableScan( case None => partitions case Some(shouldKeep) => partitions.filter { part => val dataTypes = relation.partitionKeys.map(_.dataType) - val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield { - castFromString(value, dataType) - } + val castedValues = part.getValues.asScala.zip(dataTypes) + .map { case (value, dataType) => castFromString(value, dataType) } // Only partitioned values are needed here, since the predicate has already been bound to // partition key attribute references. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 62efda613a..58f7fa640e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.execution import java.util +import scala.collection.JavaConverters._ + import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.metastore.MetaStoreUtils import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.ql.{Context, ErrorMsg} import org.apache.hadoop.hive.serde2.Serializer @@ -38,8 +39,6 @@ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive._ import org.apache.spark.sql.types.DataType import org.apache.spark.{SparkException, TaskContext} - -import scala.collection.JavaConversions._ import org.apache.spark.util.SerializableJobConf private[hive] @@ -94,7 +93,8 @@ case class InsertIntoHiveTable( ObjectInspectorCopyOption.JAVA) .asInstanceOf[StructObjectInspector] - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val fieldOIs = standardOI.getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector).toArray val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)} val outputData = new Array[Any](fieldOIs.length) @@ -198,7 +198,7 @@ case class InsertIntoHiveTable( // loadPartition call orders directories created on the iteration order of the this map val orderedPartitionSpec = new util.LinkedHashMap[String, String]() - table.hiveQlTable.getPartCols().foreach { entry => + table.hiveQlTable.getPartCols.asScala.foreach { entry => orderedPartitionSpec.put(entry.getName, partitionSpec.get(entry.getName).getOrElse("")) } @@ -226,7 +226,7 @@ case class InsertIntoHiveTable( val oldPart = catalog.client.getPartitionOption( catalog.client.getTable(table.databaseName, table.tableName), - partitionSpec) + partitionSpec.asJava) if (oldPart.isEmpty || !ifNotExists) { catalog.client.loadPartition( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index ade27454b9..c7651daffe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -21,7 +21,7 @@ import java.io._ import java.util.Properties import javax.annotation.Nullable -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.control.NonFatal import org.apache.hadoop.hive.serde.serdeConstants @@ -61,7 +61,7 @@ case class ScriptTransformation( protected override def doExecute(): RDD[InternalRow] = { def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = { val cmd = List("/bin/bash", "-c", script) - val builder = new ProcessBuilder(cmd) + val builder = new ProcessBuilder(cmd.asJava) val proc = builder.start() val inputStream = proc.getInputStream @@ -172,10 +172,10 @@ case class ScriptTransformation( val fieldList = outputSoi.getAllStructFieldRefs() var i = 0 while (i < dataList.size()) { - if (dataList(i) == null) { + if (dataList.get(i) == null) { mutableRow.setNullAt(i) } else { - mutableRow(i) = unwrap(dataList(i), fieldList(i).getFieldObjectInspector) + mutableRow(i) = unwrap(dataList.get(i), fieldList.get(i).getFieldObjectInspector) } i += 1 } @@ -307,7 +307,7 @@ case class HiveScriptIOSchema ( val serde = initSerDe(serdeClass, columns, columnTypes, inputSerdeProps) val fieldObjectInspectors = columnTypes.map(toInspector) val objectInspector = ObjectInspectorFactory - .getStandardStructObjectInspector(columns, fieldObjectInspectors) + .getStandardStructObjectInspector(columns.asJava, fieldObjectInspectors.asJava) .asInstanceOf[ObjectInspector] (serde, objectInspector) } @@ -342,7 +342,7 @@ case class HiveScriptIOSchema ( propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames) val properties = new Properties() - properties.putAll(propsMap) + properties.putAll(propsMap.asJava) serde.initialize(null, properties) serde diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 7182246e46..cad02373e5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.Try import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ConstantObjectInspector} @@ -81,8 +81,7 @@ private[hive] class HiveFunctionRegistry(underlying: analysis.FunctionRegistry) /* List all of the registered function names. */ override def listFunction(): Seq[String] = { - val a = FunctionRegistry.getFunctionNames ++ underlying.listFunction() - a.toList.sorted + (FunctionRegistry.getFunctionNames.asScala ++ underlying.listFunction()).toList.sorted } /* Get the class of the registered function by specified name. */ @@ -116,7 +115,7 @@ private[hive] case class HiveSimpleUDF(funcWrapper: HiveFunctionWrapper, childre @transient private lazy val method = - function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo)) + function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo).asJava) @transient private lazy val arguments = children.map(toInspector).toArray @@ -541,7 +540,7 @@ private[hive] case class HiveGenericUDTF( @transient protected lazy val collector = new UDTFCollector - lazy val elementTypes = outputInspector.getAllStructFieldRefs.map { + lazy val elementTypes = outputInspector.getAllStructFieldRefs.asScala.map { field => (inspectorToDataType(field.getFieldObjectInspector), true) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 9f4f8b5789..1cff5cf9c3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.orc import java.util.Properties +import scala.collection.JavaConverters._ + import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} @@ -43,9 +45,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.util.SerializableConfiguration -/* Implicit conversions */ -import scala.collection.JavaConversions._ - private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { override def shortName(): String = "orc" @@ -97,7 +96,8 @@ private[orc] class OrcOutputWriter( private val reusableOutputBuffer = new Array[Any](dataSchema.length) // Used to convert Catalyst values into Hadoop `Writable`s. - private val wrappers = structOI.getAllStructFieldRefs.zip(dataSchema.fields.map(_.dataType)) + private val wrappers = structOI.getAllStructFieldRefs.asScala + .zip(dataSchema.fields.map(_.dataType)) .map { case (ref, dt) => wrapperFor(ref.getFieldObjectInspector, dt) }.toArray diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 4da86636ac..572eaebe81 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.test import java.io.File import java.util.{Set => JavaSet} +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.implicitConversions @@ -37,9 +38,6 @@ import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.util.{ShutdownHookManager, Utils} import org.apache.spark.{SparkConf, SparkContext} -/* Implicit conversions */ -import scala.collection.JavaConversions._ - // SPARK-3729: Test key required to check for initialization errors with config. object TestHive extends TestHiveContext( @@ -405,7 +403,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { def reset() { try { // HACK: Hive is too noisy by default. - org.apache.log4j.LogManager.getCurrentLoggers.foreach { log => + org.apache.log4j.LogManager.getCurrentLoggers.asScala.foreach { log => log.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN) } @@ -415,9 +413,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { catalog.client.reset() catalog.unregisterAllTables() - FunctionRegistry.getFunctionNames.filterNot(originalUDFs.contains(_)).foreach { udfName => - FunctionRegistry.unregisterTemporaryUDF(udfName) - } + FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)). + foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } // Some tests corrupt this value on purpose, which breaks the RESET call below. hiveconf.set("fs.default.name", new File(".").toURI.toString) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala index 0efcf80bd4..5e7b93d457 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.client -import scala.collection.JavaConversions._ +import java.util.Collections import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.serde.serdeConstants @@ -38,7 +38,7 @@ class FiltersSuite extends SparkFunSuite with Logging { private val varCharCol = new FieldSchema() varCharCol.setName("varchar") varCharCol.setType(serdeConstants.VARCHAR_TYPE_NAME) - testTable.setPartCols(varCharCol :: Nil) + testTable.setPartCols(Collections.singletonList(varCharCol)) filterTest("string filter", (a("stringcol", StringType) > Literal("test")) :: Nil, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index b03a351323..9c10ffe111 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.hive.execution import java.io.{DataInput, DataOutput} -import java.util -import java.util.Properties +import java.util.{ArrayList, Arrays, Properties} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.udf.generic.{GenericUDAFAverage, GenericUDF} @@ -33,8 +32,6 @@ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.util.Utils -import scala.collection.JavaConversions._ - case class Fields(f1: Int, f2: Int, f3: Int, f4: Int, f5: Int) // Case classes for the custom UDF's. @@ -326,11 +323,11 @@ class PairSerDe extends AbstractSerDe { override def getObjectInspector: ObjectInspector = { ObjectInspectorFactory .getStandardStructObjectInspector( - Seq("pair"), - Seq(ObjectInspectorFactory.getStandardStructObjectInspector( - Seq("id", "value"), - Seq(PrimitiveObjectInspectorFactory.javaIntObjectInspector, - PrimitiveObjectInspectorFactory.javaIntObjectInspector)) + Arrays.asList("pair"), + Arrays.asList(ObjectInspectorFactory.getStandardStructObjectInspector( + Arrays.asList("id", "value"), + Arrays.asList(PrimitiveObjectInspectorFactory.javaIntObjectInspector, + PrimitiveObjectInspectorFactory.javaIntObjectInspector)) )) } @@ -343,10 +340,10 @@ class PairSerDe extends AbstractSerDe { override def deserialize(value: Writable): AnyRef = { val pair = value.asInstanceOf[TestPair] - val row = new util.ArrayList[util.ArrayList[AnyRef]] - row.add(new util.ArrayList[AnyRef](2)) - row(0).add(Integer.valueOf(pair.entry._1)) - row(0).add(Integer.valueOf(pair.entry._2)) + val row = new ArrayList[ArrayList[AnyRef]] + row.add(new ArrayList[AnyRef](2)) + row.get(0).add(Integer.valueOf(pair.entry._1)) + row.get(0).add(Integer.valueOf(pair.entry._2)) row } @@ -355,9 +352,9 @@ class PairSerDe extends AbstractSerDe { class PairUDF extends GenericUDF { override def initialize(p1: Array[ObjectInspector]): ObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector( - Seq("id", "value"), - Seq(PrimitiveObjectInspectorFactory.javaIntObjectInspector, - PrimitiveObjectInspectorFactory.javaIntObjectInspector) + Arrays.asList("id", "value"), + Arrays.asList(PrimitiveObjectInspectorFactory.javaIntObjectInspector, + PrimitiveObjectInspectorFactory.javaIntObjectInspector) ) override def evaluate(args: Array[DeferredObject]): AnyRef = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index 3bf8f3ac20..210d566745 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -17,13 +17,12 @@ package org.apache.spark.sql.hive.execution +import scala.collection.JavaConverters._ + import org.scalatest.BeforeAndAfter import org.apache.spark.sql.hive.test.TestHive -/* Implicit conversions */ -import scala.collection.JavaConversions._ - /** * A set of test cases that validate partition and column pruning. */ @@ -161,7 +160,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { assert(actualOutputColumns === expectedOutputColumns, "Output columns mismatch") assert(actualScannedColumns === expectedScannedColumns, "Scanned columns mismatch") - val actualPartitions = actualPartValues.map(_.toSeq.mkString(",")).sorted + val actualPartitions = actualPartValues.map(_.asScala.mkString(",")).sorted val expectedPartitions = expectedPartValues.map(_.mkString(",")).sorted assert(actualPartitions === expectedPartitions, "Partitions selected do not match") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 55ecbd5b5f..1ff1d9a293 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution import java.sql.{Date, Timestamp} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.DefaultParserDialect @@ -164,7 +164,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { test("show functions") { val allFunctions = (FunctionRegistry.builtin.listFunction().toSet[String] ++ - org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames).toList.sorted + org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames.asScala).toList.sorted checkAnswer(sql("SHOW functions"), allFunctions.map(Row(_))) checkAnswer(sql("SHOW functions abs"), Row("abs")) checkAnswer(sql("SHOW functions 'abs'"), Row("abs")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 5bbca14bad..7966b43596 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -17,9 +17,7 @@ package org.apache.spark.sql.sources -import java.sql.Date - -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -552,7 +550,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { } finally { // Hadoop 1 doesn't have `Configuration.unset` configuration.clear() - clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } @@ -600,7 +598,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { } finally { // Hadoop 1 doesn't have `Configuration.unset` configuration.clear() - clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString) } } |