Monday, June 5, 2017

InvokeScriptedProcessor template (a faster ExecuteScript)

For quick, easy, and small scripting tasks in Apache NiFi, ExecuteScript is often a better choice than InvokeScriptedProcessor, as there is little-to-no boilerplate code, relationships and properties are already defined and supported, and some objects relevant to the NiFi API (such as the ProcessSession, ProcessContext, and ComponentLog) are already bound to the script engine as variables that can readily be used by the script.

However, one tradeoff is performance; in ExecuteScript, the script is evaluated each time onTrigger is executed. With InvokeScriptedProcessor, as long as the script (or any of the InvokeScriptedProcessor properties) is not changed, the scripted Processor instance is maintained by the processor, and its methods are simply invoked when parent methods such as onTrigger() are called by the NiFi framework.
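To make the difference concrete, here is a minimal plain-Groovy sketch, not NiFi's actual implementation. It assumes a hypothetical one-line script (`source`) and uses `GroovyShell` as a stand-in for the script engine: one closure evaluates the script text on every call, while the other evaluates it once and reuses the result.

```groovy
// Hypothetical stand-in for a user script; it returns a closure that does the "work".
def source = 'return { String name -> "Hello, " + name }'
def shell = new GroovyShell()

// ExecuteScript-like: the script text is evaluated on every call.
def evaluateEachTime = { String name -> shell.evaluate(source).call(name) }

// InvokeScriptedProcessor-like: evaluate once, keep the result, and reuse it.
def cached = shell.evaluate(source)
def reuseInstance = { String name -> cached.call(name) }

println evaluateEachTime('NiFi')
println reuseInstance('NiFi')
```

Both calls produce the same greeting; the second simply skips the repeated evaluation step, which is the saving described above.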

To get the best of both worlds, I have put together an InvokeScriptedProcessor instance that is configured the same way ExecuteScript is. The "success" and "failure" relationships are provided, the API objects are available, and if you simply paste your ExecuteScript code into the marked spot in the script below, it will behave like a more performant ExecuteScript instance. The code is as follows:

////////////////////////////////////////////////////////////
// imports go here
////////////////////////////////////////////////////////////

class E {
    void executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
        ////////////////////////////////////////////////////////////
        // your code goes here
        ////////////////////////////////////////////////////////////
    }
}

class GroovyProcessor implements Processor {
    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    def ComponentLog log
    def e = new E()   
    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { null }
    String getIdentifier() { null }    
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        try {
            e.executeScript(session, context, log, REL_SUCCESS, REL_FAILURE)
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}
processor = new GroovyProcessor()


The boilerplate Processor implementation is at the bottom, and I've left comment blocks where your imports and code go. With some simple cut-and-paste, you should be able to have a pre-evaluated Processor instance that will run your ExecuteScript code faster than before!
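As an illustration of what might go in the "your code goes here" block, here is a minimal sketch of the E class from the template with a body filled in. It assumes a typical ExecuteScript-style task: get a FlowFile, add an attribute, and route it. The attribute name `processed.by` and its value are hypothetical, chosen only for this example.

```groovy
class E {
    void executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
        // Nothing to do if no FlowFile is queued
        def flowFile = session.get()
        if (flowFile == null) return
        try {
            // putAttribute returns the updated FlowFile, so keep the new reference
            flowFile = session.putAttribute(flowFile, 'processed.by', 'InvokeScriptedProcessor')
            session.transfer(flowFile, REL_SUCCESS)
        } catch (Exception ex) {
            log.error('Failed to tag FlowFile', ex)
            session.transfer(flowFile, REL_FAILURE)
        }
    }
}
```

Note that there is no session.commit() here; in the template, the surrounding onTrigger() commits the session after executeScript() returns and rolls it back if anything is thrown.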

If you give this a try, please let me know how/if it works for you. I am always open to suggestions, improvements, comments, and questions.  Cheers!

2 comments:

  1. Matt,

    I tried to write a custom Groovy script to filter CSV with a set of conditions, and it worked like magic. This gave me more flexibility in using an object-oriented approach and helped me get familiar with which methods and classes to consider if I were to write a custom processor. However, the only problem I observed is that when I try to get a list of fields from the processor as dynamic values, the script does not recognize them. Maybe I am not doing it right; could you please comment on it? Thanks for the fantastic blog and please keep writing!!

    Cheers!
    Karthik

    1. Are you using ExecuteScript or InvokeScriptedProcessor? If the former, the dynamic properties are bound to the script as variables, whose names are the keys and whose values are PropertyValue objects (see my HCC article: https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html). If you are using InvokeScriptedProcessor, you can get to the dynamic properties via the "context" object in onTrigger(). Here's an example that logs each dynamic property added to the InvokeScriptedProcessor configuration:

      context.properties.findAll {k,v -> k.dynamic}.each {k,v -> log.info("found dynamic property $v")}
