Thursday, August 31, 2023

Change Record Field Names with ScriptedTransformRecord

A question that comes up often in the Apache NiFi community is "How do I change all my record field names?" Some people want to upper- or lower-case all the field names, while others want to normalize them by removing or replacing characters that are invalid for the target system.

There are some solutions described here but they are manual, meaning you have to know all the field names in advance. It would be helpful to be able to change the field names programmatically based on some rule (uppercasing or character replacement, for example). We can use the ScriptedTransformRecord processor for this.

The following Groovy script uppercases all field names. I added a comment marking the section that creates the new field name; that section can be replaced with any code that calculates the desired field name:

import org.apache.nifi.serialization.*
import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*
def recordSchema = record.schema
def recordFields = recordSchema.fields
def derivedFields = [] as List<RecordField>
def derivedValues = [:] as Map<String, Object>
recordFields.each {recordField -> 
    def originalFieldName = recordField.fieldName

    // Manipulate the field name(s) here
    def newFieldName = originalFieldName.toUpperCase()

    // Add the new RecordField, to be used in creating a new schema
    derivedFields.add(new RecordField(
        newFieldName,
        recordField.dataType,
        recordField.defaultValue,
        recordField.aliases,
        recordField.nullable))

    // Add the original value to the new map using the new field name as the key
    derivedValues.put(newFieldName, record.getValue(recordField))
}
def derivedSchema = new SimpleRecordSchema(derivedFields)
return new MapRecord(derivedSchema, derivedValues)

This script iterates over the RecordFields from the original RecordSchema. For each one it adds a renamed copy of the RecordField to a new list and stores the original value in a Map under the new field name. A new schema is then created from that list and used, together with the Map, to create the new Record that the script returns. Note that only top-level fields are renamed; fields inside nested records keep their original names.
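
As an illustration of swapping in a different rule, here is a minimal sketch of a name-normalizing helper. The name normalizeFieldName and the choice of allowed characters (letters, digits, underscore) are assumptions made for this example, not anything defined by NiFi; adjust them to whatever your target system accepts.

```groovy
// Hypothetical helper (not part of NiFi): returns a field name containing only
// ASCII letters, digits, and underscores. The allowed character set is an
// assumption for this sketch.
String normalizeFieldName(String name) {
    // Replace each run of disallowed characters with a single underscore
    def cleaned = name.replaceAll(/[^A-Za-z0-9_]+/, '_')
    // Prefix names that start with a digit, which some systems reject
    if (cleaned ==~ /[0-9].*/) {
        cleaned = '_' + cleaned
    }
    return cleaned
}

assert normalizeFieldName('first name') == 'first_name'
assert normalizeFieldName('order-id#') == 'order_id_'
assert normalizeFieldName('2nd_address') == '_2nd_address'
```

With this helper defined in the script, the "Manipulate the field name(s) here" line would become def newFieldName = normalizeFieldName(originalFieldName). Keep in mind that a rule like this can map two different original names to the same new name (for example 'a-b' and 'a b'), and the script above does not handle that case.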

As always I welcome all questions, comments, and issues. Cheers!