diff options
Diffstat (limited to 'examples/src/main/scala')
4 files changed, 81 insertions, 1 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 9a00701f98..71f53af68f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ + /* Need to create following keyspace and column family in cassandra before running this example Start CQL shell using ./bin/cqlsh and execute following commands diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala index a8c338480e..4893b017ed 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark._ -import org.apache.spark.rdd.NewHadoopRDD + object HBaseTest { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala new file mode 100644 index 0000000000..29a65c7a5f --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.pythonconverters + +import org.apache.spark.api.python.Converter +import java.nio.ByteBuffer +import org.apache.cassandra.utils.ByteBufferUtil +import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap} + + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra + * output to a Map[String, Int] + */ +class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] { + override def convert(obj: Any): java.util.Map[String, Int] = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb))) + } +} + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra + * output to a Map[String, String] + */ +class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] { + override def convert(obj: Any): java.util.Map[String, String] = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb))) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala new file mode 100644 index 0000000000..42ae960bd6 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.pythonconverters + +import org.apache.spark.api.python.Converter +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.util.Bytes + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result + * to a String + */ +class HBaseConverter extends Converter[Any, String] { + override def convert(obj: Any): String = { + val result = obj.asInstanceOf[Result] + Bytes.toStringBinary(result.value()) + } +} |