One question that has come up often in the NiFi community is: "I have a field in my record(s) that is a string that actually contains JSON. Can I change that field to have the JSON object as the value?" It turns out the answer is yes! It is possible via ScriptedTransformRecord, and there are a couple of cool techniques I will show in this post that let you work with any format, not just JSON.
To start, I have a flow with a GenerateFlowFile (for test input) and a ScriptedTransformRecord whose success relationship is sent to a funnel (just for illustration):
For GenerateFlowFile, I set the content to some test input with a field "myjson" which is a string containing JSON:
[
  {
    "id": 1,
    "myjson": "{\"a\":\"hello\",\"b\": 100}"
  },
  {
    "id": 2,
    "myjson": "{\"a\":\"world\",\"b\": 200}"
  }
]
You can see the "myjson" field is a string containing a JSON object with two fields, "a" and "b". In ScriptedTransformRecord I need to look up that record field by name and parse it into a JSON object, which is really easy with Groovy. This is the entire script, with further explanation below:
import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*
import java.nio.charset.*
import groovy.json.*

// Get the incoming record's fields as a Map<String, Object>, then copy it so values can be replaced
def recordMap = record.toMap()
def derivedValues = new HashMap(recordMap)
def slurper = new JsonSlurper()
recordMap.each { k, v ->
    if ('myjson'.equals(k)) {
        // Replace the JSON string with the parsed object
        derivedValues.put(k, slurper.parseText(v))
    }
}
// Infer the schema of the updated map ("myjson" is now a sub-record, not a String)
def inferredSchema = DataTypeUtils.inferSchema(derivedValues, null, StandardCharsets.UTF_8)
return new MapRecord(inferredSchema, derivedValues)
The first thing the script does (after importing the necessary packages) is get the existing record fields as a Java Map via record.toMap(). Then a copy of that map is made so we can change the value of the "myjson" field. The record map is iterated over, giving each key/value pair where the key is the record field name and the value is the record field value. Since I only want to change the "myjson" field, I replace its String value with the object parsed by JsonSlurper. This is possible because the maps are of type Map<String, Object>.
However, I don't know the new schema of the overall record, because "myjson" is no longer a String but a sub-record. I use DataTypeUtils and its inferSchema() method (with a null name, since this is the root record) to get the schema of the new output record, then return a new record (instead of the original) built from that schema and the updated map. Using a JsonRecordSetWriter in ScriptedTransformRecord gives the following output for the above input FlowFile:
"id" : 1,
"myjson" : {
"a" : "hello",
"b" : 100
}
}, {
"id" : 2,
"myjson" : {
"a" : "world",
"b" : 200
}
} ]
The field-parsing technique (JsonSlurper in this case) can be extended to other formats as well: basically any string that can be converted into a Map will work. Groovy also has an XmlSlurper, so the field could contain XML instead of JSON, for example; a sketch of that is shown below.
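Here is a minimal sketch of that idea (not from the flow above, so treat the field name "myxml" and the helper closure as assumptions): XmlSlurper returns a GPathResult rather than a Map, so a small helper flattens it before it is placed into derivedValues.

// Minimal sketch (assumed, not from the flow above): flatten a parsed XML string
// into a Map so it can replace the original string value in derivedValues,
// just like the JSON case. Note: in Groovy 4+ XmlSlurper lives in groovy.xml
// rather than groovy.util.
def xmlToMap
xmlToMap = { node ->
    node.children().collectEntries { child ->
        // Recurse into nested elements, otherwise keep the element's text
        [(child.name()): child.children().size() ? xmlToMap(child) : child.text()]
    }
}

def parsed = xmlToMap(new XmlSlurper().parseText('<myxml><a>hello</a><b>100</b></myxml>'))
// parsed is now [a:'hello', b:'100'] and can be put into derivedValues under "myxml"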
The schema inference technique can be used in many cases, not just for replacing a single field. It only takes the root-level Map, a name of null, and a character set (UTF-8 in this post), and produces a new schema for the outgoing record. You can even produce multiple output records for a single input record, but you'll want them all to share the same schema so that every record in the overall outgoing FlowFile has the same schema; see the sketch below.
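As a rough sketch of that last point (the id tweak is made up purely for illustration, and it assumes the script may return a Collection of Records, as implied above), both output records are built against a single inferred schema:

// Hypothetical sketch: emit two output records for one input record,
// both sharing the same inferred schema.
import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*
import java.nio.charset.*

def recordMap = record.toMap()
def first = new HashMap(recordMap)
def second = new HashMap(recordMap)
// Made-up change so the two output records differ
second.put('id', 1000 + (recordMap.get('id') ?: 0))

// Infer the schema once and reuse it for every returned record
def sharedSchema = DataTypeUtils.inferSchema(first, null, StandardCharsets.UTF_8)
return [new MapRecord(sharedSchema, first), new MapRecord(sharedSchema, second)]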
As always, I welcome all comments, questions, and suggestions. Cheers!