Monday, June 5, 2017

InvokeScriptedProcessor template (a faster ExecuteScript)

For quick, easy, and small scripting tasks in Apache NiFi, ExecuteScript is often a better choice than InvokeScriptedProcessor, as there is little-to-no boilerplate code, relationships and properties are already defined and supported, and some objects relevant to the NiFi API (such as the ProcessSession, ProcessContext, and ComponentLog) are already bound to the script engine as variables that can readily be used by the script.

However, one tradeoff is performance; in ExecuteScript, the script is evaluated each time onTrigger is executed. With InvokeScriptedProcessor, as long as the script (or any of the InvokeScriptedProcessor properties) is not changed, the scripted Processor instance is maintained by the processor, and its methods are simply invoked when parent methods such as onTrigger() are called by the NiFi framework.

To get the best of both worlds, I have put together an InvokeScriptedProcessor instance that is configured the same way ExecuteScript is. The "success" and "failure" relationships are provided, the API objects are available, and if you simply paste your ExecuteScript code into the same spot in the below script, it will behave like a more performant ExecuteScript instance.  The code is as follows:

////////////////////////////////////////////////////////////
// imports go here
////////////////////////////////////////////////////////////

class E{ void executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) 
    {
        ////////////////////////////////////////////////////////////
        // your code goes here
        ////////////////////////////////////////////////////////////
    }
}

class GroovyProcessor implements Processor {
    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    def ComponentLog log
    def e = new E()   
    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { null }
    String getIdentifier() { null }    
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        try {
            e.executeScript(session, context, log, REL_SUCCESS, REL_FAILURE)
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
}}}
processor = new GroovyProcessor()


The boilerplate Processor implementation is at the bottom, and I've left comment blocks where your imports and code go. With some simple cut-and-paste, you should be able to have a pre-evaluated Processor instance that will run your ExecuteScript code faster than before!

If you give this a try, please let me know how/if it works for you. I am always open to suggestions, improvements, comments, and questions.  Cheers!