From d4c7572dba1be49e55ceb38713652e5bcf485be8 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 20 Jun 2014 17:16:56 -0700 Subject: Move ScriptTransformation into the appropriate place. Author: Reynold Xin Closes #1162 from rxin/script and squashes the following commits: 2c836b9 [Reynold Xin] Move ScriptTransformation into the appropriate place. --- .../spark/sql/hive/ScriptTransformation.scala | 80 ---------------------- .../sql/hive/execution/ScriptTransformation.scala | 80 ++++++++++++++++++++++ 2 files changed, 80 insertions(+), 80 deletions(-) delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala (limited to 'sql') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala deleted file mode 100644 index 8258ee5fef..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import java.io.{BufferedReader, InputStreamReader} - -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.hive.HiveContext - -/* Implicit conversions */ -import scala.collection.JavaConversions._ - -/** - * :: DeveloperApi :: - * Transforms the input by forking and running the specified script. - * - * @param input the set of expression that should be passed to the script. - * @param script the command that should be executed. - * @param output the attributes that are produced by the script. - */ -@DeveloperApi -case class ScriptTransformation( - input: Seq[Expression], - script: String, - output: Seq[Attribute], - child: SparkPlan)(@transient sc: HiveContext) - extends UnaryNode { - - override def otherCopyArgs = sc :: Nil - - def execute() = { - child.execute().mapPartitions { iter => - val cmd = List("/bin/bash", "-c", script) - val builder = new ProcessBuilder(cmd) - val proc = builder.start() - val inputStream = proc.getInputStream - val outputStream = proc.getOutputStream - val reader = new BufferedReader(new InputStreamReader(inputStream)) - - // TODO: This should be exposed as an iterator instead of reading in all the data at once. - val outputLines = collection.mutable.ArrayBuffer[Row]() - val readerThread = new Thread("Transform OutputReader") { - override def run() { - var curLine = reader.readLine() - while (curLine != null) { - // TODO: Use SerDe - outputLines += new GenericRow(curLine.split("\t").asInstanceOf[Array[Any]]) - curLine = reader.readLine() - } - } - } - readerThread.start() - val outputProjection = new Projection(input) - iter - .map(outputProjection) - // TODO: Use SerDe - .map(_.mkString("", "\t", "\n").getBytes("utf-8")).foreach(outputStream.write) - outputStream.close() - readerThread.join() - outputLines.toIterator - } - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala new file mode 100644 index 0000000000..8258ee5fef --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.{BufferedReader, InputStreamReader} + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.hive.HiveContext + +/* Implicit conversions */ +import scala.collection.JavaConversions._ + +/** + * :: DeveloperApi :: + * Transforms the input by forking and running the specified script. + * + * @param input the set of expression that should be passed to the script. + * @param script the command that should be executed. + * @param output the attributes that are produced by the script. + */ +@DeveloperApi +case class ScriptTransformation( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan)(@transient sc: HiveContext) + extends UnaryNode { + + override def otherCopyArgs = sc :: Nil + + def execute() = { + child.execute().mapPartitions { iter => + val cmd = List("/bin/bash", "-c", script) + val builder = new ProcessBuilder(cmd) + val proc = builder.start() + val inputStream = proc.getInputStream + val outputStream = proc.getOutputStream + val reader = new BufferedReader(new InputStreamReader(inputStream)) + + // TODO: This should be exposed as an iterator instead of reading in all the data at once. + val outputLines = collection.mutable.ArrayBuffer[Row]() + val readerThread = new Thread("Transform OutputReader") { + override def run() { + var curLine = reader.readLine() + while (curLine != null) { + // TODO: Use SerDe + outputLines += new GenericRow(curLine.split("\t").asInstanceOf[Array[Any]]) + curLine = reader.readLine() + } + } + } + readerThread.start() + val outputProjection = new Projection(input) + iter + .map(outputProjection) + // TODO: Use SerDe + .map(_.mkString("", "\t", "\n").getBytes("utf-8")).foreach(outputStream.write) + outputStream.close() + readerThread.join() + outputLines.toIterator + } + } +} -- cgit v1.2.3