Thursday, August 31, 2023

Change Record Field Names with ScriptedTransformRecord

 A question that comes up often in the Apache NiFi community is "How do I change all my record field names?" Some people want to upper- or lower-case all the field names, some want to normalize the names (to remove/replace characters that are invalid for the target system), etc.

There are some solutions described here, but they are manual, meaning you have to know all the field names. It would be helpful to be able to programmatically change the field names based on some rule (uppercase, character replacement, etc.). We can use the ScriptedTransformRecord processor for this.

The following Groovy script uppercases all field names. I added a comment marking the section that does the work of creating a new field name; that section can be replaced with any code that calculates the desired field name:

import org.apache.nifi.serialization.*
import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*

def recordSchema = record.schema
def recordFields = recordSchema.fields
def derivedFields = [] as List<RecordField>
def derivedValues = [:] as Map<String, Object>
recordFields.each { recordField ->
    def originalFieldName = recordField.fieldName

    // Manipulate the field name(s) here
    def newFieldName = originalFieldName.toUpperCase()

    // Add the new RecordField, to be used in creating a new schema
    derivedFields.add(new RecordField(
        newFieldName,
        recordField.dataType,
        recordField.defaultValue,
        recordField.aliases,
        recordField.nullable))

    // Add the original value to the new map using the new field name as the key
    derivedValues.put(newFieldName, record.getValue(recordField))
}
def derivedSchema = new SimpleRecordSchema(derivedFields)
return new MapRecord(derivedSchema, derivedValues)

This script iterates over the RecordFields from the original RecordSchema, populating a new list of RecordFields (with the updated names) as well as a Map from the new field names to the original values. A new schema is then created from those fields and used to build the new Record that the script returns.
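
If you need a different rule, only the line under the "Manipulate the field name(s) here" comment changes. For example, here is a minimal sketch that normalizes names for a target system by lowercasing them and replacing any character outside letters, digits, and underscores (the character set and the replacement character are assumptions; adjust them to your target's rules):

// Assumed normalization rule: lowercase, then replace anything that isn't a letter, digit, or underscore
def newFieldName = originalFieldName.toLowerCase().replaceAll(/[^a-z0-9_]/, '_')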

As always I welcome all questions, comments, and issues. Cheers! 

Thursday, June 1, 2023

Generating Random Schemas for GenerateRecord

As of NiFi 1.20.0 (via NIFI-10585) there is a GenerateRecord processor that can either take user-defined properties to define fields and their values (populated by java-faker), or use an Avro schema supplied via the Schema Text property to fill in the records with random values based on the datatypes in the schema. The Record Writer can be specified, so even though the schema is an Avro schema, the records will be written out in whatever format the writer supports.

For a small number of fields or for a predefined schema, GenerateRecord works well to randomly generate data and takes little time to configure. But if you're using GenerateRecord to generate random records with a large number of fields for testing purposes, you either have to add that many user-defined properties or design your own Avro schema with that many fields. If you just care about generating random fields to test something downstream, you can use the following Groovy script to generate an Avro schema file (.avsc) with the specified number of fields, each with a randomly chosen datatype:

def numFields = 10
File file = new File("columns_${numFields}.avsc")
def primitiveDataTypes = ['string', 'int', 'float', 'double']
file.write  '''{
  "type" : "record",
  "name" : "NiFi_Record",
  "namespace" : "any.data",
  "fields" : ['''
 
def fields = []
def r = new Random()
(1..numFields).each {
  fields << "\t{ \"name\": \"col_$it\", \"type\": \"${primitiveDataTypes[r.nextInt(primitiveDataTypes.size())]}\" }"
}
file << fields.join(',\n')
file << ''' ]
}
'''

The script above writes a valid Avro schema to the file referenced by file, with numFields controlling how many fields each record has. Each field's datatype is chosen at random from primitiveDataTypes. One possible output (for the above script as-is):

{
  "type" : "record",
  "name" : "NiFi_Record",
  "namespace" : "any.data",
  "fields" : [ { "name": "col_1", "type": "float" },
{ "name": "col_2", "type": "float" },
{ "name": "col_3", "type": "int" },
{ "name": "col_4", "type": "string" },
{ "name": "col_5", "type": "int" },
{ "name": "col_6", "type": "double" },
{ "name": "col_7", "type": "string" },
{ "name": "col_8", "type": "string" },
{ "name": "col_9", "type": "string" },
{ "name": "col_10", "type": "string" } ]
}

For 10 fields, this probably isn't the way you want to go, as you can add 10 user-defined properties or come up with your own Avro schema easily enough. But let's say you want to test your target database with 5000 columns using GenerateRecord -> UpdateDatabaseTable -> PutDatabaseRecord. You can set numFields to 5000, run the script, then use the contents of the generated columns_5000.avsc file as the Schema Text property in GenerateRecord. You'll get the number of records specified by GenerateRecord, each with random values corresponding to the datatypes chosen by the Groovy script. UpdateDatabaseTable can create the target table if it doesn't exist, and PutDatabaseRecord downstream will insert the generated records into the newly created table.

I admit the use case for this is perhaps esoteric, but it's a fun use of NiFi and that's what this blog is all about :)

As always, I welcome all comments, questions, and suggestions. Cheers!



Thursday, April 27, 2023

Using JSLTTransformJSON (an alternative to JoltTransformJSON)

As of Apache NiFi 1.19.0, there is a processor called JSLTTransformJSON. This is similar in function to the JoltTransformJSON processor, but it allows you to use JSLT as the Domain-Specific Language (DSL) for transforming one JSON document into another. This processor was added because some users found JOLT too complex to learn, as it uses a "tree-walking" pattern to change the JSON structure rather than a more programmatic DSL (from the docs, JSLT is based on jq, XPath, and XQuery). For an example of using JSLT, check out the demo playground, which has a ready-made example.

For this post, I'll show a couple of examples, including the following one, which reproduces the Inception example from the JOLT playground as a comparison between the JOLT and JSLT expressions:

Input:

{
  "rating": {
    "primary": {
      "value": 3
    },
    "quality": {
      "value": 3
    }
  }
}

JOLT Transform:

[
  {
    "operation": "shift",
    "spec": {
      "rating": {
        "primary": {
          "value": "Rating"
        },
        "*": {
          "value": "SecondaryRatings.&1.Value",
          "$": "SecondaryRatings.&1.Id"
        }
      }
    }
  },
  {
    "operation": "default",
    "spec": {
      "Range": 5,
      "SecondaryRatings": {
        "*": {
          "Range": 5
        }
      }
    }
  }
]

Equivalent JSLT Transform:

{
 "Rating" : .rating.primary.value,
 "SecondaryRatings" : 
  [for (.rating) {
    .key : { 
        "Id": .key,
        "Value" : .value.value,
        "Range": 5
     }
   }
   if (.key != "primary")
  ],
  "Range": 5
}

These both kind of "walk the structure", but in my opinion the JSLT version is more succinct and more readable, for at least these reasons:

  • There is no need for a separate "default" spec as there is in JOLT
  • There are fewer levels of JSON objects in the specification
  • You don't have to walk the structure one level at a time because you can refer to lower levels using the dot-notation  
  • You don't have to "count the levels" to use the & notation to walk back up the tree

Learning JSLT may not be inherently easier than learning JOLT, but you may find it easier if you are already familiar with the tools JSLT is based on. At the end of the day, they are both designed to modify the structure of input JSON and just approach the DSL in different ways. And NiFi offers the choice between them :)

As always I welcome all comments, suggestions, and questions. Cheers!



Transform JSON String Field Into Record Object in NiFi

One question that has come up often in the NiFi community is: "I have a field in my record(s) that is a string that actually contains JSON. Can I change that field to have the JSON object as the value?" It turns out the answer is yes! It is possible via ScriptedTransformRecord, and there are a couple of cool techniques I will show in this post that let you work with any format, not just JSON.

To start, I have a flow with a GenerateFlowFile (for test input) and a ScriptedTransformRecord whose success relationship is sent to a funnel (just for illustration):


For GenerateFlowFile, I set the content to some test input with a field "myjson" which is a string containing JSON:

[
  {
    "id": 1,
    "myjson": "{\"a\":\"hello\",\"b\": 100}"
  },
  {
    "id": 2,
    "myjson": "{\"a\":\"world\",\"b\": 200}"
  }
]


You can see the "myjson" field is a string containing a JSON object with two fields, "a" and "b". With ScriptedTransformRecord I need to look up that record field by name and parse it into a JSON object, which is really easy with Groovy. This is the entire script, with further explanation below:

import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*
import java.nio.charset.*
import groovy.json.*

def recordMap = record.toMap()
def derivedValues = new HashMap(recordMap)
def slurper = new JsonSlurper()
recordMap.each { k, v ->
    if ('myjson'.equals(k)) {
        derivedValues.put(k, slurper.parseText(v))
    }
}
def inferredSchema = DataTypeUtils.inferSchema(derivedValues, null, StandardCharsets.UTF_8)
return new MapRecord(inferredSchema, derivedValues)

The first thing the script does (after importing the necessary packages) is get the existing record fields as a Java Map. Then a copy of the map is made so we can change the value of the "myjson" field. The record map is iterated over, giving each key/value pair where the key is the record field name and the value is the record field value. I only want to change the "myjson" field, so I replace its String value with the parsed Java object. This is possible because the maps are of type Map<String, Object>.

However, I don't know the new schema of the overall record because "myjson" is no longer a String but a sub-record. I used DataTypeUtils (with a null record name as it's the root record) and its inferSchema() method to get the schema of the new output record, then I return a new record (instead of the original) using that schema and the updated map. Using a JsonRecordSetWriter in ScriptedTransformRecord gives the following output for the above input FlowFile:

[ {
  "id" : 1,
  "myjson" : {
    "a" : "hello",
    "b" : 100
  }
}, {
  "id" : 2,
  "myjson" : {
    "a" : "world",
    "b" : 200
  }
} ]

The field-parsing technique (JsonSlurper in this case) can be extended to other formats as well, basically any string that can be converted into a Map. Groovy also has an XmlSlurper, so the field content could be XML rather than JSON, for example (see the sketch below).
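
As a rough sketch of that XML case, dropped into the script above in place of the JsonSlurper branch (the field name "myxml" and its structure are hypothetical, and this minimal conversion only handles flat child elements with text values; nested or repeated elements and attributes would need more handling):

import groovy.xml.XmlSlurper   // use groovy.util.XmlSlurper on older Groovy versions

// e.g. myxml = "<data><a>hello</a><b>100</b></data>"
def parsed = new XmlSlurper().parseText(recordMap['myxml'] as String)

// Convert the GPathResult into a Map so the same schema inference approach applies
// (note: all leaf values end up as strings here)
def xmlAsMap = [:]
parsed.children().each { child -> xmlAsMap[child.name()] = child.text() }
derivedValues.put('myxml', xmlAsMap)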

The schema inference technique can be used in many cases, not just for replacing a single field. It only takes the root-level Map, a name of null, and a character set (UTF-8 in this post) and produces a new schema for the outgoing record. You can even produce multiple output records for a single input record, but you'll want them all to have the same schema so that every record in the outgoing FlowFile shares it (see the sketch below).
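
As a rough sketch of that last point (this assumes ScriptedTransformRecord accepts a Collection of Records as the script's return value, and the list-valued field name 'items' is hypothetical), each input record is split into one output record per element, all sharing a single inferred schema:

import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*
import java.nio.charset.*

def base = record.toMap()
// One value map per element of the (hypothetical) list field 'items'
def valueMaps = (base['items'] ?: []).collect { item ->
    def values = new HashMap(base)
    values['items'] = item
    values
}
if (!valueMaps) return null   // returning null drops the input record

// Infer the schema once so every output record shares it
def sharedSchema = DataTypeUtils.inferSchema(valueMaps[0], null, StandardCharsets.UTF_8)
return valueMaps.collect { new MapRecord(sharedSchema, it) }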

As always, I welcome all comments, questions, and suggestions. Cheers!




Wednesday, January 11, 2023

cURLing the secure Apache NiFi REST API

While testing a change to a NiFi REST API endpoint, I found it would be easy to test with a simple cURL command. However, the default security of NiFi uses single-user authentication, meaning you need to provide a username and password to get a token, then add that token as a header to subsequent REST API calls. To make that easier, I whipped up a quick bash script to do exactly this:

#!/usr/bin/env bash
if [ $# -lt 3 ]
  then
    echo "Usage: ncurl <username> <password> <URL>"
    exit 1
fi
token=$(curl -k https://localhost:8443/nifi-api/access/token -d "username=$1&password=$2")
curl -H "Authorization: Bearer $token" -k $3

As you can see from the usage message, "ncurl" expects the username, password, and desired URL as arguments to the script. Here is an example:

ncurl admin mypassword https://localhost:8443/nifi-api/flow/metrics/prometheus

It is possible to reuse the token (with single-user authentication the token is good for 8 hours, for example), so you could use the "token" line to get a token into a variable once, then use the "curl" line to call multiple URLs, as in the sketch below. This same approach works for other authentication schemes such as LDAP and Kerberos.
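
For example, a minimal sketch of that reuse (assuming the same localhost URL and credential arguments as above; the two endpoints called are just illustrative):

#!/usr/bin/env bash
# Fetch the token once...
token=$(curl -sk https://localhost:8443/nifi-api/access/token -d "username=$1&password=$2")
# ...then reuse it for multiple REST API calls
curl -sk -H "Authorization: Bearer $token" https://localhost:8443/nifi-api/flow/about
curl -sk -H "Authorization: Bearer $token" https://localhost:8443/nifi-api/flow/metrics/prometheus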

Thanks to Dave Handermann for talking through these concepts with me, and as always I welcome all questions, comments, suggestions, and issues. Cheers!

Tuesday, December 6, 2022

Using UpdateDatabaseTable for Schema Migration and Database Replication

In NiFi version 1.19.0, the UpdateDatabaseTable processor was added to help with database schema migration and to make replication of database tables easier. This processor checks the schema of the incoming records against the target database. As shown in the following example, if the target table does not exist, the processor can either fail the FlowFile or create the table for you:


Having the processor create the table (if it doesn't already exist) allows you to take data from any source/flow and land it in a database even if that table is not already there. This removes the previous need to manually set up the target tables with the intended schema beforehand. I will illustrate this with a simple flow:



The flow uses GenerateFlowFile to issue some simple sample data, but in practice it will be common to use QueryDatabaseTableRecord to incrementally fetch rows from a source database table. The flow then sends the FlowFiles to UpdateDatabaseTable, but any number of transformations of the data are possible in NiFi before the records are ready for the desired target table. This is the configuration of UpdateDatabaseTable for this example:


It should be noted here (and will be noted later) that the outgoing FlowFiles from the UpdateDatabaseTable processor will have an 'output.table' attribute set to the target table name. This can (and will, in this example) be used downstream to refer to the table that was created/updated, as shown in the snippet below.
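
For example (a sketch of the downstream configuration used later in this post), PutDatabaseRecord's Table Name property can reference that attribute with NiFi Expression Language:

Table Name: ${output.table}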

When the flow is started, the target table 'udt_test' does not yet exist in the database:



Taking a look at an example FlowFile, you can see the schema of the data (i.e. the JSON structure) and thus the intended target table definition:



After starting this flow and seeing at least one FlowFile progress through UpdateDatabaseTable successfully, we can now verify the desired target table does indeed exist in the target database:



And after running with the following configuration of PutDatabaseRecord:



We can see that not only has the table been created, but the record has been inserted successfully (with PutDatabaseRecord referring to the table via the aforementioned 'output.table' attribute):



The example above is meant to demonstrate that you can take data (in the form of NiFi records in a FlowFile) from any source, transform it however you like, and put it into a database table without the table having to exist beforehand. But it can do more than that! If you have an existing table and an extra field in your input data that does not have an associated column in the target table, UpdateDatabaseTable will add the missing column and start populating rows using all the available columns in the input data. Using the above example, let's say we added an extra field named 'newField' with a string value:



Then we run that FlowFile through UpdateDatabaseTable and see that the table schema has been updated and the new column's value has been populated for that row, with null as the default value for the previously inserted row:


Hopefully this post has illustrated the potential of the UpdateDatabaseTable processor to make schema migration/drift and database replication techniques much easier in Apache NiFi. As always, I welcome all comments, questions, and suggestions. Cheers!

Friday, September 18, 2020

Infer NiFi Schema from File in Groovy

 The "Infer Schema" option in CSV, JSON, and XML readers in NiFi (as of 1.9.0) has made it much easier to get your record-based data into NiFi and operate on it without having to provide the schema explicitly. However sometimes the schema may need to be tweaked (to make a field required) or datatypes changed (numbers to strings, e.g.), but writing a schema from scratch can be a pain.

To help with that, I whipped up a quick Groovy script that checks a file's extension for json, csv, or xml, calls the same schema inference code that NiFi does, then writes the Avro schema to standard out. You can use this tool to infer the schema for an arbitrary file and then use the output as an explicit schema, presumably after making some minor changes.

The Groovy script (I named it infer_schema.groovy) is as follows:

#!/usr/bin/groovy
@Grab('org.apache.nifi:nifi-record:1.12.0')
@Grab('org.apache.nifi:nifi-record-serialization-services:1.12.0')
@Grab('org.apache.nifi:nifi-avro-record-utils:1.12.0')
import org.apache.nifi.schema.inference.*
import org.codehaus.jackson.*
import org.apache.nifi.json.*
import org.apache.nifi.avro.*
import org.apache.nifi.csv.*
import org.apache.nifi.xml.inference.*
import org.apache.nifi.context.*
import org.apache.nifi.components.*
if(args?.length < 1) {
  println 'Usage: infer_schema <file>'
  System.exit(1)
}
def fileToInfer = new File(args[0])
def ext = fileToInfer.name[(fileToInfer.name.lastIndexOf('.') + 1)..-1]
RecordSourceFactory recordSourceFactory
SchemaInferenceEngine inferenceEngine
switch(ext) {
  case 'json':
    recordSourceFactory = { var, inStream -> new JsonRecordSource(inStream) }
    inferenceEngine = new JsonSchemaInference(new TimeValueInference(null, null, null))
    break
  case 'csv':
    def context = new PropertyContext() {
      PropertyValue getProperty(PropertyDescriptor descriptor) {
        return (['getValue': { -> 'UTF-8' }] as PropertyValue)
      }
      Map<String, String> getAllProperties() { [:] }
    }
    recordSourceFactory = { var, inStream -> new CSVRecordSource(inStream, context, var) }
    inferenceEngine = new CSVSchemaInference(new TimeValueInference(null, null, null))
    break
  case 'xml':
    recordSourceFactory = { var, inStream -> new XmlRecordSource(inStream, false) }
    inferenceEngine = new XmlSchemaInference(new TimeValueInference(null, null, null))
    break
}
fileToInfer.withInputStream {inStream ->
    def recordSource = recordSourceFactory.create([:], inStream)
    println AvroTypeUtil.buildAvroSchema(inferenceEngine.inferSchema(recordSource)).toString(true)
}


If you have date/time/timestamp values, you'll likely want to provide formats to the various TimeValueInference constructors that match the expected format of your time-based values (see the sketch below). The script can certainly use some work to make it more robust, but mostly this is an example of using NiFi libraries in Groovy to accomplish tasks outside of a NiFi instance.
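
For example, a minimal sketch of providing those formats (the format strings are assumptions; use the patterns your data actually contains):

// Hypothetical formats matching values like 2023-06-01, 14:30:00, and 2023-06-01T14:30:00.000Z
def timeInference = new TimeValueInference('yyyy-MM-dd', 'HH:mm:ss', "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
inferenceEngine = new JsonSchemaInference(timeInference)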

As always I welcome all comments, questions, and suggestions. Cheers!