Building a Machine Learning Application With Cloudera Data Science Workbench And Operational Database, Part 2: Querying/ Loading Data

Building a Machine Learning Application With Cloudera Data Science Workbench And Operational Database, Part 2: Querying/ Loading Data

In this installment, we’ll discuss how to do Get/Scan Operations and utilize PySpark SQL. Afterward, we’ll talk about Bulk Operations and then some troubleshooting errors you may come across while trying this yourself. Read the first blog here.

Get/Scan Operations

  • Using Catalogs

In this example, let’s load the table ‘tblEmployee’ that we made in the “Put Operations” in Part 1. I used the same exact catalog in order to load the table. 

from pyspark.sql import SparkSession

spark = SparkSession \
 .builder \
 .appName("SampleApplication") \
 .getOrCreate()

tableCatalog = ''.join("""{
              "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
              "rowkey":"key",
              "columns":{
                "key":{"cf":"rowkey", "col":"key", "type":"int"},
                "empId":{"cf":"personal","col":"empId","type":"string"},
                "empName":{"cf":"personal", "col":"empName", "type":"string"},
                "empState":{"cf":"personal", "col":"empState", "type":"string"}
              }
            }""".split())

table = spark.read.format("org.apache.hadoop.hbase.spark") \
 .options(catalog=tableCatalog) \
 .option("hbase.spark.use.hbasecontext", False) \
 .load()

table.show()

Executing table.show() will give you:

In addition, you can edit the catalog where you can omit some columns that you don’t need. For example, if you only needed “key” and “empName” columns of the ‘tblEmployee’ table, then you can create a catalog below. If you replace the catalog in the example above with this one, table.show() will show you a PySpark Dataframe with only those two columns.

 

tableCatalog = ''.join("""{
                "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
                "rowkey":"key",
                "columns":{
                  "key":{"cf":"rowkey", "col":"key", "type":"int"},
                  "empName":{"cf":"personal", "col":"empName", "type":"string"}
                }
              }""".split())

Executing table.show() will give you:


There’s a limited amount of filtering you can do with catalogs itself, and the best way to perform get and scan operations is through PySpark SQL which will be discussed later.

  • Using hbase.columns.mapping

Similarly, we can use hbase.columns.mapping to load a HBase Table into a PySpark Dataframe. Let’s try to load ‘tblEmployee’ using this method

from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("SampleApplication") \
   .getOrCreate()

df = spark.read.format("org.apache.hadoop.hbase.spark") \
   .option("hbase.columns.mapping",
           "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \
   .option("hbase.table", "tblEmployee") \
   .option("hbase.spark.use.hbasecontext", False) \
   .load()

df.show()

Executing df.show() will give you:

Spark SQL with PySpark

Using PySpark SQL is the easiest and best way to perform HBase read operations in Python. With PySpark SQL, a temporary table can be created which will directly run SQL queries on the HBase table. To do this though, we need to create a view on a PySpark Dataframe loaded from HBase. Let’s start with the data frame we loaded in our ‘hbase.column.mappings’ example above. This code snippet shows how to define the view and run a query on it. 

df.createOrReplaceTempView("personView")
result = spark.sql("SELECT * FROM personView") # SQL Query
result.show()

Executing result.show() will give you:

One of the biggest advantages of using a view is that queries will reflect updated data from the HBase table, so df doesn’t have to be redefined and reloaded every time to get updated values. Views are essentially for use-cases that depend on the most updated data from HBase.

If you do a read operation and show the results without using a View, the results don’t automatically get updated, and you should load() again in order to get up-to-date results.

Below is an example demonstrating this. First, 2 rows are added to an HBase table and the table is loaded in a PySpark DataFrame and shown in the workbench. Then we write 2 more rows and run the query again and the workbench will show all 4 rows. 

from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession \
 .builder \
 .appName("PySparkSQLExample") \
 .getOrCreate()

# Catalog

tableCatalog = ''.join("""{
              "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
              "rowkey":"key",
              "columns":{
                "key":{"cf":"rowkey", "col":"key", "type":"int"},
                "empId":{"cf":"personal","col":"empId","type":"string"},
                "empName":{"cf":"personal", "col":"empName", "type":"string"},
                "empState":{"cf":"personal", "col":"empState", "type":"string"}
              }
            }""".split())

# Adding first 2 rows

employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") \
 .options(catalog=tableCatalog, newTable=5) \
 .option("hbase.spark.use.hbasecontext", False) \
 .save()

df = spark.read.format("org.apache.hadoop.hbase.spark") \
 .options(catalog=tableCatalog) \
 .option("hbase.spark.use.hbasecontext", False) \
 .load()

df.createOrReplaceTempView("sampleView")
result = spark.sql("SELECT * FROM sampleView")

print("The PySpark DataFrame with only the first 2 rows")
result.show()

# Adding 2 more rows

employee = [(11, 'bobG', 'Bob Graham', 'TX'), (12, 'manasC', 'Manas Chakka', 'GA')]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") \
 .options(catalog=tableCatalog, newTable=5) \
 .option("hbase.spark.use.hbasecontext", False) \
 .save()
# Notice here I didn't reload "df" before doing result.show() again
print("The PySpark Dataframe immediately after writing 2 more rows")
result.show()

Here’s the output from this code example:

Bulk Operations

While using PySpark you may reach performance limitations that could be mitigated by parallelizing operations. HBase allows for this through Bulk Operations and is supported for Spark programs written in Scala and Java. For more information on those operations using Scala or Java look at this link https://hbase.apache.org/book.html#_basic_spark.

However, support for these operations are limited in PySpark. Creating a HBase Configuration and Java HBase Context Object is possible through accessing the JVM. Here’s an example below showing how to create these objects.

Currently, there are open issues filed for bulk operation support through these Java Objects.

https://issues.apache.org/jira/browse/HBASE-24829

Troubleshooting

— Python Version in Worker Node Not The Same as Driver

Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions

This error can occur if environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are not set or incorrect. Refer to configuration steps above and make sure you have Python installed on every node on your cluster as well as the environment variables set properly to the right path.

— Py4J Errors

AttributeError: ‘SparkContext’ object has no attribute ‘_get_object_id’

This error can appear when trying to access certain Java/Scala objects explicitly through the JVM, i.e “sparkContext._jvm”. There are JIRAs filed for fixing issues like these, but refer to the supported methods mentioned in this post to access HBase tables

https://issues.apache.org/jira/browse/HBASE-24828

— Could not find datasource “org.apache.hbase.spark”

java.lang.ClassNotFoundException: Failed to find data source: org.apache.hadoop.hbase.spark. Please find packages at http://spark.apache.org/third-party-projects.html

This error appears if the jars aren’t visible to Spark driver and executor. Make sure the right jars have been made available to the runtime depending on the deployment chosen (CDSW vs spark-shell/submit).

Conclusion

PySpark is now available to transform and access data from HBase. For those only comfortable working with Python, the methods mentioned here and in Using PySpark and Apache HBase, Part 1 will allow you to easily work with PySpark and HBase. 

Check out these links to get started with a CDP DH cluster and try these examples yourself in CDSW: Cloudera Data Hub Cloudera Data Science Workbench (CDSW)As part of more advanced usage with PySpark, click here for Part 3 to see how a PySpark model can be built, scored, and served with HBase Data.

Manas Chakka
More by this author

Leave a comment

Your email address will not be published. Links are not permitted in comments.