Friday, September 18, 2020

Infer NiFi Schema from File in Groovy

The "Infer Schema" option in the CSV, JSON, and XML readers in NiFi (as of 1.9.0) has made it much easier to get your record-based data into NiFi and operate on it without having to provide the schema explicitly. However, sometimes the schema needs to be tweaked, whether to make a field required or to change a datatype (numbers to strings, for example), and writing a schema from scratch can be a pain.

To help with that, I whipped up a quick Groovy script that checks a file's extension for json, csv, or xml, calls the same schema inference code that NiFi does, and then writes the Avro schema to standard output. You can use it to infer the schema for an arbitrary file, then use the output (presumably after some minor changes) as an explicit schema.

The Groovy script (I named it infer_schema.groovy) is as follows:

#!/usr/bin/groovy
@Grab('org.apache.nifi:nifi-record:1.12.0')
@Grab('org.apache.nifi:nifi-record-serialization-services:1.12.0')
@Grab('org.apache.nifi:nifi-avro-record-utils:1.12.0')
import org.apache.nifi.schema.inference.*
import org.codehaus.jackson.*
import org.apache.nifi.json.*
import org.apache.nifi.avro.*
import org.apache.nifi.csv.*
import org.apache.nifi.xml.inference.*
import org.apache.nifi.context.*
import org.apache.nifi.components.*

if (args?.length < 1) {
    println 'Usage: infer_schema <file>'
    System.exit(1)
}
def fileToInfer = new File(args[0])
// Determine which reader to use from the file extension (json, csv, or xml)
def ext = fileToInfer.name[(fileToInfer.name.lastIndexOf('.') + 1)..-1]
RecordSourceFactory recordSourceFactory
SchemaInferenceEngine inferenceEngine
switch (ext) {
    case 'json':
        recordSourceFactory = { var, inStream -> new JsonRecordSource(inStream) }
        inferenceEngine = new JsonSchemaInference(new TimeValueInference(null, null, null))
        break
    case 'csv':
        // Minimal PropertyContext that reports UTF-8 as the character set and no other properties
        def context = new PropertyContext() {
            PropertyValue getProperty(PropertyDescriptor descriptor) {
                return (['getValue': { -> 'UTF-8' }] as PropertyValue)
            }
            Map<String, String> getAllProperties() { [:] }
        }
        recordSourceFactory = { var, inStream -> new CSVRecordSource(inStream, context, var) }
        inferenceEngine = new CSVSchemaInference(new TimeValueInference(null, null, null))
        break
    case 'xml':
        recordSourceFactory = { var, inStream -> new XmlRecordSource(inStream, false) }
        inferenceEngine = new XmlSchemaInference(new TimeValueInference(null, null, null))
        break
    default:
        println "Unsupported extension: $ext (expected json, csv, or xml)"
        System.exit(1)
}
fileToInfer.withInputStream { inStream ->
    def recordSource = recordSourceFactory.create([:], inStream)
    // Infer the record schema, convert it to Avro, and pretty-print it to stdout
    println AvroTypeUtil.buildAvroSchema(inferenceEngine.inferSchema(recordSource)).toString(true)
}
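Since the script prints the schema as plain Avro JSON, you can tweak it with Groovy's built-in JSON support before pasting it into a record writer. Here is a minimal sketch under assumed conditions (the field names and the small schema below are illustrative, not real output) that makes two nullable fields required by collapsing their unions to a single type:

```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonOutput

// A small schema of the shape infer_schema.groovy might print (illustrative only)
def inferred = '''
{
  "type": "record",
  "name": "nifiRecord",
  "fields": [
    { "name": "id",   "type": ["null", "long"]   },
    { "name": "name", "type": ["null", "string"] }
  ]
}
'''

def schema = new JsonSlurper().parseText(inferred)

// Make "id" and "name" required by dropping "null" from their type unions
schema.fields.find { it.name == 'id' }.type = 'long'
schema.fields.find { it.name == 'name' }.type = 'string'

println JsonOutput.prettyPrint(JsonOutput.toJson(schema))
```

The resulting JSON can then be supplied to an AvroSchemaRegistry or set directly as the "Schema Text" property of a record reader/writer.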


If you have date/time/timestamp values, you'll likely want to provide format strings to the various TimeValueInference constructors that match the expected format of your time-based values. The script could certainly be made more robust, but it is mainly an example of using NiFi libraries in Groovy to accomplish tasks outside of a NiFi instance.
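As a sketch of that change (treat the argument order of date format, then time format, then timestamp format as an assumption to verify against the NiFi version you grab), you would replace the three nulls with java.time-style patterns. The patterns themselves can be sanity-checked against your data with the JDK alone:

```groovy
// In infer_schema.groovy, the null-argument constructors would become something like:
//   new TimeValueInference('yyyy-MM-dd', 'HH:mm:ss', "yyyy-MM-dd'T'HH:mm:ss'Z'")
// (assumed order: date format, time format, timestamp format)

// These are standard java.time patterns, so you can verify a sample value parses:
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

def tsFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
def parsed = LocalDateTime.parse('2020-09-18T12:34:56Z', tsFormat)
println "Pattern matches, parsed: $parsed"
```

If a sample value parses with your pattern, the inference engine should recognize matching strings as timestamps rather than plain strings.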

As always I welcome all comments, questions, and suggestions. Cheers!