Thursday, August 31, 2023

Change Record Field Names with ScriptedTransformRecord

 A question that comes up often in the Apache NiFi community is "How do I change all my record field names?" Some people want to upper- or lower-case all the field names, some want to normalize the names (to remove/replace characters that are invalid for the target system), etc.

There are some solutions out there, but they are manual, meaning you have to know all the field names. It would be helpful to be able to programmatically change the field names based on some rule (uppercase, replace, etc.). We can use the ScriptedTransformRecord processor for this.

The following is a Groovy script that uppercases all field names. I added a comment marking the section that does the work of creating a new field name; that section can be replaced with any code that calculates the desired field name:

import org.apache.nifi.serialization.*
import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*
def recordSchema = record.schema
def recordFields = recordSchema.fields
def derivedFields = [] as List<RecordField>
def derivedValues = [:] as Map<String, Object>
recordFields.each {recordField -> 
    def originalFieldName = recordField.fieldName

    // Manipulate the field name(s) here
    def newFieldName = originalFieldName.toUpperCase()

    // Add the new RecordField, to be used in creating a new schema
    derivedFields.add(new RecordField(
        newFieldName,
        recordField.dataType,
        recordField.defaultValue,
        recordField.aliases,
        recordField.nullable))

    // Add the original value to the new map using the new field name as the key
    derivedValues.put(newFieldName, record.getValue(recordField))
}
def derivedSchema = new SimpleRecordSchema(derivedFields)
return new MapRecord(derivedSchema, derivedValues)

This script iterates over the RecordFields from the original RecordSchema, populating a new list of RecordFields (with the updated names) as well as a Map from the new field names to the original values. A new schema is then created from those fields and used to build the new Record that is returned from the script.
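For example, if the goal were to normalize field names for a target system rather than uppercase them, the marked section could be swapped for something like the following (just a sketch; the allowed-character rule and replacement character are assumptions about what the target system accepts):

    // Replace any character that is not a letter, digit, or underscore,
    // and prefix names that start with a digit (example rules only)
    def newFieldName = originalFieldName.replaceAll(/[^A-Za-z0-9_]/, '_')
    if (newFieldName ==~ /^\d.*/) {
        newFieldName = '_' + newFieldName
    }

Everything else in the script stays the same, since the new name is used both for the new RecordField and as the key in the value map.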

As always I welcome all questions, comments, and issues. Cheers! 

Thursday, June 1, 2023

Generating Random Schemas for GenerateRecord

As of NiFi 1.20.0 (via NIFI-10585) there is a GenerateRecord processor that can either take user-defined properties to define fields with values (populated by java-faker) or use an Avro Schema via the Schema Text property to fill in the records with random values based on the datatypes given in the schema. The Record Writer can be specified, so even though the schema is an Avro Schema, the records are written out in whatever format the writer supports.

For a small number of fields or for a predefined schema, GenerateRecord works well to randomly generate data and takes little time to configure. But if you're using GenerateRecord to try to generate random records with a large number of fields for testing purposes, you either have to add that many user-defined properties or design your own Avro Schema with that many fields. If you just care about generating random fields to test something downstream, you can use the following Groovy script to generate an Avro Schema file (.avsc) with the specified number of fields, each using a randomly chosen datatype:

// Number of fields to generate in the schema
def numFields = 10
File file = new File("columns_${numFields}.avsc")
// Datatypes to choose from at random
def primitiveDataTypes = ['string', 'int', 'float', 'double']

// Write the schema header
file.write '''{
  "type" : "record",
  "name" : "NiFi_Record",
  "namespace" : "any.data",
  "fields" : ['''

// Build one field entry per column, each with a randomly chosen datatype
def fields = []
def r = new Random()
(1..numFields).each {
  fields << "\t{ \"name\": \"col_$it\", \"type\": \"${primitiveDataTypes[r.nextInt(primitiveDataTypes.size())]}\" }"
}
file << fields.join(',\n')

// Close the fields array and the record object
file << ''' ]
}
'''

The script above writes a valid Avro Schema to the file pointed at by file, and numFields specifies how many fields each record will have. The datatype for each field is randomly selected from primitiveDataTypes. One possible output (for the above script as-is):

{
  "type" : "record",
  "name" : "NiFi_Record",
  "namespace" : "any.data",
  "fields" : [ { "name": "col_1", "type": "float" },
{ "name": "col_2", "type": "float" },
{ "name": "col_3", "type": "int" },
{ "name": "col_4", "type": "string" },
{ "name": "col_5", "type": "int" },
{ "name": "col_6", "type": "double" },
{ "name": "col_7", "type": "string" },
{ "name": "col_8", "type": "string" },
{ "name": "col_9", "type": "string" },
{ "name": "col_10", "type": "string" } ]
}

For 10 fields, this probably isn't the way you want to go, as you can just add 10 user-defined properties or come up with your own Avro Schema. But let's say you want to test your target database with 5000 columns using GenerateRecord -> UpdateDatabaseTable -> PutDatabaseRecord. You can set numFields to 5000, run the script, then use the contents of the generated columns_5000.avsc file as the Schema Text property in GenerateRecord. You'll get the number of records specified by GenerateRecord, each with random values corresponding to the datatypes generated by the Groovy script. UpdateDatabaseTable can create the table if it doesn't already exist, and PutDatabaseRecord downstream will put the records generated by GenerateRecord into the newly-created table.
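If you plan to generate schemas of different sizes, a slight variation of the script (just a sketch, same logic as above) can take the field count as a command-line argument instead of editing numFields each time:

// Read the number of fields from the first command-line argument (defaults to 10)
def numFields = args ? (args[0] as int) : 10
def primitiveDataTypes = ['string', 'int', 'float', 'double']
def r = new Random()
def fields = (1..numFields).collect {
  "\t{ \"name\": \"col_$it\", \"type\": \"${primitiveDataTypes[r.nextInt(primitiveDataTypes.size())]}\" }"
}
// Write the whole schema in one go
new File("columns_${numFields}.avsc").text = """{
  "type" : "record",
  "name" : "NiFi_Record",
  "namespace" : "any.data",
  "fields" : [
${fields.join(',\n')} ]
}
"""

Saved as, say, generate_schema.groovy (the filename is arbitrary), it can be run with "groovy generate_schema.groovy 5000" to produce columns_5000.avsc.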

I admit the use case for this is perhaps esoteric, but it's a fun use of NiFi and that's what this blog is all about :)

As always, I welcome all comments, questions, and suggestions. Cheers!



Thursday, April 27, 2023

Using JSLTTransformJSON (an alternative to JoltTransformJSON)

As of Apache NiFi 1.19.0, there is a processor called JSLTTransformJSON. This is similar in function to the JoltTransformJSON processor, but it allows you to use JSLT as the Domain-Specific Language (DSL) to transform one JSON to another. This processor was added because some users found JOLT too complex to learn, as it uses a "tree-walking" pattern to change the JSON structure versus a more programmatic DSL (From the docs, JSLT is based on jq, XPath, and XQuery). For an example of using JSLT, check out the demo playground which has a ready-made example.

For this post, I'll walk through the Inception example from the JOLT playground, as a comparison between the JOLT and JSLT expressions:

Input:

{
  "rating": {
    "primary": {
      "value": 3
    },
    "quality": {
      "value": 3
    }
  }
}

JOLT Transform:

[
  {
    "operation": "shift",
    "spec": {
      "rating": {
        "primary": {
          "value": "Rating"
        },
        "*": {
          "value": "SecondaryRatings.&1.Value",
          "$": "SecondaryRatings.&1.Id"
        }
      }
    }
  },
  {
    "operation": "default",
    "spec": {
      "Range": 5,
      "SecondaryRatings": {
        "*": {
          "Range": 5
        }
      }
    }
  }
]

Equivalent JSLT Transform:

{
 "Rating" : .rating.primary.value,
 "SecondaryRatings" : 
  [for (.rating) {
    .key : { 
        "Id": .key,
        "Value" : .value.value,
        "Range": 5
     }
   }
   if (.key != "primary")
  ],
  "Range": 5
}

These both kind of "walk the structure", but in my opinion the JSLT version is more succinct and more readable, for at least these reasons:

  • There is no need for a "default" spec as there is in JOLT
  • There are fewer levels of JSON objects in the specification
  • You don't have to walk the structure one level at a time, because you can refer to lower levels using dot-notation
  • You don't have to "count the levels" to use the & notation to walk back up the tree

Learning JSLT may not be easier than learning JOLT, but it may come more naturally if you are already familiar with the tools JSLT is based on. At the end of the day they are both designed to modify the structure of input JSON, and just approach the DSL in different ways. And NiFi offers the choice between them :)

As always I welcome all comments, suggestions, and questions. Cheers!



Transform JSON String Field Into Record Object in NiFi

One question that has come up often in the NiFi community is: "I have a field in my record(s) that is a string that actually contains JSON. Can I change that field to have the JSON object as the value?" It turns out the answer is yes! It is possible via ScriptedTransformRecord, and there are a couple of cool techniques I will show in this post that let you work with any format, not just JSON.

To start, I have a flow with a GenerateFlowFile (for test input) and a ScriptedTransformRecord whose success relationship is sent to a funnel (just for illustration):


For GenerateFlowFile, I set the content to some test input with a field "myjson" which is a string containing JSON:

[
  {
    "id": 1,
    "myjson": "{\"a\":\"hello\",\"b\": 100}"
  },
  {
    "id": 2,
    "myjson": "{\"a\":\"world\",\"b\": 200}"
  }
]


You can see the "myjson" field is a string with a JSON object having two fields "a" and "b". With ScriptedTransformRecord I will need to look for that record field by name and parse it into a JSON object, which is really easy with Groovy. This is the entire script, with further explanation below:

import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*
import java.nio.charset.*
import groovy.json.*

def recordMap = record.toMap()
def derivedValues = new HashMap(recordMap)
def slurper = new JsonSlurper()
recordMap.each { k, v ->
    if ('myjson'.equals(k)) {
        derivedValues.put(k, slurper.parseText(v))
    }
}
def inferredSchema = DataTypeUtils.inferSchema(derivedValues, null, StandardCharsets.UTF_8)
return new MapRecord(inferredSchema, derivedValues)

The first thing the script does (after importing the necessary packages) is to get the existing record fields as a Java Map. Then a copy of the map is made so we can change the value of the "myjson" field. The record map is iterated over, giving each key/value pair where the key is the record field name and the value is the record field value. I only want to change the "myjson" field, so I replace its String value with the parsed Java Object. This is possible because the maps are of type Map<String, Object>.

However, I don't know the new schema of the overall record because "myjson" is no longer a String but a sub-record. I used DataTypeUtils (with a null record name as it's the root record) and its inferSchema() method to get the schema of the new output record, then I return a new record (instead of the original) using that schema and the updated map. Using a JsonRecordSetWriter in ScriptedTransformRecord gives the following output for the above input FlowFile:

[ {
  "id" : 1,
  "myjson" : {
    "a" : "hello",
    "b" : 100
  }
}, {
  "id" : 2,
  "myjson" : {
    "a" : "world",
    "b" : 200
  }
} ]

The field parsing technique (JsonSlurper in this case) can be extended to other things as well, basically any string that can be converted into a Map. Groovy has an XmlSlurper, for example, so the field content could be XML instead of JSON, as in the sketch below.
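As a sketch of that idea (assuming the field holds a small, flat XML snippet such as <data><a>hello</a><b>100</b></data>), the JsonSlurper call could be replaced with something like this; note that all values come out as Strings here, and nested XML would need a recursive conversion to nested Maps:

import groovy.xml.XmlSlurper   // groovy.util.XmlSlurper on older Groovy versions

// Parse the XML string field and convert its child elements into a Map (flat structure assumed)
def xml = new XmlSlurper().parseText(v)
def parsed = xml.children().collectEntries { node -> [(node.name()): node.text()] }
derivedValues.put(k, parsed)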

The schema inference technique can be used in many cases, not just for replacing a single field. It only takes the root-level Map, a name of null, and a character set (UTF-8 in this post), and produces a new schema for the outgoing record. You can even produce multiple output records for a single input record, but you'll want them all to have the same schema so the entire FlowFile has one consistent schema across its records.

As always, I welcome all comments, questions, and suggestions. Cheers!




Wednesday, January 11, 2023

cURLing the secure Apache NiFi REST API

While testing a change to a NiFi REST API endpoint, I found it would be easy to test with a simple cURL command. However, the default security configuration of NiFi uses single-user authentication, meaning you need to provide a username and password to get a token, then add that token as a header to subsequent REST API calls. To make that easier, I whipped up a quick bash script to do exactly this:

#!/usr/bin/env bash
if [ $# -lt 3 ]
  then
    echo "Usage: ncurl <username> <password> <URL>"
    exit 1
fi
token=$(curl -k https://localhost:8443/nifi-api/access/token -d "username=$1&password=$2")
curl -k -H "Authorization: Bearer $token" "$3"

As you can see from the usage screen, "ncurl" expects the username, password, and desired URL as arguments to the script. Here is an example:

ncurl admin mypassword https://localhost:8443/nifi-api/flow/metrics/prometheus

It is possible to reuse the token (with single-user authentication, for example, the token is good for 8 hours), so you could use the "token" line to get a token into a variable once, then use the "curl" line to call multiple URLs. This same approach works for other authentication schemes such as LDAP and Kerberos.
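For example, a rough sketch of that token reuse (the endpoints shown are just examples):

#!/usr/bin/env bash
# Fetch the token once...
token=$(curl -k https://localhost:8443/nifi-api/access/token -d "username=$1&password=$2")
# ...then reuse it for multiple REST API calls
curl -k -H "Authorization: Bearer $token" https://localhost:8443/nifi-api/flow/status
curl -k -H "Authorization: Bearer $token" https://localhost:8443/nifi-api/flow/metrics/prometheus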

Thanks to Dave Handermann for talking through these concepts with me, and as always I welcome all questions, comments, suggestions, and issues. Cheers!