Log Reduction Techniques with CFM

Logs generated by Cloudera services offer a breadth of information to assist in cluster maintenance, from security checks and auditing tasks to validation for performance tuning and testing, to name a few.

However, log records generated by these services do not hold the same value for every organisation. For example, cyber security teams may find more value in logs that outline user behaviour when accessing data, whilst operational teams may prefer logs that show spikes in load throughout the day.

From this, a question arises: how do we filter logs down to only those that a particular team or user would find useful?

This is where Cloudera Flow Management (CFM) can be utilised. Whilst there are third-party products that can filter logs at an additional cost on top of an existing CDP cluster, adopting CFM can mitigate this financial burden for businesses that already have CFM as part of their Cloudera CDP subscription.

To tackle this problem, there are two different avenues that can be taken.

  1. Converting flows to records and filtering using SQL
  2. Extracting variables and filtering using NiFi expression language

An overview of both approaches is as follows:

Method: SQL filtering on records

Description: Logs are ingested and converted to records; these records are then queried using SQL.

Benefits:

  • Preferred method for high performance
  • Good for high-volume ingestion

Cons:

  • Harder to implement, as a schema definition is required

Method: NiFi Expression Language filtering on extracted attributes

Description: Incoming logs have attributes extracted from them; NiFi Expression Language is then used to filter these records.

Benefits:

  • Easy to implement
  • Good for low-volume ingestion

Cons:

  • Less efficient compared to the record-based method
  • Requires more processors

We will expand both workflows in the following sections.

Assumptions

The workflow that will be discussed assumes the use of a CDP 7.1.x cluster that has both Kerberos and TLS enabled. Additionally, both Ranger and Apache Solr are assumed to be installed, as these are the two main services leveraged for log ingestion in the NiFi canvas.

However, the principles of the workflow can be adopted for a cluster without these configurations.

What does this Workflow Look Like?

Record Based

Converting log ingestion flowfiles to records is a quick and efficient method to filter logs via CFM. However, it is not always the easiest to implement, depending on the format of the logs being ingested.

The high-level overview of this workflow is as follows:

High level workflow overview

Attribute-Based

The main concept behind attribute-based filtering is to have processors that extract attributes of interest and route on a specified condition. Everything else is up to whoever implements the template and how complex or simple they want the workflow to be.

The high-level overview of this workflow is as follows:

What are the Processors Needed to Implement this Workflow for a Cloudera Cluster?

Record Based

Ingestion

Using a remote process group, we gather all of the processors required to hook into the service we are ingesting from, in this case Solr, and extract logs for only the services we want. Please note that grabbing logs from Solr is not the only solution: to reduce the overall amount of logs a cluster holds, MiNiFi agents or Apache NiFi can grab the logs directly from their source. This example outlines the rerouting/filtering of data, but an overall reduction is possible via this workflow.

Choosing which processors to use and which services to ingest our logs from acts as the first level of filtering; services that we do not want simply do not get a dedicated processor.

These processors will be dedicated to specific services that you would want to target, e.g. HDFS, YARN, Apache Kafka, Hive, Impala, etc.

In this workflow example, the GetSolr processor is used, and the services are filtered using its Solr Query field.
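For example, if Ranger audit logs indexed in Solr are the source, the Solr Query property could be narrowed to the repositories of interest. The sketch below is an illustration only; the field name (repo) and the repository values will depend on how your Solr collection is structured.

Solr Query: repo:cm_hdfs OR repo:cm_hive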

Record Conversion

The first major step in this process is to convert the incoming log flowfiles into records. For this example, the incoming Solr logs are ingested as XML and are converted into JSON records.

For reference, the incoming Solr log flowfile for this example is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<docs>
   <doc>
      <field name="field_one">some_value</field>
      <field name="field_two">some_value</field>
      <field name="field_three">some_value</field>
   </doc>
   <doc>
      <field name="field_one">some_value</field>
      <field name="field_two">some_value</field>
      <field name="field_three">some_value</field>
   </doc>
</docs>

Note that there are two ways to tackle this step. The first is to convert the logs in a separate processor; the second is to convert them in the next, record-filtering processor, which simplifies the flow. However, not all XML files can be converted straight away.

For the Solr example, conversion straight away is not possible, so an additional processor is required. This conversion is done using a JoltTransformRecord processor.

In this processor, there are two main functions. The first is to convert the record from XML to JSON. This is done by utilising record reader and writer controller services; for this example, the XMLReader and JsonRecordSetWriter controller services have been adopted.

Because of the way Solr generates its logs, a schema similar to the following is adopted:

{
  "name": "docs",
  "namespace": "doc",
  "type": "record",
  "fields": [
    {
      "name": "field",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "record_tag",
          "namespace": "name",
          "fields": [
            { "name": "name",  "type": "string" },
            { "name": "value", "type": "string" }
          ]
        }
      }
    }
  ]
}

The reader's Field Name for Content property is set to value.

For this example, an Avro schema has been adopted for the workflow; however, this is subject to change depending on the format of the ingested logs.

Additionally, it is recommended to use a schema registry controller service within the reader for complex flows that utilise multiple schemas. In this example an AvroSchemaRegistry controller is adopted; however, external schema registries can be connected as well.
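As a minimal sketch, assuming the schema above is registered under the hypothetical name solr_log_schema, the controller services could be wired together as follows:

AvroSchemaRegistry -> dynamic property solr_log_schema : (the Avro schema shown above)
XMLReader          -> Schema Access Strategy : Use 'Schema Name' Property
                      Schema Registry        : AvroSchemaRegistry
                      Schema Name            : solr_log_schema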

Once the record is converted, we then use the Jolt Specification field to adjust the JSON record into a shape that allows for easy querying. In this example, we use the following specification:

[
  {
    "operation": "shift",
    "spec": {
      "field": {
        "*": {
          "value": "@(1,name)"
        }
      }
    }
  }
]

The Jolt Transformation DSL property is set to Chain.
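For reference, the input to this shift, i.e. each record as produced by the XML-to-JSON conversion above, would look roughly like the following:

{
  "field" : [
    { "name" : "field_one",   "value" : "some_value" },
    { "name" : "field_two",   "value" : "some_value" },
    { "name" : "field_three", "value" : "some_value" }
  ]
}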

The output Solr record in JSON format will now take a form similar to the following:

[
  {
    "field_one": "some_value",
    "field_two": "some_value",
    "field_three": "some_value"
  },
  {
    "field_one": "some_value",
    "field_two": "some_value",
    "field_three": "some_value"
  }
]

The output record can now be filtered upon in the next stage.

Filtering Records

This can be accomplished using the QueryRecord processor. If the ingested log files are in a suitable format, the conversion can also occur in this processor, with the record reader and writer controllers performing the conversion as part of this step.

If not, the records can be ingested and written out in the same format. The filtering in this stage comes in the form of SQL queries on the incoming records. A general format of such a query is as follows:

SELECT *
FROM FLOWFILE
WHERE field = 'something'

These queries are added to the processor as new properties. The name of each property becomes the name of the relationship used to route the newly filtered logs to their next stage.
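As an illustration using the placeholder field names from the sample record above (the property name and field values below are hypothetical), a property added to QueryRecord might look like:

hdfs_write_events : SELECT *
                    FROM FLOWFILE
                    WHERE field_one = 'hdfs' AND field_three = 'WRITE'

A relationship named hdfs_write_events then appears on the processor and can be connected to the next stage.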

Merging and Saving

This is an optional step; once the logs have been filtered, what happens to them next depends on the use case of the workflow's deployment. Outlined here is a simple approach: merging the incoming flowfiles using the MergeRecord processor, and then saving them to disk.

If required, the incoming flowfiles can be merged using the Correlation Attribute Name field, which helps group like records together.

The same principle is used when saving to disk (in this case, HDFS): the attribute used to group the flowfiles is the same attribute used to dictate where on disk a group of records should be saved.
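As a sketch, assuming an attribute named service.name has been set on each filtered route (for example via an UpdateAttribute processor), the two processors could be configured as follows; the attribute name and target path are hypothetical:

MergeRecord -> Correlation Attribute Name : service.name
PutHDFS     -> Directory                  : /data/filtered_logs/${service.name}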

Attribute-Based

Ingestion

This follows the same process as in the record-based method above, and it still acts as the first level of filtering for this method.

Record Split

We want to evaluate individual events, not a group of events, so a processor that splits incoming records is needed. In this working example, the ingestion processors provide the records in XML format, so a SplitXml processor is adopted; other processors can be adopted for non-XML logs as needed.
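For instance, with the sample flowfile shown earlier, SplitXml with its Split Depth property left at the default of 1 would emit one flowfile per <doc> element:

SplitXml -> Split Depth : 1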

Attribute Extraction

A processor is needed to extract any variables that will be used in the filtering statement in the next stage. For this processor, we have adopted the EvaluateXPath processor.

The extracted variables are attached as attributes (metadata) to the outgoing flowfile, so only extract what is needed; which variables are needed depends on the filtering conditions in the next stage.
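Using the placeholder fields from the sample flowfile, EvaluateXPath could be configured with Destination set to flowfile-attribute and one dynamic property per attribute to extract. The attribute names below are illustrative only:

service : string(/doc/field[@name='field_one'])
reqUser : string(/doc/field[@name='field_two'])
access  : string(/doc/field[@name='field_three'])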

Filtering on User Conditions

This stage takes the attributes extracted above and filters on a given condition. Together, these two processors (extraction and filtering) are the main components required in the workflow and represent the second level of filtering it provides: if the first level stops unwanted services, this stage stops unwanted events.

All conditions are laid out in NiFi Expression Language. Some example conditions:

  • Any record that is from a query engine (e.g. Hive, Impala, etc), only IF the record is a command run by a user
  • Any record that is from HDFS, only IF the record is by a particular service, and the event represents a write action
  • Any records from Kafka, only IF the record represents users publishing to certain Kafka topics
  • Any records from YARN, only IF the record is for a user submitting an application through Hive

These conditions (and more) can be expressed as routing rules: any incoming record that matches a given condition is routed to the next stage, while records that do not meet any of the listed conditions can be dropped from the process, considered as “noise” by the workflow.
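As a sketch, assuming the hypothetical attributes extracted earlier (service, reqUser, access), the first two conditions above could be added as dynamic properties on a RouteOnAttribute processor; the attribute names and values are placeholders:

query_engine_user_commands : ${service:equals('hive'):or(${service:equals('impala')}):and(${reqUser:isEmpty():not()})}
hdfs_service_writes        : ${service:equals('hdfs'):and(${reqUser:equals('hive')}):and(${access:equals('WRITE')})}

Flowfiles matching a property are routed to a relationship of the same name; everything else goes to the unmatched relationship, where it can be dropped.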

Merging and Saving

This is an optional step and follows the same principles outlined in the corresponding record-based method above; however, in this case the MergeContent processor is used instead.

Where to go from here

Based on your own implementation, whether it be record based, attribute based, or a combination of the two, you’ll need to add processors where needed, remove the ones you do not need, and tune the dataflow to match the requirements of your business. These foundational steps should get you started in building your own pipeline to help enrich the value of your log data.

Many of Cloudera’s customers have broadened this solution to cover a range of enterprise application logs, significantly reducing log volumes and often enriching what remains to ensure it is meaningful and actionable.

If this solution is one that you or your organisation would like to adopt, and you would like to discuss further options for filtering your logs, Cloudera Professional Services has experienced consultants who would love to help you optimise your business today.

Reach out to your Cloudera account team, or if you are a new user, contact us here.

Danielle Mathews
Solutions Consultant
Pierre Villard
Director, Product Management - Data in Motion
