Monday, June 5, 2017

InvokeScriptedProcessor template (a faster ExecuteScript)

For quick, easy, and small scripting tasks in Apache NiFi, ExecuteScript is often a better choice than InvokeScriptedProcessor, as there is little-to-no boilerplate code, relationships and properties are already defined and supported, and some objects relevant to the NiFi API (such as the ProcessSession, ProcessContext, and ComponentLog) are already bound to the script engine as variables that can readily be used by the script.

However, one tradeoff is performance: in ExecuteScript, the script is evaluated each time onTrigger() is called. With InvokeScriptedProcessor, as long as neither the script nor any of the InvokeScriptedProcessor properties changes, InvokeScriptedProcessor keeps the scripted Processor instance and simply invokes its methods when the NiFi framework calls parent methods such as onTrigger().
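As a rough illustration of that difference, here is a Groovy sketch that runs outside NiFi. It is not how NiFi implements either processor, and the `ScriptRunner` class and its method names are hypothetical. It uses `GroovyShell` to compare compiling a script on every call with compiling it once and reusing it until its text changes:

```groovy
// Hypothetical illustration only, NOT NiFi code: contrasts compiling a script
// on every call with compiling it once and reusing it until the text changes.
class ScriptRunner {
    private final GroovyShell shell = new GroovyShell()
    private String cachedText
    private Script cachedScript
    int compileCount = 0

    // "ExecuteScript-style": compile the script text on every trigger
    Object runEvaluatingEachTime(String text, Map vars) {
        Script script = compile(text)
        script.binding = new Binding(vars)
        return script.run()
    }

    // "InvokeScriptedProcessor-style": compile once, recompile only if the text changes
    Object runCached(String text, Map vars) {
        if (cachedScript == null || text != cachedText) {
            cachedScript = compile(text)
            cachedText = text
        }
        cachedScript.binding = new Binding(vars)
        return cachedScript.run()
    }

    private Script compile(String text) {
        compileCount++
        return shell.parse(text)
    }
}

def script = 'x * 2'
def eachTime = new ScriptRunner()
def cached = new ScriptRunner()
(1..5).each { i ->
    assert eachTime.runEvaluatingEachTime(script, [x: i]) == i * 2
    assert cached.runCached(script, [x: i]) == i * 2
}
println "compiled each time: ${eachTime.compileCount}, cached: ${cached.compileCount}"
// prints "compiled each time: 5, cached: 1"
```

The cached path pays the compile cost again only when the script text changes, which mirrors the behavior described above for InvokeScriptedProcessor.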

To get the best of both worlds, I have put together an InvokeScriptedProcessor script that is configured the same way ExecuteScript is: the "success" and "failure" relationships are provided, the API objects are available, and if you simply paste your ExecuteScript code into the marked spot in the script below, it will behave like a faster ExecuteScript. The code is as follows:

////////////////////////////////////////////////////////////
// imports go here
////////////////////////////////////////////////////////////

class E {
    void executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
        ////////////////////////////////////////////////////////////
        // your code goes here
        ////////////////////////////////////////////////////////////
    }
}

class GroovyProcessor implements Processor {
    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    ComponentLog log
    def e = new E()

    // Keep a reference to the logger provided by the framework
    void initialize(ProcessorInitializationContext context) { log = context.logger }

    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }

    // This template defines no properties or custom validation of its own
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { null }
    String getIdentifier() { null }

    // Run the pasted ExecuteScript code in its own session: commit on success,
    // otherwise log the error, roll back (penalizing FlowFiles) and rethrow
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        try {
            e.executeScript(session, context, log, REL_SUCCESS, REL_FAILURE)
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

// InvokeScriptedProcessor expects the script to set a variable named "processor"
processor = new GroovyProcessor()


The boilerplate Processor implementation is at the bottom, and I've left comment blocks where your imports and code go. With some simple cut-and-paste, you should be able to have a pre-evaluated Processor instance that will run your ExecuteScript code faster than before!
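For example, here is one way the `E` class might look after pasting in a typical ExecuteScript body. This is a sketch, not a prescribed pattern, and the attribute name `processed.by` and its value are hypothetical. The body takes one FlowFile, adds an attribute, and routes it to success, falling back to failure on error. Because the template passes `session`, `log` and the relationships in as untyped parameters, the body reads just as it would in ExecuteScript:

```groovy
class E {
    void executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
        ////////////////////////////////////////////////////////////
        // your code goes here (a typical ExecuteScript-style body)
        ////////////////////////////////////////////////////////////
        def flowFile = session.get()
        if (flowFile == null) {
            return // nothing queued for this trigger
        }
        try {
            // Hypothetical logic: mark the FlowFile with an attribute
            flowFile = session.putAttribute(flowFile, 'processed.by', 'InvokeScriptedProcessor template')
            session.transfer(flowFile, REL_SUCCESS)
        } catch (Exception ex) {
            log.error('Failed to process FlowFile; routing to failure', ex)
            session.transfer(flowFile, REL_FAILURE)
        }
    }
}
```

Anything this body does not catch propagates to GroovyProcessor.onTrigger(), which logs the error and rolls back the session.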

If you give this a try, please let me know how/if it works for you. I am always open to suggestions, improvements, comments, and questions.  Cheers!

16 comments:

  1. Matt,

    I wrote a custom Groovy script to filter CSV data with a set of conditions, and it worked like magic. It gave me more flexibility to use an object-oriented approach and helped me get familiar with which methods and classes to consider if I were to write a custom processor. However, the one problem I observed was when I tried to get a list of fields from the processor as dynamic property values: the script does not recognize them. Maybe I am not doing it right; could you please comment on it? Thanks for the fantastic blog, and please keep writing!!

    Cheers!
    Karthik

    1. Are you using ExecuteScript or InvokeScriptedProcessor? If the former, the dynamic properties are bound to the script as variables, whose names are the keys and whose values are PropertyValue objects (see my HCC article: https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html). If you are using InvokeScriptedProcessor, you can get to the dynamic properties via the "context" object in onTrigger(). Here's an example that logs each dynamic property added to the InvokeScriptedProcessor configuration:

      context.properties.findAll {k,v -> k.dynamic}.each {k,v -> log.info("found dynamic property $v")}
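Building on that one-liner, here is a sketch that assumes the template from the post. It collects the dynamic properties into a plain name-to-value map inside the `E` class so the rest of the script can look values up by name. The helper name `dynamicProperties` and the property name `fields` are hypothetical. Note that `context.properties` holds the raw configured strings; `context.getProperty(descriptor)` returns a PropertyValue if Expression Language needs evaluating.

```groovy
class E {
    // Hypothetical helper: dynamic property name -> raw configured value
    static Map<String, String> dynamicProperties(context) {
        context.properties
            .findAll { descriptor, value -> descriptor.dynamic }
            .collectEntries { descriptor, value -> [(descriptor.name): value] }
    }

    void executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
        def dynamic = dynamicProperties(context)
        dynamic.each { name, value -> log.info("found dynamic property ${name} = ${value}") }
        // e.g. a hypothetical dynamic property "fields" holding "a,b,c"
        List<String> fields = dynamic['fields']?.tokenize(',')*.trim() ?: []
        log.info("fields: ${fields}")
    }
}
```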

  2. Hi Matt, I'm looking to do the same with a Python script. Do you have an example using Python?

  3. Hello, how can I reuse this template in NiFi? I can see that InvokeScriptedProcessor is available, but it has no relationships. Where should I use your processor? Is it a custom one?

    Thanks,
    Tiago

    1. It has no relationships until you paste in the above code and hit the "Apply" button. When you open the processor dialog again, the relationships you define in the script (in my template, "success" and "failure") will be present.

  4. Hey Matt,
    I have a requirement in my project to fetch data from an API and load it into RDBMS databases. How can I create a flow in NiFi?

    Thanks,
    Manikandan K

  5. Nice post. It's 2019 now, and it seems this is no longer compatible.

  6. Hi

    Many thanks for the template. I tried to copy in the function mburgess wrote at https://community.cloudera.com/t5/Support-Questions/NiFi-convert-everything-in-json-to-attributes-not-one-by-one/td-p/192812 to convert all JSON fields into properties.
    But InvokeScriptedProcessor started failing at the import of org.apache.commons.io.IOUtils.

    Can you help here, please :-)

  7. Hi, I am trying to use InvokeScriptedProcessor in NiFi 1.14.0 to implement custom logic in Jython. It fails with the following error:

    sys.path.append('/opt/nifi/nifi-current/ctct-nifi-artifacts/site-packages')import json
    ^
    SyntaxError: no viable alternative at input 'import'

    ↳ causes: javax.script.ScriptException: SyntaxError: no viable alternative at input 'import' at line number 3 at column number 75
    ↳ causes: org.apache.nifi.processor.exception.ProcessException: Could not instantiate script engines

    My Module Directory property points to the Python 2.7 site-packages directory, and that still does not solve the problem.

    Can you please help me out?

    1. Hi, I was facing the same problem. I added a newline before the import statement (for me it was the first line, so now my code starts on line 2), and that seemed to work.
