Thursday, April 27, 2023

Transform JSON String Field Into Record Object in NiFi

One question that has come up often in the NiFi community is: "I have a field in my record(s) that is a string that actually contains JSON. Can I change that field to have the JSON object as the value?" It turns out the answer is yes! It is possible via ScriptedTransformRecord, and in this post I will show a couple of cool techniques that work with any format, not just JSON.

To start, I have a flow with a GenerateFlowFile (for test input) and a ScriptedTransformRecord whose success relationship is sent to a funnel (just for illustration):


For GenerateFlowFile, I set the content to some test input with a field "myjson" which is a string containing JSON:

[
  {
    "id": 1,
    "myjson": "{\"a\":\"hello\",\"b\": 100}"
  },
  {
    "id": 2,
    "myjson": "{\"a\":\"world\",\"b\": 200}"
  }
]


You can see the "myjson" field is a string with a JSON object having two fields "a" and "b". With ScriptedTransformRecord I will need to look for that record field by name and parse it into a JSON object, which is really easy with Groovy. This is the entire script, with further explanation below:

import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*
import java.nio.charset.*
import groovy.json.*

// Copy the record's fields into a new Map so the "myjson" value can be replaced
def recordMap = record.toMap()
def derivedValues = new HashMap(recordMap)
def slurper = new JsonSlurper()
recordMap.each { k, v ->
    if ('myjson'.equals(k)) {
        // Replace the JSON string with the parsed Map
        derivedValues.put(k, slurper.parseText(v))
    }
}
// Infer the new schema (null name = root record) and return the updated record
def inferredSchema = DataTypeUtils.inferSchema(derivedValues, null, StandardCharsets.UTF_8)
return new MapRecord(inferredSchema, derivedValues)

The first thing the script does (after importing the necessary packages) is get the existing record fields as a Java Map. Then a copy of the map is made so we can change the value of the "myjson" field. The record map is iterated over, giving each key/value pair where the key is the record field name and the value is the record field value. I only want to change the "myjson" field, so I replace its String value with the parsed Java object. This is possible because the maps are of type Map<String, Object>.
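
If you want to see the parsing piece in isolation, here is a minimal standalone Groovy snippet (not tied to NiFi) showing that JsonSlurper hands back a plain java.util.Map, which is what lets the parsed value drop straight into the record's Map of values:

import groovy.json.JsonSlurper

// Standalone illustration: parsing a JSON object string yields a java.util.Map
def parsed = new JsonSlurper().parseText('{"a":"hello","b": 100}')
assert parsed instanceof Map
assert parsed.a == 'hello'
assert parsed.b == 100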

However, I don't know the new schema of the overall record, because "myjson" is no longer a String but a sub-record. I used the inferSchema() method of DataTypeUtils (passing a null record name since this is the root record) to get the schema of the new output record, then returned a new record (instead of the original) using that schema and the updated map. Using a JsonRecordSetWriter in ScriptedTransformRecord gives the following output for the above input FlowFile:

[ {
  "id" : 1,
  "myjson" : {
    "a" : "hello",
    "b" : 100
  }
}, {
  "id" : 2,
  "myjson" : {
    "a" : "world",
    "b" : 200
  }
} ]

The field parsing technique (JsonSlurper in this case) can be extended to other formats as well, basically to any string that can be converted into a Map. Groovy has an XmlSlurper, for example, so the field content could be XML instead of JSON, as sketched below.
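
Here is a rough sketch of that XML variant, assuming a hypothetical field that held flat XML like <row><a>hello</a><b>100</b></row>. Note that XmlSlurper returns a GPathResult rather than a Map, so one extra step converts it before schema inference (and on older Groovy versions the class lives in groovy.util instead of groovy.xml):

import groovy.xml.XmlSlurper

// Hypothetical flat-XML field value; XmlSlurper gives a GPathResult,
// so collectEntries turns its children into a Map (values come back as Strings)
def xml = new XmlSlurper().parseText('<row><a>hello</a><b>100</b></row>')
def asMap = xml.children().collectEntries { [(it.name()): it.text()] }
assert asMap == [a: 'hello', b: '100']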

The schema inference technique can be used in many cases, not just for replacing a single field. It only takes the root-level Map, a name of null, and a character set (UTF-8 in this post), and produces a new schema for the outgoing record. You can even produce multiple output records for a single input record, but you'll want them all to share the same schema so the entire outgoing FlowFile has a single schema for all records.
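
As a hypothetical sketch of that last point (if I recall correctly, ScriptedTransformRecord also accepts a Collection of Records as the script's return value), this fans each input record out into two output records, tagging the copy with a made-up "copy" field. Both maps have identical keys, so a single inferred schema covers every record in the outgoing FlowFile:

import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*
import java.nio.charset.*

// Hypothetical example: emit two records per input record, sharing one schema
def original = new HashMap(record.toMap())
original.put('copy', Boolean.FALSE)

def duplicate = new HashMap(original)
duplicate.put('copy', Boolean.TRUE)

def schema = DataTypeUtils.inferSchema(original, null, StandardCharsets.UTF_8)
return [new MapRecord(schema, original), new MapRecord(schema, duplicate)]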

As always, I welcome all comments, questions, and suggestions. Cheers!



