Friday, September 18, 2020

Infer NiFi Schema from File in Groovy

 The "Infer Schema" option in CSV, JSON, and XML readers in NiFi (as of 1.9.0) has made it much easier to get your record-based data into NiFi and operate on it without having to provide the schema explicitly. However sometimes the schema may need to be tweaked (to make a field required) or datatypes changed (numbers to strings, e.g.), but writing a schema from scratch can be a pain.

To help with that, I whipped up a quick Groovy script that checks a file's extension for json, csv, or xml, calls the same schema inference code that NiFi does, and then writes the Avro schema to standard output. You can use this tool to infer the schema for an arbitrary file, then use the output as an explicit schema, presumably after some minor changes are made.

The Groovy script (I named it infer_schema.groovy) is as follows:

#!/usr/bin/groovy
@Grab('org.apache.nifi:nifi-record:1.12.0')
@Grab('org.apache.nifi:nifi-record-serialization-services:1.12.0')
@Grab('org.apache.nifi:nifi-avro-record-utils:1.12.0')
import org.apache.nifi.schema.inference.*
import org.codehaus.jackson.*
import org.apache.nifi.json.*
import org.apache.nifi.avro.*
import org.apache.nifi.csv.*
import org.apache.nifi.xml.inference.*
import org.apache.nifi.context.*
import org.apache.nifi.components.*

if (args?.length < 1) {
    println 'Usage: infer_schema <file>'
    System.exit(1)
}

def fileToInfer = new File(args[0])
// Determine the record format from the file extension
def ext = fileToInfer.name[(fileToInfer.name.lastIndexOf('.') + 1)..-1]
RecordSourceFactory recordSourceFactory
SchemaInferenceEngine inferenceEngine
switch (ext) {
    case 'json':
        recordSourceFactory = { var, inStream -> new JsonRecordSource(inStream) }
        inferenceEngine = new JsonSchemaInference(new TimeValueInference(null, null, null))
        break
    case 'csv':
        // CSVRecordSource needs a PropertyContext; this minimal one just reports UTF-8 as the character set
        def context = new PropertyContext() {
            PropertyValue getProperty(PropertyDescriptor descriptor) {
                return (['getValue': { -> 'UTF-8' }] as PropertyValue)
            }
            Map<String, String> getAllProperties() { [:] }
        }
        recordSourceFactory = { var, inStream -> new CSVRecordSource(inStream, context, var) }
        inferenceEngine = new CSVSchemaInference(new TimeValueInference(null, null, null))
        break
    case 'xml':
        recordSourceFactory = { var, inStream -> new XmlRecordSource(inStream, false) }
        inferenceEngine = new XmlSchemaInference(new TimeValueInference(null, null, null))
        break
    default:
        println "Unsupported file extension: $ext (expected json, csv, or xml)"
        System.exit(1)
}

// Infer the schema from the file contents and print it as an Avro schema
fileToInfer.withInputStream { inStream ->
    def recordSource = recordSourceFactory.create([:], inStream)
    println AvroTypeUtil.buildAvroSchema(inferenceEngine.inferSchema(recordSource)).toString(true)
}
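To run it, just pass in the file whose schema you want inferred. The file name and contents below are hypothetical, but the output should look something along these lines, ready to paste (and tweak) wherever you supply an explicit schema:

$ groovy infer_schema.groovy users.csv
{
  "type" : "record",
  "name" : "nifiRecord",
  "namespace" : "org.apache.nifi",
  "fields" : [ {
    "name" : "name",
    "type" : [ "string", "null" ]
  }, {
    "name" : "age",
    "type" : [ "long", "null" ]
  } ]
}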


If you have date/time/timestamp values, you'll likely want to provide formats to the various TimeValueInference constructors that match the expected format of your time-based values. The script can certainly use some work to make it more robust, but mostly this is an example of using NiFi libraries in Groovy to accomplish tasks outside of a NiFi instance.
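For instance, if your timestamps look like 2020-09-18 12:34:56, you could pass patterns like these (the format strings are illustrative; substitute whatever your data actually uses):

// arguments are the date, time, and timestamp formats, respectively
inferenceEngine = new JsonSchemaInference(
        new TimeValueInference('yyyy-MM-dd', 'HH:mm:ss', 'yyyy-MM-dd HH:mm:ss'))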

As always, I welcome all comments, questions, and suggestions. Cheers!

1 comment:

  1. I am just starting to work with Groovy, calling scripts from the ExecuteScript and ExecuteStreamCommand processors. Most of the Groovy script examples I have found thus far omit this line: #!/usr/bin/groovy
    Is that line not needed in scripts executed within the context of those processors because NiFi bakes the Groovy interpreter into the NARs for those processors? As opposed to a Groovy script run from the Linux command line, which must be explicitly told where the Groovy interpreter can be found?
    What I’m really trying to get at is this: do I need to yum install groovy on my system if I’m going to execute my scripts from within these NiFi processors, or are the processors Groovy-capable “out-of-the-box”?
