New in Cloudera Labs: SparkOnHBase

Categories: Cloudera Labs HBase Spark

As we progressively move from MapReduce to Spark, we shouldn’t have to give up good HBase integration. Hence the newest Cloudera Labs project, SparkOnHBase!

[Ed. Note: In Aug. 2015, SparkOnHBase was committed to the Apache HBase trunk in the form of a new HBase-Spark module.]

Apache Spark is making a huge impact across our industry, changing the way we think about batch processing and stream processing. However, as we progressively migrate from MapReduce toward Spark, we shouldn’t have to “give up” anything. One of those capabilities we need to retain is the ability to interact with Apache HBase.

In this post, we will share the work being done in Cloudera Labs to make integrating Spark and HBase super-easy in the form of the SparkOnHBase project. (As with everything else in Cloudera Labs, SparkOnHBase is not supported and there is no timetable for possible support in the future; it’s for experimentation only.) You’ll learn common patterns of HBase integration with Spark and see Scala and Java examples for each. (It may be helpful to have the SparkOnHBase repository open as you read along.)

HBase and Batch Processing Patterns

Before we get into the coolness of Spark, let’s define some powerful usage patterns around HBase interactions with batch processing. This discussion is necessary because many customers who are new to HBase tell me they have heard that HBase and MapReduce should never be used together.

In fact, although there are valid use cases for keeping an HBase cluster isolated from MapReduce for low-latency SLA reasons, there are also use cases where the combination of MapReduce and HBase is the right approach. Here are just a couple of examples:

  • Massive operations on tree/DAG/graph structures stored in HBase
  • Interaction with a store or table that is in constant change, with MapReduce or Impala

SparkOnHBase Design

We experimented with many designs for how Spark and HBase integration should work and ended up focusing on a few goals:

  • Make HBase connections seamless.
  • Make Kerberos integration seamless.
  • Create RDDs through Scan actions, or from an existing RDD that is used to generate Get commands.
  • Take any RDD and allow any combination of HBase operations to be done.
  • Provide simple methods for common operations while still allowing arbitrary, advanced operations through the API.
  • Support Scala and Java.
  • Support Spark and Spark Streaming with a similar API.

These goals led us to a design that took a couple of notes from the GraphX API in Spark. For example, in SparkOnHBase there is an object called HBaseContext. This class has a constructor that takes HBase configuration information and, once constructed, allows you to do a bunch of operations with it. For example, you can:

  • Create an RDD/DStream from a Scan
  • Put/Delete the contents of an RDD/DStream into HBase
  • Create an RDD/DStream from Gets generated from the contents of another RDD/DStream
  • Take the contents of an RDD/DStream and do any operation, with an HConnection handed to you in the worker process

Let’s walk through a code example so you can get an idea of how easy and powerful this API can be. First, we create an RDD, connect to HBase, and put the contents of that RDD into HBase.
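Here’s a minimal Scala sketch of that flow, using the HBaseContext constructor and bulkPut method from the project. (The SparkContext setup, table name, column family, and sample data are made up for illustration; master and deploy settings would normally come from spark-submit.)

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}
    import com.cloudera.spark.hbase.HBaseContext

    val sc = new SparkContext(new SparkConf().setAppName("SparkOnHBaseBulkPutSketch"))

    // An RDD of (rowKey, value) pairs to write.
    val rdd = sc.parallelize(Seq(
      ("rowKey1", "value1"),
      ("rowKey2", "value2")))

    // HBaseContext only needs the SparkContext and an HBase Configuration.
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    // bulkPut issues a Put for every record, one HConnection per RDD partition.
    hbaseContext.bulkPut[(String, String)](
      rdd,
      "tableName",                                  // illustrative table name
      (r) => {
        val put = new Put(Bytes.toBytes(r._1))
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(r._2))
        put
      },
      autoFlush = false)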

Now every partition of that RDD will execute in parallel (in different threads in a number of Spark workers across the cluster)—kind of like what would have happened if we did Puts in a mapper or reducer task.

One thing to note is that the same rules apply when working with HBase from MapReduce or Spark in terms of Put and Get performance. If you have Puts that are not partitioned, a Put batch will most likely get sent to each RegionServer, which will result in fewer records per RegionServer per batch. The image below illustrates how this would look with six RegionServers; imagine if you had 100 of them (it would be 16.7x worse)!

Now let’s look at that same diagram if we used Spark to partition first before talking to HBase.
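For illustration, one way to do that (continuing the sketch above, and pretending for the example’s sake that a short row-key prefix roughly lines up with region boundaries) is to key and repartition the RDD before calling bulkPut:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._  // pair-RDD implicits on older Spark releases

    // Group records whose row keys share a prefix into the same Spark partition.
    val partitioned = rdd
      .keyBy { case (rowKey, _) => rowKey.take(2) }
      .partitionBy(new HashPartitioner(6))
      .values

    // Under the stated assumption, each partition's Puts now target far fewer
    // RegionServers per batch than an unpartitioned RDD would.
    hbaseContext.bulkPut[(String, String)](
      partitioned,
      "tableName",
      (r) => {
        val put = new Put(Bytes.toBytes(r._1))
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(r._2))
        put
      },
      autoFlush = false)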

Examples

Next, we’ll quickly explore just three code examples to illustrate how you can do different types of operations. (A Put example would look almost exactly like a delete, checkPut, checkDelete, or increment example.)

The big difference in a get example would be the fact that we are producing a new RDD from an existing one. Think of it as a “Spark map function.”
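As a sketch (continuing the example above, again with made-up table, column family, and qualifier names), a bulk get that turns an RDD of row keys into a new RDD of (rowKey, value) pairs might look like this; the second argument is the number of Gets batched per round trip to HBase:

    import org.apache.hadoop.hbase.client.{Get, Result}

    val rowKeyRdd = sc.parallelize(Seq("rowKey1", "rowKey2", "rowKey3"))

    val valuesRdd = hbaseContext.bulkGet[String, (String, String)](
      "tableName",                                  // illustrative table name
      2,                                            // Gets per batch
      rowKeyRdd,
      (rowKey: String) => new Get(Bytes.toBytes(rowKey)),
      (result: Result) => {
        // Convert each HBase Result into whatever record type the new RDD should hold.
        (Bytes.toString(result.getRow),
          Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"))))
      })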

Now, let’s say your interaction with HBase is more complex than straight Gets or Puts—a case where you want to say, “Just give me an HConnection and leave me alone.” Well, HBaseContext has map, mapPartition, foreach, and foreachPartition methods just for you.

Here’s an example of the foreachPartition in Java.
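The Java version goes through JavaHBaseContext; in Scala, the same foreachPartition pattern is only a few lines. In this sketch (continuing the earlier one, with illustrative table and column names), HBaseContext hands your function an iterator over the partition plus an already-connected HConnection, and closes that connection for you afterward:

    import org.apache.hadoop.hbase.client.HConnection

    hbaseContext.foreachPartition(rdd,
      (it: Iterator[(String, String)], hConnection: HConnection) => {
        // Do not close the HConnection; HBaseContext manages its lifecycle.
        val table = hConnection.getTable("tableName")
        it.foreach { case (rowKey, value) =>
          val put = new Put(Bytes.toBytes(rowKey))
          put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value))
          table.put(put)
        }
        table.close()
      })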

The last example to talk about is creating an RDD from a scan:
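Here is a minimal sketch, with an illustrative table name and scan settings:

    import org.apache.hadoop.hbase.client.Scan

    val scan = new Scan()
    scan.setCaching(100)   // rows fetched per RPC while scanning

    // Returns an RDD of (rowKey, List[(columnFamily, columnQualifier, value)]) records.
    val scanRdd = hbaseContext.hbaseRDD("tableName", scan)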

This code will execute a scan just like MapReduce would do with the table input format and populate the resulting RDD with records of type (RowKey, List[(columnFamily, columnQualifier, Value)]). If you don’t like that record type, then just use the hbaseRDD method that takes a record conversion function for changing it to whatever you like.
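For example, here is a sketch of that variant, using the same scan as above and keeping only the row key as a String:

    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable

    val rowKeys = hbaseContext.hbaseRDD[String](
      "tableName",
      scan,
      (r: (ImmutableBytesWritable, Result)) => Bytes.toString(r._2.getRow))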

Conclusion

SparkOnHBase has been tested on a number of clusters with Spark and Spark Streaming; give it a look and let us know your feedback via the Cloudera Labs discussion group. The hope is that this project and others like it will help us blend the goodness from different Hadoop ecosystem components to help solve bigger problems.

To use SparkOnHBase, just add the following snippet as a dependency in your pom.xml:
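The artifact is published to the Cloudera Maven repository (https://repository.cloudera.com/artifactory/cloudera-repos/); the version shown below (0.0.2-clabs) is illustrative, so check the repository for the current release:

    <dependency>
      <groupId>com.cloudera</groupId>
      <artifactId>spark-hbase</artifactId>
      <version>0.0.2-clabs</version>
    </dependency>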

Acknowledgements

Special thanks to the people who helped me make SparkOnHBase: Tathagata Das (TD), Mark Grover, Michael Stack, Sandy Ryza, Kevin O’Dell, Jean-Marc Spaggiari, Matteo Bertozzi, and Jeff Lord.

Ted Malaska is a Solutions Architect at Cloudera, a contributor to Apache Spark, and a co-author of the O’Reilly book Hadoop Application Architectures.


14 responses on "New in Cloudera Labs: SparkOnHBase"

  1. Cristofer

    Now that Spark SQL has support for external data sources with predicate pushdown, it will be nice to see some integrations in this direction too.

  2. Does SparkOnHBase support Dstream

    The first parameter of the bulkPut method is an RDD. When I use this method in Spark Streaming and pass a DStream as the parameter, it won’t work. How can I solve this problem?

    1. Justin Kestelyn (@kestelyn) Post author

      Please post this question in the Cloudera Labs area at community.cloudera.com for easier interaction…

  3. Manju Jain

    When I try to execute the JavaHBaseStreamingBulkPutExample by reading a stream of data from the socket, it does not save anything to HBase.
    I tried to print the DStream and it gets printed properly.
    Please help, as this is really not working for us.

    1. Justin Kestelyn (@kestelyn) Post author

      Could you please post this issue in the Cloudera Labs area at community.cloudera.com?

  4. Developer

    I am using CDH 5.3, to use JavaHBaseContext ,
    what are the maven pom dependencies ?

    Thanks !

  5. Vinay

    We are using the SparkOnHBase lib to do streamBulkPut() for an RDD in “spark-streaming with checkpointing”
    and are getting the following error while recovering from a checkpoint:

    =================================== ======================================
    16/01/22 01:32:35 ERROR executor.Executor: Exception in task 0.0 in stage 39.0 (TID 134)
    java.lang.ClassCastException: [B cannot be cast to org.apache.spark.SerializableWritable
    at com.cloudera.spark.hbase.HBaseContext.applyCreds(HBaseContext.scala:225)
    at com.cloudera.spark.hbase.HBaseContext.com$cloudera$spark$hbase$HBaseContext$$hbaseForeachPartition(HBaseContext.scala:633)
    at com.cloudera.spark.hbase.HBaseContext$$anonfun$com$cloudera$spark$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:460)
    at com.cloudera.spark.hbase.HBaseContext$$anonfun$com$cloudera$spark$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:460)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

  6. alchemist

    I am trying to use the above API in Java and trying to get the JavaHBaseContext using CDH5.5 libraries. Somehow I cannot get JavaHBaseContext library.
    Cannot find this library
    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
    Imported the following Maven dependency:

    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>2.0.0-SNAPSHOT</version>

  7. Abi

    You have provided Java-based SparkHbase sample code at the line “Here’s an example of the foreachPartition”, but the Maven dependencies and the parameters used when creating the jsc object are missing. Can you please let us know a link where you have this example end to end?
    Thanks,
    Abi

  8. Sathish

    I am trying to use this library in my spark scala application, but finding difficulty in compiling the code,
    I added below dependency in pom.xml file, and added repo – https://repository.cloudera.com/artifactory/cloudera-repos/

    <groupId>com.cloudera</groupId>
    <artifactId>spark-hbase</artifactId>
    <version>0.0.2-clabs</version>

    I do see this dependency got resolved by checking mvn dependency:tree,
    com.cloudera:spark-hbase:jar:0.0.2-clabs:compile

    However, compilation fails with below error,
    error: object cloudera is not a member of package com.sathish.app,
    [INFO] import com.cloudera.spark.hbase.HBaseContext

    Any clue on the root cause ? I tried in sbt project, and got the same problem .

    1. kart

      src/main/scala/com/cloudera/spark/hbase/HBaseContext.scala

      /*
      * Licensed to the Apache Software Foundation (ASF) under one or more
      * contributor license agreements. See the NOTICE file distributed with
      * this work for additional information regarding copyright ownership.
      * The ASF licenses this file to You under the Apache License, Version 2.0
      * (the "License"); you may not use this file except in compliance with
      * the License. You may obtain a copy of the License at
      *
      * http://www.apache.org/licenses/LICENSE-2.0
      *
      * Unless required by applicable law or agreed to in writing, software
      * distributed under the License is distributed on an "AS IS" BASIS,
      * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */

      package com.cloudera.spark.hbase

      import org.apache.spark.broadcast.Broadcast
      import org.apache.spark.deploy.SparkHadoopUtil
      import org.apache.spark.rdd.RDD
      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.hbase.client.HConnectionManager
      import org.apache.hadoop.hbase.client.Scan
      import org.apache.hadoop.hbase.client.Get
      import java.util.ArrayList
      import org.apache.hadoop.hbase.client.Result
      import scala.reflect.ClassTag
      import org.apache.hadoop.hbase.client.HConnection
      import org.apache.hadoop.hbase.client.Put
      import org.apache.hadoop.hbase.client.Increment
      import org.apache.hadoop.hbase.client.Delete
      import org.apache.spark.{Logging, SerializableWritable, SparkConf, SparkContext}
      import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable
      import org.apache.hadoop.mapreduce.Job
      import org.apache.hadoop.hbase.client.Mutation
      import org.apache.spark.streaming.dstream.DStream
      import java.io._
      import org.apache.hadoop.security.{Credentials, UserGroupInformation}
      import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
      import org.apache.hadoop.hbase.mapreduce.TableInputFormat
      import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
      import org.apache.hadoop.fs.{Path, FileSystem}

      /**
      * HBaseContext is a façade of simple and complex HBase operations
      * like bulk put, get, increment, delete, and scan
      *
      * HBase Context will take the responsibilities to happen to
      * complexity of disseminating the configuration information
      * to the working and managing the life cycle of HConnections.
      *
      * serializable Configuration object
      *
      */
      class HBaseContext(@transient sc: SparkContext,
      @transient config: Configuration,
      val tmpHdfsConfgFile: String = null) extends Serializable with Logging {

      @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
      @transient var tmpHdfsConfiguration:Configuration = config
      @transient var appliedCredentials = false;
      @transient val job = new Job(config)
      TableMapReduceUtil.initCredentials(job)
      val broadcastedConf = sc.broadcast(new SerializableWritable(config))
      val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials()))

      if (tmpHdfsConfgFile != null && config != null) {
      val fs = FileSystem.newInstance(config)
      val tmpPath = new Path(tmpHdfsConfgFile)
      if (!fs.exists(tmpPath)) {
      val outputStream = fs.create(tmpPath)
      config.write(outputStream)
      outputStream.close();
      } else {
      logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!")
      }
      }

      /**
      * A simple enrichment of the traditional Spark RDD foreachPartition.
      * This function differs from the original in that it offers the
      * developer access to a already connected HConnection object
      *
      * Note: Do not close the HConnection object. All HConnection
      * management is handled outside this method
      *
      * @param rdd Original RDD with data to iterate over
      * @param f Function to be given a iterator to iterate through
      * the RDD values and a HConnection object to interact
      * with HBase
      */
      def foreachPartition[T](rdd: RDD[T],
      f: (Iterator[T], HConnection) => Unit) = {
      rdd.foreachPartition(
      it => hbaseForeachPartition(broadcastedConf, it, f))
      }

      /**
      * A simple enrichment of the traditional Spark Streaming dStream foreach
      * This function differs from the original in that it offers the
      * developer access to a already connected HConnection object
      *
      * Note: Do not close the HConnection object. All HConnection
      * management is handled outside this method
      *
      * @param dstream Original DStream with data to iterate over
      * @param f Function to be given a iterator to iterate through
      * the DStream values and a HConnection object to
      * interact with HBase
      */
      def foreachRDD[T](dstream: DStream[T],
      f: (Iterator[T], HConnection) => Unit) = {
      dstream.foreach((rdd, time) => {
      foreachPartition(rdd, f)
      })
      }

      /**
      * A simple enrichment of the traditional Spark RDD mapPartition.
      * This function differs from the original in that it offers the
      * developer access to a already connected HConnection object
      *
      * Note: Do not close the HConnection object. All HConnection
      * management is handled outside this method
      *
      * Note: Make sure to partition correctly to avoid memory issue when
      * getting data from HBase
      *
      * @param rdd Original RDD with data to iterate over
      * @param mp Function to be given a iterator to iterate through
      * the RDD values and a HConnection object to interact
      * with HBase
      * @return Returns a new RDD generated by the user definition
      * function just like normal mapPartition
      */
      def mapPartition[T, R: ClassTag](rdd: RDD[T],
      mp: (Iterator[T], HConnection) => Iterator[R]): RDD[R] = {

      rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
      it,
      mp), true)
      }

      /**
      * A simple enrichment of the traditional Spark Streaming DStream
      * mapPartition.
      *
      * This function differs from the original in that it offers the
      * developer access to a already connected HConnection object
      *
      * Note: Do not close the HConnection object. All HConnection
      * management is handled outside this method
      *
      * Note: Make sure to partition correctly to avoid memory issue when
      * getting data from HBase
      *
      * @param dstream Original DStream with data to iterate over
      * @param mp Function to be given a iterator to iterate through
      * the DStream values and a HConnection object to
      * interact with HBase
      * @return Returns a new DStream generated by the user
      * definition function just like normal mapPartition
      */
      def streamMap[T, U: ClassTag](dstream: DStream[T],
      mp: (Iterator[T], HConnection) => Iterator[U]): DStream[U] = {

      dstream.mapPartitions(it => hbaseMapPartition[T, U](
      broadcastedConf,
      it,
      mp), true)
      }

      /**
      * A simple abstraction over the HBaseContext.foreachPartition method.
      *
      * It allow addition support for a user to take RDD
      * and generate puts and send them to HBase.
      * The complexity of managing the HConnection is
      * removed from the developer
      *
      * @param rdd Original RDD with data to iterate over
      * @param tableName The name of the table to put into
      * @param f Function to convert a value in the RDD to a HBase Put
      * @param autoFlush If autoFlush should be turned on
      */
      def bulkPut[T](rdd: RDD[T], tableName: String, f: (T) => Put, autoFlush: Boolean) {

      rdd.foreachPartition(
      it => hbaseForeachPartition[T](
      broadcastedConf,
      it,
      (iterator, hConnection) => {
      val htable = hConnection.getTable(tableName)
      htable.setAutoFlush(autoFlush, true)
      iterator.foreach(T => htable.put(f(T)))
      htable.flushCommits()
      htable.close()
      }))
      }

      def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){

      credentials = SparkHadoopUtil.get.getCurrentUserCredentials()

      logInfo("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials);

      if (appliedCredentials == false && credentials != null) {
      appliedCredentials = true
      logCredInformation(credentials)

      @transient val ugi = UserGroupInformation.getCurrentUser();
      ugi.addCredentials(credentials)
      // specify that this is a proxy user
      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)

      ugi.addCredentials(credentialsConf.value.value)
      }
      }

      def logCredInformation[T] (credentials2:Credentials) {
      logInfo("credentials:" + credentials2);
      // ... (rest of this method was truncated in the original paste)
      }

      def streamBulkPut[T](dstream: DStream[T],
      tableName: String,
      f: (T) => Put,
      autoFlush: Boolean) = {
      dstream.foreach((rdd, time) => {
      bulkPut(rdd, tableName, f, autoFlush)
      })
      }

      /**
      * A simple abstraction over the HBaseContext.foreachPartition method.
      *
      * It allow addition support for a user to take RDD
      * and generate checkAndPuts and send them to HBase.
      * The complexity of managing the HConnection is
      * removed from the developer
      *
      * @param rdd Original RDD with data to iterate over
      * @param tableName The name of the table to put into
      * @param f Function to convert a value in the RDD to
      * a HBase checkAndPut
      * @param autoFlush If autoFlush should be turned on
      */
      def bulkCheckAndPut[T](rdd: RDD[T], tableName: String, f: (T) => (Array[Byte], Array[Byte], Array[Byte], Array[Byte], Put), autoFlush: Boolean) {
      rdd.foreachPartition(
      it => hbaseForeachPartition[T](
      broadcastedConf,
      it,
      (iterator, hConnection) => {

      val htable = hConnection.getTable(tableName)
      htable.setAutoFlush(autoFlush, true)

      iterator.foreach(T => {
      val checkPut = f(T)
      htable.checkAndPut(checkPut._1, checkPut._2, checkPut._3, checkPut._4, checkPut._5)
      })
      htable.flushCommits()
      htable.close()
      }))
      }

      /**
      * A simple abstraction over the HBaseContext.streamMapPartition method.
      *
      * It allow addition support for a user to take a DStream and
      * generate checkAndPuts and send them to HBase.
      *
      * The complexity of managing the HConnection is
      * removed from the developer
      *
      * @param dstream Original DStream with data to iterate over
      * @param tableName The name of the table to checkAndPut into
      * @param f function to convert a value in the RDD to
      * a HBase checkAndPut
      * @param autoFlush If autoFlush should be turned on
      */
      def streamBulkCheckAndPut[T](dstream: DStream[T], tableName: String, f: (T) => (Array[Byte], Array[Byte], Array[Byte], Array[Byte], Put), autoFlush: Boolean) {
      dstream.foreach((rdd, time) => {
      bulkCheckAndPut(rdd, tableName, f, autoFlush)
      })
      }

      /**
      * A simple abstraction over the HBaseContext.foreachPartition method.
      *
      * It allow addition support for a user to take a RDD and
      * generate increments and send them to HBase.
      *
      * The complexity of managing the HConnection is
      * removed from the developer
      *
      * @param rdd Original RDD with data to iterate over
      * @param tableName The name of the table to increment to
      * @param f function to convert a value in the RDD to a
      * HBase Increments
      * @param batchSize The number of increments to batch before sending to HBase
      */
      def bulkIncrement[T](rdd: RDD[T], tableName: String, f: (T) => Increment, batchSize: Integer) {
      bulkMutation(rdd, tableName, f, batchSize)
      }

      /**
      * A simple abstraction over the HBaseContext.foreachPartition method.
      *
      * It allow addition support for a user to take a RDD and generate delete
      * and send them to HBase. The complexity of managing the HConnection is
      * removed from the developer
      *
      * @param rdd Original RDD with data to iterate over
      * @param tableName The name of the table to delete from
      * @param f Function to convert a value in the RDD to a
      * HBase Deletes
      * @param batchSize The number of delete to batch before sending to HBase
      */
      def bulkDelete[T](rdd: RDD[T], tableName: String, f: (T) => Delete, batchSize: Integer) {
      bulkMutation(rdd, tableName, f, batchSize)
      }

      /**
      * A simple abstraction over the HBaseContext.foreachPartition method.
      *
      * It allow addition support for a user to take a RDD and generate
      * checkAndDelete and send them to HBase. The complexity of managing the
      * HConnection is removed from the developer
      *
      * @param rdd Original RDD with data to iterate over
      * @param tableName The name of the table to delete from
      * @param f Function to convert a value in the RDD to a
      * HBase Deletes
      */
      def bulkCheckDelete[T](rdd: RDD[T],
      tableName: String,
      f: (T) => (Array[Byte], Array[Byte], Array[Byte], Array[Byte], Delete)) {
      rdd.foreachPartition(
      it => hbaseForeachPartition[T](
      broadcastedConf,
      it,
      (iterator, hConnection) => {
      val htable = hConnection.getTable(tableName)

      iterator.foreach(T => {
      val checkDelete = f(T)
      htable.checkAndDelete(checkDelete._1, checkDelete._2, checkDelete._3, checkDelete._4, checkDelete._5)
      })
      htable.flushCommits()
      htable.close()
      }))
      }

      /**
      * A simple abstraction over the HBaseContext.streamBulkMutation method.
      *
      * It allow addition support for a user to take a DStream and
      * generate Increments and send them to HBase.
      *
      * The complexity of managing the HConnection is
      * removed from the developer
      *
      * @param dstream Original DStream with data to iterate over
      * @param tableName The name of the table to increments into
      * @param f Function to convert a value in the DStream to a
      * HBase Increments
      * @param batchSize The number of increments to batch before sending to HBase
      */
      def streamBulkIncrement[T](dstream: DStream[T],
      tableName: String,
      f: (T) => Increment,
      batchSize: Int) = {
      streamBulkMutation(dstream, tableName, f, batchSize)
      }

      /**
      * A simple abstraction over the HBaseContext.streamBulkMutation method.
      *
      * It allow addition support for a user to take a DStream and
      * generate Delete and send them to HBase.
      *
      * The complexity of managing the HConnection is
      * removed from the developer
      *
      * @param dstream Original DStream with data to iterate over
      * @param tableName The name of the table to delete from
      * @param f function to convert a value in the DStream to a
      * HBase Delete
      * @param batchSize The number of deletes to batch before sending to HBase
      */
      def streamBulkDelete[T](dstream: DStream[T],
      tableName: String,
      f: (T) => Delete,
      batchSize: Integer) = {
      streamBulkMutation(dstream, tableName, f, batchSize)
      }

      /**
      * A simple abstraction over the bulkCheckDelete method.
      *
      * It allow addition support for a user to take a DStream and
      * generate CheckAndDelete and send them to HBase.
      *
      * The complexity of managing the HConnection is
      * removed from the developer
      *
      * @param dstream Original DStream with data to iterate over
      * @param tableName The name of the table to delete from
      * @param f function to convert a value in the DStream to a
      * HBase Delete
      */
      def streamBulkCheckAndDelete[T](dstream: DStream[T],
      tableName: String,
      f: (T) => (Array[Byte], Array[Byte], Array[Byte], Array[Byte], Delete)) {
      dstream.foreach((rdd, time) => {
      bulkCheckDelete(rdd, tableName, f)
      })
      }

      /**
      * Under lining function to support all bulk mutations
      *
      * May be opened up if requested
      */
      private def bulkMutation[T](rdd: RDD[T], tableName: String, f: (T) => Mutation, batchSize: Integer) {
      rdd.foreachPartition(
      it => hbaseForeachPartition[T](
      broadcastedConf,
      it,
      (iterator, hConnection) => {
      val htable = hConnection.getTable(tableName)
      val mutationList = new ArrayList[Mutation]
      iterator.foreach(T => {
      mutationList.add(f(T))
      if (mutationList.size >= batchSize) {
      htable.batch(mutationList)
      mutationList.clear()
      }
      })
      if (mutationList.size() > 0) {
      htable.batch(mutationList)
      mutationList.clear()
      }
      htable.close()
      }))
      }

      /**
      * Under lining function to support all bulk streaming mutations
      *
      * May be opened up if requested
      */
      private def streamBulkMutation[T](dstream: DStream[T],
      tableName: String,
      f: (T) => Mutation,
      batchSize: Integer) = {
      dstream.foreach((rdd, time) => {
      bulkMutation(rdd, tableName, f, batchSize)
      })
      }

      /**
      * A simple abstraction over the HBaseContext.mapPartition method.
      *
      * It allow addition support for a user to take a RDD and generates a
      * new RDD based on Gets and the results they bring back from HBase
      *
      * @param rdd Original RDD with data to iterate over
      * @param tableName The name of the table to get from
      * @param makeGet function to convert a value in the RDD to a
      * HBase Get
      * @param convertResult This will convert the HBase Result object to
      * what ever the user wants to put in the resulting
      * RDD
      * return new RDD that is created by the Get to HBase
      */
      def bulkGet[T, U](tableName: String,
      batchSize: Integer,
      rdd: RDD[T],
      makeGet: (T) => Get,
      convertResult: (Result) => U): RDD[U] = {

      val getMapPartition = new GetMapPartition(tableName,
      batchSize,
      makeGet,
      convertResult)

      rdd.mapPartitions[U](it =>
      hbaseMapPartition[T, U](
      broadcastedConf,
      it,
      getMapPartition.run), true)(fakeClassTag[U])
      }

      /**
      * A simple abstraction over the HBaseContext.streamMap method.
      *
      * It allow addition support for a user to take a DStream and
      * generates a new DStream based on Gets and the results
      * they bring back from HBase
      *
      * @param dstream Original DStream with data to iterate over
      * @param tableName The name of the table to get from
      * @param makeGet function to convert a value in the DStream to a
      * HBase Get
      * @param convertResult This will convert the HBase Result object to
      * what ever the user wants to put in the resulting
      * DStream
      * return new DStream that is created by the Get to HBase
      */
      def streamBulkGet[T, U: ClassTag](tableName: String,
      batchSize: Integer,
      dstream: DStream[T],
      makeGet: (T) => Get,
      convertResult: (Result) => U): DStream[U] = {

      val getMapPartition = new GetMapPartition(tableName,
      batchSize,
      makeGet,
      convertResult)

      dstream.mapPartitions[U](it => hbaseMapPartition[T, U](
      broadcastedConf,
      it,
      getMapPartition.run), true)
      }

      /**
      * This function will use the native HBase TableInputFormat with the
      * given scan object to generate a new RDD
      *
      * @param tableName the name of the table to scan
      * @param scan the HBase scan object to use to read data from HBase
      * @param f function to convert a Result object from HBase into
      * what the user wants in the final generated RDD
      * @return new RDD with results from scan
      */
      def hbaseRDD[U: ClassTag](tableName: String, scan: Scan, f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {

      var job: Job = new Job(getConf(broadcastedConf))

      TableMapReduceUtil.initCredentials(job)
      TableMapReduceUtil.initTableMapperJob(tableName, scan, classOf[IdentityTableMapper], null, null, job)

      sc.newAPIHadoopRDD(job.getConfiguration(),
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]).map(f)
      }

      /**
      * A overloaded version of HBaseContext hbaseRDD that predefines the
      * type of the outputing RDD
      *
      * @param tableName the name of the table to scan
      * @param scans the HBase scan object to use to read data from HBase
      * @return New RDD with results from scan
      *
      */
      def hbaseRDD(tableName: String, scans: Scan):
      RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {

      hbaseRDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])](
      tableName,
      scans,
      (r: (ImmutableBytesWritable, Result)) => {
      val it = r._2.list().iterator()
      val list = new ArrayList[(Array[Byte], Array[Byte], Array[Byte])]()

      while (it.hasNext()) {
      val kv = it.next()
      list.add((kv.getFamily(), kv.getQualifier(), kv.getValue()))
      }

      (r._1.copyBytes(), list)
      })
      }

      def hbaseScanRDD(tableName: String, scan: Scan):
      RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {

      new HBaseScanRDD(sc, tableName, scan,
      broadcastedConf)
      }

      /**
      * Under lining wrapper all foreach functions in HBaseContext
      *
      */
      private def hbaseForeachPartition[T](
      configBroadcast: Broadcast[SerializableWritable[Configuration]],
      it: Iterator[T],
      f: (Iterator[T], HConnection) => Unit) = {

      val config = getConf(configBroadcast)

      applyCreds(configBroadcast)
      // specify that this is a proxy user
      val hConnection = HConnectionManager.createConnection(config)
      f(it, hConnection)
      hConnection.close()

      }

      private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): Configuration = {

      if (tmpHdfsConfiguration != null) {
      tmpHdfsConfiguration
      } else if (tmpHdfsConfgFile != null) {

      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)

      val inputStream = fs.open(new Path(tmpHdfsConfgFile))
      tmpHdfsConfiguration = new Configuration(false)
      tmpHdfsConfiguration.readFields(inputStream)
      inputStream.close()

      tmpHdfsConfiguration
      }

      if (tmpHdfsConfiguration == null) {
      try {
      tmpHdfsConfiguration = configBroadcast.value.value
      tmpHdfsConfiguration
      } catch {
      case ex: Exception =>{
      println("Unable to getConfig from broadcast")
      }
      }
      }

      tmpHdfsConfiguration
      }

      /**
      * Under lining wrapper all mapPartition functions in HBaseContext
      *
      */
      private def hbaseMapPartition[K, U](
      configBroadcast: Broadcast[SerializableWritable[Configuration]],
      it: Iterator[K],
      mp: (Iterator[K], HConnection) => Iterator[U]): Iterator[U] = {

      val config = getConf(configBroadcast)
      applyCreds(configBroadcast)
      val hConnection = HConnectionManager.createConnection(config)

      val res = mp(it, hConnection)
      hConnection.close()
      res

      }

      /**
      * Under lining wrapper all get mapPartition functions in HBaseContext
      */
      private class GetMapPartition[T, U](tableName: String,
      batchSize: Integer,
      makeGet: (T) => Get,
      convertResult: (Result) => U) extends Serializable {

      def run(iterator: Iterator[T], hConnection: HConnection): Iterator[U] = {
      val htable = hConnection.getTable(tableName)

      val gets = new ArrayList[Get]()
      var res = List[U]()

      while (iterator.hasNext) {
      gets.add(makeGet(iterator.next))

      if (gets.size() == batchSize) {
      var results = htable.get(gets)
      res = res ++ results.map(convertResult)
      gets.clear()
      }
      }
      if (gets.size() > 0) {
      val results = htable.get(gets)
      res = res ++ results.map(convertResult)
      gets.clear()
      }
      htable.close()
      res.iterator
      }
      }

      /**
      * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
      *
      * This method is used to keep ClassTags out of the external Java API, as the Java compiler
      * cannot produce them automatically. While this ClassTag-faking does please the compiler,
      * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
      *
      * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, just worse performance
      * or security issues. For instance, an Array[AnyRef] can hold any type T, but may lose primitive
      * specialization.
      */
      private[spark]
      def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
      }