Tuesday, November 7, 2017

InvokeScriptedProcessor template revisited (with Jython)

In a previous post, I provided a template in Groovy that allows NiFi users to port their ExecuteScript Groovy scripts into the faster InvokeScriptedProcessor (ISP) processor. ISP is faster than ExecuteScript because it reloads the script only when the code or other configuration changes, whereas ExecuteScript evaluates the script every time the processor is invoked.
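To make that difference concrete, here is a simplified plain-Python model. It is not NiFi's actual implementation. The "ExecuteScript" model compiles and evaluates the script body on every trigger. The "ISP" model evaluates it once, keeps the resulting function, and re-evaluates only when the script text changes. The class names `ExecuteScriptModel` and `InvokeScriptedProcessorModel` and the `process()` function are hypothetical. For brevity, the model tracks only the script body and ignores other configuration changes.

```python
# Simplified model (hypothetical, not NiFi's code) of ExecuteScript vs. ISP script evaluation.

SCRIPT = "def process(value):\n    return value.upper()\n"


class ExecuteScriptModel(object):
    """Evaluates the script body on every trigger, like ExecuteScript."""

    def __init__(self, script_body):
        self.script_body = script_body
        self.evaluations = 0

    def trigger(self, value):
        namespace = {}
        exec(compile(self.script_body, "<script>", "exec"), namespace)
        self.evaluations += 1
        return namespace["process"](value)


class InvokeScriptedProcessorModel(object):
    """Evaluates the script once and reuses it until the script text changes, like ISP."""

    def __init__(self, script_body):
        self.script_body = script_body
        self.evaluations = 0
        self._loaded_body = None
        self._process = None

    def trigger(self, value):
        # Re-evaluate only if the script body differs from the one already loaded
        if self._loaded_body != self.script_body:
            namespace = {}
            exec(compile(self.script_body, "<script>", "exec"), namespace)
            self._process = namespace["process"]
            self._loaded_body = self.script_body
            self.evaluations += 1
        return self._process(value)


es = ExecuteScriptModel(SCRIPT)
isp = InvokeScriptedProcessorModel(SCRIPT)
for v in ["a", "b", "c"]:
    es.trigger(v)
    isp.trigger(v)
print("ExecuteScript evaluations: %d, ISP evaluations: %d" % (es.evaluations, isp.evaluations))
# prints "ExecuteScript evaluations: 3, ISP evaluations: 1"
```

If you change `isp.script_body`, the next `trigger()` call evaluates the new script once and then reuses it. This mirrors the "only reloaded when the code changes" behavior described above.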

Since that post, I've received a couple of requests (such as this one) for an ISP template written in Jython, so that users who have ExecuteScript processors running Jython scripts can also benefit from the ISP performance gains. Ask and ye shall receive :) The following Jython script is meant to be pasted into an InvokeScriptedProcessor's Script Body property. Comments in the script indicate where to add your imports and your ExecuteScript code:

#////////////////////////////////////////////////////////////
#// imports go here
#////////////////////////////////////////////////////////////
from org.apache.nifi.processor import Processor,Relationship
from java.lang import Throwable

class E():
    def __init__(self):
        pass
    def executeScript(self,session, context, log, REL_SUCCESS, REL_FAILURE):
        #////////////////////////////////////////////////////////////
        #// Replace 'pass' with your code
        #////////////////////////////////////////////////////////////
        pass
#end class

class JythonProcessor(Processor):   
    REL_SUCCESS = Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    REL_FAILURE = Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    log = None
    e = E()
    def initialize(self,context):
        self.log = context.logger
    def getRelationships(self):
        return set([self.REL_SUCCESS, self.REL_FAILURE])
    def validate(self,context):
        pass
    def onPropertyModified(self,descriptor, oldValue, newValue):
        pass
    def getPropertyDescriptors(self):
        return []
    def getIdentifier(self):
        return None    
    def onTrigger(self,context, sessionFactory):
        session = sessionFactory.createSession()
        try:
            self.e.executeScript(session, context, self.log, self.REL_SUCCESS, self.REL_FAILURE)
            session.commit()
        except Throwable as t:
            # Log the failure, roll back the session (True = penalize the FlowFiles), then re-raise
            self.log.error('{} failed to process due to {}; rolling back session', [self, t])
            session.rollback(True)
            raise t
#end class

processor = JythonProcessor()

As with the Groovy version, you just need to add your imports to the top of the file and paste your ExecuteScript Jython code into the executeScript() method, replacing the "pass" line. As always, please let me know how/if this works for you, and if you have any comments, questions, or suggestions. The script is also available as a Gist. Cheers!
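As an illustration, here is roughly what the `E` class might look like after pasting in a typical ExecuteScript body that adds one attribute and routes the FlowFile to success. The attribute name `processed.by` and its value are made up for this example. Inside NiFi, `session` is a real ProcessSession, and `REL_SUCCESS`/`REL_FAILURE` are the Relationship objects the template passes in. To let the sketch run outside NiFi, `StubSession` is a hypothetical stand-in that mimics only `get()`, `putAttribute()` and `transfer()`, with plain dicts and strings standing in for FlowFiles and relationships.

```python
class E():
    def __init__(self):
        pass

    def executeScript(self, session, context, log, REL_SUCCESS, REL_FAILURE):
        # Typical ExecuteScript-style body, pasted in place of 'pass'
        flowFile = session.get()
        if flowFile is None:
            return
        # putAttribute returns an updated FlowFile; keep the returned reference
        flowFile = session.putAttribute(flowFile, 'processed.by', 'jython-isp')
        session.transfer(flowFile, REL_SUCCESS)


class StubSession(object):
    """Hypothetical stand-in for NiFi's ProcessSession, for running outside NiFi only."""

    def __init__(self, flowfiles):
        self.queue = list(flowfiles)
        self.transferred = []

    def get(self):
        return self.queue.pop(0) if self.queue else None

    def putAttribute(self, flowFile, key, value):
        updated = dict(flowFile)
        updated[key] = value
        return updated

    def transfer(self, flowFile, relationship):
        self.transferred.append((flowFile, relationship))


session = StubSession([{'filename': 'a.txt'}])
E().executeScript(session, None, None, 'success', 'failure')
E().executeScript(session, None, None, 'success', 'failure')  # queue empty: returns early
print(session.transferred)
```

The early `return` when `session.get()` yields `None` matters in ISP as well. The template's `onTrigger()` still commits the session afterward, so an empty trigger is harmless.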