Tuesday, December 6, 2022

Using UpdateDatabaseTable for Schema Migration and Database Replication

In NiFi version 1.19.0, the UpdateDatabaseTable processor was added to help with database schema migration and to make replication of database tables easier. This processor checks the schema of the incoming records against the target database. As shown in the following example, if the target table does not exist, the processor can either fail the FlowFile or create the table for you:

Having the processor create the table (if it doesn't already exist) allows you to take data from any source/flow and land it in a database even if that table is not already there. This removes the previous need to manually set up the target tables with the intended schema beforehand. I will illustrate this with a simple flow:

The flow uses GenerateFlowFile to issue some simple sample data, but in practice it will be common to use QueryDatabaseTableRecord to incrementally fetch rows from a source database table. The flow then sends the FlowFiles to UpdateDatabaseTable, but any number of transformations of the data are possible in NiFi before the records are ready for the desired target table. This is the configuration of UpdateDatabaseTable for this example:

Note here (it will come up again later) that the outgoing FlowFiles from the UpdateDatabaseTable processor will have an 'output.table' attribute set to the target table name. This can be used later in the flow (and will be in this example) to refer to the table that was created/updated.

When the flow is started, the target table 'udt_test' does not yet exist in the database:

Taking a look at an example FlowFile, you can see the schema of the data (i.e. the JSON structure) and thus the intended target table definition:

After starting this flow and seeing at least one FlowFile progress through UpdateDatabaseTable successfully, we can now verify the desired target table does indeed exist in the target database:

And after running with the following configuration of PutDatabaseRecord:

we can see that not only has the table been created, but the record has been inserted successfully (with PutDatabaseRecord referring to the target table via the aforementioned 'output.table' attribute):

The example above is meant to demonstrate that you can take data (in the form of NiFi records in a FlowFile) from any source, transform it however you like, and put it into a database table without the table having to exist beforehand. But it can do more than that! If you have an existing table and an extra field in your input data that does not have an associated column in the target table, UpdateDatabaseTable will add the missing column and start populating rows using all the available columns in the input data. Using the above example, let's say we added an extra field named 'newField' with a string value:
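To make the schema-drift handling concrete, here is a rough sketch of the idea in Python. This is purely illustrative and is not NiFi's implementation; the `missing_column_ddl` helper and its type map are my own invention. The idea is to diff the incoming record's fields against the table's known columns and emit DDL for anything missing:

```python
def missing_column_ddl(table, table_columns, record, type_map=None):
    """Return ALTER TABLE statements for record fields with no matching column.

    Hypothetical helper -- NiFi's UpdateDatabaseTable does the real work using
    the database metadata and the record schema's types.
    """
    type_map = type_map or {str: 'VARCHAR(255)', int: 'INTEGER', float: 'DOUBLE'}
    existing = {c.lower() for c in table_columns}   # column matching is case-insensitive here
    ddl = []
    for field, value in record.items():
        if field.lower() not in existing:
            sql_type = type_map.get(type(value), 'VARCHAR(255)')
            ddl.append(f'ALTER TABLE {table} ADD COLUMN {field} {sql_type}')
    return ddl

# Existing table has (name, age); the incoming record carries an extra newField
print(missing_column_ddl('udt_test', ['name', 'age'],
                         {'name': 'Joe', 'age': 30, 'newField': 'Hello'}))
# -> ['ALTER TABLE udt_test ADD COLUMN newField VARCHAR(255)']
```

The real processor also has to handle quoting/case rules per database and translate record field types to the target dialect, but the diff-then-ALTER shape is the same.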

Then we run that FlowFile through UpdateDatabaseTable and see that the table schema has been updated and the new column has been populated for the new row, with null as the value of that column for the previously inserted row:

Hopefully this post has illustrated the potential of the UpdateDatabaseTable processor to make schema migration/drift and database replication techniques much easier in Apache NiFi. As always, I welcome all comments, questions, and suggestions. Cheers!

Friday, September 18, 2020

Infer NiFi Schema from File in Groovy

The "Infer Schema" option in CSV, JSON, and XML readers in NiFi (as of 1.9.0) has made it much easier to get your record-based data into NiFi and operate on it without having to provide the schema explicitly. However, sometimes the schema may need to be tweaked (to make a field required) or datatypes changed (numbers to strings, e.g.), and writing a schema from scratch can be a pain.

To help with that, I whipped up a quick Groovy script that checks a file's extension for json, csv, or xml, calls the same schema inference code that NiFi does, then writes the Avro schema to standard out. This tool can be used to infer the schema for an arbitrary file, with the output then used (presumably after some minor changes) as an explicit schema.

The Groovy script (I named it infer_schema.groovy) is as follows:

import org.apache.nifi.schema.inference.*
import org.codehaus.jackson.*
import org.apache.nifi.json.*
import org.apache.nifi.avro.*
import org.apache.nifi.csv.*
import org.apache.nifi.xml.inference.*
import org.apache.nifi.context.*
import org.apache.nifi.components.*
if(args?.length < 1) {
  println 'Usage: infer_schema <file>'
  System.exit(1)
}
def fileToInfer = new File(args[0])
def ext = fileToInfer.name[(fileToInfer.name.lastIndexOf('.') + 1)..-1]
RecordSourceFactory recordSourceFactory
SchemaInferenceEngine inferenceEngine
switch(ext) {
 case 'json': recordSourceFactory = {var, inStream -> new JsonRecordSource(inStream)}
              inferenceEngine = new JsonSchemaInference(new TimeValueInference(null, null, null))
              break
 case 'csv': def context = new PropertyContext() {
                 PropertyValue getProperty(PropertyDescriptor descriptor) {
                     return (['getValue': { -> 'UTF-8'}] as PropertyValue)
                 }
                 Map<String,String> getAllProperties() { [:] }
             }
             recordSourceFactory = {var, inStream -> new CSVRecordSource(inStream, context, var)}
             inferenceEngine = new CSVSchemaInference(new TimeValueInference(null, null, null))
             break
 case 'xml': recordSourceFactory = {var, inStream -> new XmlRecordSource(inStream, false)}
             inferenceEngine = new XmlSchemaInference(new TimeValueInference(null, null, null))
             break
 default:
     println "Unsupported file extension '$ext', must be json, csv, or xml"
     System.exit(1)
}
fileToInfer.withInputStream {inStream ->
    def recordSource = recordSourceFactory.create([:], inStream)
    println AvroTypeUtil.buildAvroSchema(inferenceEngine.inferSchema(recordSource)).toString(true)
}

If you have date/time/timestamp values, you'll likely want to provide formats to the various TimeValueInference constructors that match the expected format of your time-based values. The script can certainly use some work to make it more robust, but mostly this is an example of using NiFi libraries in Groovy to accomplish tasks outside of a NiFi instance.
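For intuition about what the inference engines are doing, here is a toy version of the idea in plain Python (illustration only; NiFi's engines also handle type widening across rows, time formats, and hierarchical data far more carefully). It walks a JSON object and builds an Avro-style record schema:

```python
import json

def infer_avro_type(value):
    """Map a single JSON value to a (simplified) Avro type."""
    if isinstance(value, bool):    # check bool before int: bool is a subclass of int
        return 'boolean'
    if isinstance(value, int):
        return 'long'
    if isinstance(value, float):
        return 'double'
    if isinstance(value, dict):
        return infer_record(value, 'nested')
    if isinstance(value, list):
        item = infer_avro_type(value[0]) if value else 'string'
        return {'type': 'array', 'items': item}
    return 'string'

def infer_record(obj, name='inferred'):
    """Build an Avro-style record schema from a dict of field -> value."""
    return {'type': 'record', 'name': name,
            'fields': [{'name': k, 'type': infer_avro_type(v)} for k, v in obj.items()]}

sample = json.loads('{"name": "Joe", "age": 30, "scores": [1.5, 2.5]}')
print(json.dumps(infer_record(sample), indent=2))
```

A real engine must also merge types observed across many records (e.g. an int in row 1 and a string in row 2 widening to a union), which is where most of the complexity lives.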

As always I welcome all comments, questions, and suggestions. Cheers!

Tuesday, April 2, 2019

Record Processing with InvokeScriptedProcessor (using Groovy)

While looking into NIFI-6151, I commented that record processing can be done by scripting processor(s), but the most appropriate approach is probably to use InvokeScriptedProcessor, as you can add more complex properties (specifying Controller Services, e.g.), versus user-defined properties for ExecuteScript.

Having said that, there is a decent amount of setup code you'd need for any record-based scripted processor, and there is a best practice around how to handle the records: process the first record before creating a RecordSetWriter, in case your custom processor code needs to update the schema that the RecordSetWriter will use. The Groovy example below is adapted from AbstractRecordProcessor, the common base class for all record processors in the standard NAR. Note the two comment sections for handling the first and the rest of the incoming records; these are where you'd put your custom code to handle records. It's probably best to add a private method to your scripted processor and just call that once for the first record and then once again in the loop (that's what AbstractRecordProcessor does):

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.flowfile.attributes.CoreAttributes
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.serialization.*
import org.apache.nifi.serialization.record.*
import org.apache.nifi.schema.access.SchemaNotFoundException
import java.util.concurrent.atomic.AtomicInteger

class MyRecordProcessor extends AbstractProcessor {

    // Properties
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("Specifies the Controller Service to use for reading incoming data")
        .identifiesControllerService(RecordReaderFactory)
        .required(true)
        .build()
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("Specifies the Controller Service to use for writing out the records")
        .identifiesControllerService(RecordSetWriterFactory)
        .required(true)
        .build()

    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles are routed here if an error occurs during processing').build()

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        [RECORD_READER, RECORD_WRITER] as List
    }

    Set<Relationship> getRelationships() {
        [REL_SUCCESS, REL_FAILURE] as Set<Relationship>
    }

    void onTrigger(ProcessContext context, ProcessSession session) {
        def flowFile = session.get()
        if (!flowFile) return

        def readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory)
        def writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory)
        final Map<String, String> attributes = new HashMap<>()
        final AtomicInteger recordCount = new AtomicInteger()
        final FlowFile original = flowFile
        final Map<String, String> originalAttributes = flowFile.attributes
        try {
            flowFile = session.write(flowFile, { inStream, outStream ->
                    def reader = readerFactory.createRecordReader(originalAttributes, inStream, getLogger())
                    try {

                        // Get the first record and process it before we create the Record Writer.
                        // We do this so that if the Processor updates the Record's schema, we can provide
                        // an updated schema to the Record Writer. If there are no records, then we can
                        // simply create the Writer with the Reader's schema and begin & end the RecordSet
                        def firstRecord = reader.nextRecord()
                        if (!firstRecord) {
                            def writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
                            def writer = writerFactory.createWriter(getLogger(), writeSchema, outStream)
                            try {
                                writer.beginRecordSet()
                                def writeResult = writer.finishRecordSet()
                                attributes['record.count'] = String.valueOf(writeResult.recordCount)
                                attributes[CoreAttributes.MIME_TYPE.key()] = writer.mimeType
                            } finally {
                                writer.close()
                            }
                            return
                        }

                        // TODO process first record

                        def writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.schema)
                        def writer = writerFactory.createWriter(getLogger(), writeSchema, outStream)
                        try {
                            writer.beginRecordSet()
                            writer.write(firstRecord)
                            def record
                            while (record = reader.nextRecord()) {
                                // TODO process next record
                                writer.write(record)
                            }
                            def writeResult = writer.finishRecordSet()
                            recordCount.set(writeResult.recordCount)
                            attributes.put('record.count', String.valueOf(writeResult.recordCount))
                            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.mimeType)
                        } finally {
                            writer.close()
                        }
                    } catch (final SchemaNotFoundException e) {
                        throw new ProcessException(e.localizedMessage, e)
                    } catch (final MalformedRecordException e) {
                        throw new ProcessException('Could not parse incoming data', e)
                    } finally {
                        reader.close()
                    }
                } as StreamCallback)
        } catch (final Exception e) {
            getLogger().error('Failed to process {}; will route to failure', [flowFile, e] as Object[])
            session.transfer(flowFile, REL_FAILURE)
            return
        }
        flowFile = session.putAllAttributes(flowFile, attributes)
        recordCount.get() ? session.transfer(flowFile, REL_SUCCESS) : session.remove(flowFile)
        def count = recordCount.get()
        session.adjustCounter('Records Processed', count, false)
        getLogger().info('Successfully converted {} records for {}', [count, flowFile] as Object[])
    }
}

processor = new MyRecordProcessor()

Inside the session.write() StreamCallback, we first check to see if there are any records at all, and if there aren't we just populate the standard record-based attributes (such as record.count and the MIME type of the writer) and write out a zero-record flow file.

After that, it's time to process the first record separately from the rest. This is because the reader and/or the custom processor code may change the writer's schema from the reader's schema. This happens, for example, during schema inference, a capability of RecordReaders since NiFi 1.9.0.

Then the first record is written, and the process continues for the rest of the records. At the end, the standard record-based attributes are populated, then the flow file is updated and transferred. The script above also includes error-handling for when things go wrong.
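The shape of this pattern is independent of NiFi. Here is the same "peek at the first record before creating the writer" idea sketched outside NiFi with Python's csv module (my own illustration, not NiFi code): the first (possibly transformed) record determines the writer's schema, and the remaining records reuse it.

```python
import csv, io

def convert(records, process):
    """Write records as CSV; the processed first record fixes the column set."""
    out = io.StringIO()
    it = iter(records)
    first = next(it, None)
    if first is None:
        return '', 0                                 # zero-record case: nothing to write
    first = process(first)                           # process BEFORE creating the writer...
    writer = csv.DictWriter(out, fieldnames=list(first))  # ...so the schema reflects any changes
    writer.writeheader()
    writer.writerow(first)
    count = 1
    for record in it:                                # the rest of the records reuse the schema
        writer.writerow(process(record))
        count += 1
    return out.getvalue(), count

text, n = convert([{'name': 'Joe', 'age': 30}, {'name': 'Mary', 'age': 25}],
                  lambda r: {**r, 'age': r['age'] + 1})
print(n)  # 2
```

If `process` added a field to the first record, `DictWriter` would pick it up automatically, which is exactly why the writer is created only after that first call.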

As always, if you try this out please let me know how/if it works for you, and I welcome all comments, questions, and suggestions. Cheers!

Wednesday, August 22, 2018

Database Sequence Lookup with ScriptedLookupService

While addressing this question, I realized it would make a good example of how to use ScriptedLookupService to provide a sequence number from a database to a field in a record. This can be useful when using PutDatabaseRecord downstream, when you want some field to contain a sequence number from a database.

In my example I'm using a CSV file with a few dummy values in it, adding an "id" field containing a sequence number, and writing out the records as JSON objects:

The LookupRecord configuration is as follows; note that the processor requires the value of any user-defined property to be a RecordPath that evaluates to a non-null value. I chose "/Age" as a dummy RecordPath here, since I know the record contains a non-null Age field:

The Groovy script for ScriptedLookupService is as follows. I am connecting to a PostgreSQL database, so I generated the corresponding SQL using nextval(); you can change this to the SQL appropriate for your database type:

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.lookup.LookupService
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.dbcp.DBCPService
import java.sql.*

class SequenceLookupService implements LookupService<String> {

    final String ID = UUID.randomUUID().toString()

    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name("Database Connection Pooling Service")
            .description("The Controller Service that is used to obtain connection to database")
            .identifiesControllerService(DBCPService)
            .required(true)
            .build()

    Connection conn = null
    Statement stmt = null
    ComponentLog log = null

    Optional<String> lookup(Map<String, String> coordinates) {
        final String key = coordinates.keySet().first()
        ResultSet rs = stmt?.executeQuery("select nextval('${key}')".toString())
        rs?.next() ? Optional.ofNullable(String.valueOf(rs.getLong(1))) : Optional.empty()
    }

    Set<String> getRequiredKeys() {
        Collections.emptySet()
    }

    Class<?> getValueType() {
        String
    }

    void initialize(ControllerServiceInitializationContext context) throws InitializationException {
    }

    Collection<ValidationResult> validate(ValidationContext context) {
        null
    }

    PropertyDescriptor getPropertyDescriptor(String name) {
        name.equals(DBCP_SERVICE.name) ? DBCP_SERVICE : null
    }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    }

    List<PropertyDescriptor> getPropertyDescriptors() {
        [DBCP_SERVICE] as List
    }

    String getIdentifier() {
        ID
    }

    def onEnabled(context) {
        def db = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService)
        conn = db.connection
        stmt = conn.createStatement()
    }

    def onDisabled() {
        conn?.close()
    }

    def setLogger(logger) {
        log = logger
    }
}

lookupService = new SequenceLookupService()

One caveat I should mention: currently, any properties defined by the script do not get added to the ScriptedLookupService dialog until the service is enabled; I have written an improvement Jira (NIFI-5547) to fix that.

The HCC question also refers to the nextInt() feature of NiFi Expression Language; this is MUCH faster than retrieving a sequence from a database. So is the UpdateAttribute approach, which lets NiFi handle the "sequence" rather than an external database. But if you do need to use an external database sequence, this script should allow you to do that.
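The performance difference is easy to see: an in-process counter just increments memory, while a database sequence costs a network round trip per value. A free-standing Python illustration of the in-process approach (my own sketch of the general idea behind a local counter, not NiFi code):

```python
import itertools

# A monotonically increasing, in-process sequence: no round trip per value
sequence = itertools.count(start=1)

rows = [{'name': n} for n in ('Joe', 'Mary', 'Matt')]
for row in rows:
    row['id'] = next(sequence)   # vs. one "select nextval('seq')" per row

print([r['id'] for r in rows])  # [1, 2, 3]
```

The trade-off is the same one noted above: values from a local counter are only unique within that process (or NiFi node), whereas a database sequence is unique cluster-wide.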

Note that in my lookup function I only grabbed the first entry in the map. This can easily be extended to do multiple lookups of multiple sequences, but since user-defined properties are unique by name, you can't fill in multiple fields from the same sequence.

The above template (with script) is available as a Gist here. As always, please let me know if you try this whether it works for you, and I welcome all comments, questions, and suggestions. Cheers!

Monday, August 13, 2018

One-and-done Flows (aka Job Processing)

We get a lot of questions on the Apache NiFi mailing list about being able to run a flow/processor just once, and indeed there is a related feature request Jira (NIFI-1407) that remains unimplemented. IMO this is for a few reasons, one being that "one-shot" processing doesn't really align with a flow-based architecture such as NiFi's, it is really more of a job processing feature.

Having said that, many users still desire/need that capability, and some workarounds have been presented, such as scheduling the source processor (often GenerateFlowFile if used as a trigger) for an exorbitant amount of time (weeks, e.g.), knowing that someone will probably restart the processor before then. The other is to start the processor from the UI and immediately stop it; that keeps the original run going but prevents the processor from being scheduled further. The same can be done from the REST API, although for secure NiFi instances/clusters that can be a bit painful to set up.

In that vein I wanted to tackle one common use case in this area, the one where a flow file needs to be generated as a trigger, with or without possible content. I'll present two different Groovy scripts to be used in an ExecuteScript processor, one to use downstream from GenerateFlowFile, and one as the source processor itself.

For the first scenario, I'm picturing a GenerateFlowFile as the source processor, although technically it could be any source processor. The key is to schedule it to run at some huge interval, like 1 year. This is normally a decent workaround, but technically breaks down if you leave your flow running that long. However, such a long interval will prevent a ton of provenance events for the flow files headed into the ExecuteScript.

This Groovy script uses the processor's StateManager to keep track of a boolean variable "one_shot". If it is not yet set, the flow file is transferred to success and then "one_shot" is set. From then on, incoming flow files are simply removed from the session. Note that each removal creates a DROP provenance event, which is why we don't want a lot of CREATE/DROP events for this part of the flow; this reinforces my earlier point about scheduling the source processor. The script is as follows:

import org.apache.nifi.components.state.*

flowFile = session.get()
if(!flowFile) return
StateManager sm = context.stateManager
def oldState = new HashMap<String,String>()
oldState.putAll(sm.getState(Scope.LOCAL).toMap())
if(!oldState.one_shot) {
  // Haven't sent the flow file yet, do it now and update state
  session.transfer(flowFile, REL_SUCCESS)
  oldState.one_shot = 'true'
  sm.setState(oldState, Scope.LOCAL)
} else {
  // Remove flow file -- NOTE this causes upstream data loss, only use
  // when the upstream flow files are meant as triggers. Also be sure the
  // upstream schedule is large like 1 day, to avoid unnecessary provenance events.
  session.remove(flowFile)
}

An alternative to dropping the flow files is to roll back the session; that can prevent data loss but will eventually lead to backpressure being applied at the source processor. If that makes more sense for your use case, then just replace the session.remove() with session.rollback().

The other scenario is when you don't need a full-fledged GenerateFlowFile processor (or any other processor); then ExecuteScript can be the source processor. The following script works similarly to the previous one, except it generates rather than receives flow files, and it accepts an optional user-defined property called "content", which it will use for the outgoing FlowFile content. Again the run schedule should be sufficiently large; in this case it matters less since no provenance events will be created, but a thread is still created and destroyed on each scheduled run, so you might as well set a large run schedule anyway. The script is as follows:

import org.apache.nifi.components.state.*
import org.apache.nifi.processor.io.OutputStreamCallback

StateManager sm = context.stateManager
def oldState = new HashMap<String,String>()
oldState.putAll(sm.getState(Scope.LOCAL).toMap())
if(!oldState.one_shot) {
  // Haven't sent the flow file yet, do it now and update state
  def flowFile = session.create()
  try {
    String ffContent = context.getProperty('content')?.evaluateAttributeExpressions()?.value
    if(ffContent) {
      flowFile = session.write(flowFile, {outputStream ->
        outputStream.write(ffContent.bytes)
      } as OutputStreamCallback)
    }
    session.transfer(flowFile, REL_SUCCESS)
  } catch(e) {
    log.error("Couldn't generate FlowFile", e)
    session.remove(flowFile)
  }
  oldState.one_shot = 'true'
  sm.setState(oldState, Scope.LOCAL)
}

To reset these processors (in order to run another one-shot job/flow), you can stop them, right-click and choose View State then Clear (or send the corresponding command(s) via the REST API).

As always, please let me know how/if this works for you. Cheers!

Thursday, February 1, 2018

InvokeScriptedProcessor template revisited (with Javascript)

I recently got a request on Google+ for a Javascript/Nashorn example of InvokeScriptedProcessor, and since I've done articles on an InvokeScriptedProcessor template (to port from ExecuteScript) for Groovy and Jython, I thought I'd do the same for Javascript, since the template also illustrates the methods you'd have to override in order to write your own InvokeScriptedProcessor that does more powerful/flexible things.

Javascript/Nashorn is a slightly peculiar animal in terms of its ScriptEngine. In Groovy and Jython you need to create a subclass of Processor, implement the requisite methods, and assign an instance of the subclass to the variable "processor". The same is true of Javascript, except in order to invoke methods on the "processor" object, it has to be an instance of ScriptObject (an internal Nashorn class) or ScriptObjectMirror (a class of the Nashorn scripting API). In our context, the script body/file of InvokeScriptedProcessor is the thing that gets evaluated and cast as a ScriptObjectMirror, which means we need a slightly different approach than just creating a subclass and setting "processor" to an instance of it.  Instead the script itself has to be able to be cast as a Processor, so it can be a ScriptObjectMirror and Processor at the same time.

To that end, we declare the Processor interface methods (and its inherited interface methods) as functions on the main script, and then we set the "processor" variable to "this". Another difference from the other scripting language examples is that you need variable access to the various Java classes (both in Java proper and the NiFi API) before you can instantiate them. So in the "imports" section you will see a number of Java.type() calls, to get JS references to the Java classes.  You will have to do the same if you reference other Java classes in your executeScript() body.

Speaking of which, I tried to keep the same approach to giving an obvious place to paste your ExecuteScript code into the InvokeScriptedProcessor template, to make porting from ExecuteScript to InvokeScriptedProcessor easier. The template follows:

// "imports" go here

var Processor = Java.type("org.apache.nifi.processor.Processor");
var Relationship =  Java.type("org.apache.nifi.processor.Relationship");
var HashSet =  Java.type("java.util.HashSet");
var log = null;
var REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build();
var REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build();

function executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
   // your code goes here
}

function initialize(context) { log = context.logger; }
function getRelationships() {
    var r = new HashSet();
    r.add(REL_SUCCESS);
    r.add(REL_FAILURE);
    return r;
}
function validate(context) { return null; }
function getPropertyDescriptor(name) { return null; }
function onPropertyModified(descriptor, oldValue, newValue) { return null; }
function getPropertyDescriptors() { return null; }
function getIdentifier() { return null; }
function onTrigger(context, sessionFactory) {
    var session = sessionFactory.createSession();
    try {
        executeScript(session, context, log, REL_SUCCESS, REL_FAILURE);
        session.commit();
    } catch (t) {
        log.error("{} failed to process due to {}; rolling back session", Java.to([this, t], "java.lang.Object[]"));
        session.rollback(true);
        throw t;
    }
}

processor = this;

As always, please let me know how/if this works for you, and of course comments, questions, and suggestions are welcome.  Cheers!

Tuesday, November 7, 2017

InvokeScriptedProcessor template revisited (with Jython)

In a previous post, I provided a template in Groovy that would allow NiFi users to port their ExecuteScript Groovy scripts into the faster InvokeScriptedProcessor (ISP) processor. ISP is faster than ExecuteScript because the script is only reloaded when the code or other config changes, versus ExecuteScript which evaluates the script each time the processor is invoked.

Since that post, I've gotten a couple of requests (such as this one) for an ISP template written in Jython, so users that have ExecuteScript processors using Jython scripts can benefit from the ISP performance gains. Ask and ye shall receive :) The following Jython script is meant to be pasted into an InvokeScriptedProcessor's Script Body property, and there is a comment indicating where to add imports and the ExecuteScript code:

#// imports go here
from org.apache.nifi.processor import Processor,Relationship
from java.lang import Throwable

class E():
    def __init__(self):
        pass
    def executeScript(self,session, context, log, REL_SUCCESS, REL_FAILURE):
        #// Replace 'pass' with your code
        pass
#end class

class JythonProcessor(Processor):
    REL_SUCCESS = Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    REL_FAILURE = Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    log = None
    e = E()
    def initialize(self,context):
        self.log = context.logger
    def getRelationships(self):
        return set([self.REL_SUCCESS, self.REL_FAILURE])
    def validate(self,context):
        return None
    def getPropertyDescriptor(self,name):
        return None
    def onPropertyModified(self,descriptor, oldValue, newValue):
        pass
    def getPropertyDescriptors(self):
        return []
    def getIdentifier(self):
        return None
    def onTrigger(self,context, sessionFactory):
        session = sessionFactory.createSession()
        try:
            self.e.executeScript(session, context, self.log, self.REL_SUCCESS, self.REL_FAILURE)
            session.commit()
        except Throwable, t:
            self.log.error('{} failed to process due to {}; rolling back session', [self, t])
            session.rollback(True)
            raise t
#end class

processor = JythonProcessor()

Like the Groovy version, you just need to add your imports to the top of the file, and paste your ExecuteScript Jython code into the executeScript() method, replacing the "pass" line. As always, please let me know how/if this works for you, and if you have any comments, questions, or suggestions.  The script is available as a Gist also.  Cheers!