Tuesday, November 7, 2017

InvokeScriptedProcessor template revisited (with Jython)

In a previous post, I provided a Groovy template that lets NiFi users port their ExecuteScript Groovy scripts to the faster InvokeScriptedProcessor (ISP) processor. ISP is faster than ExecuteScript because it reloads the script only when the code or other configuration changes, whereas ExecuteScript evaluates the script each time the processor is invoked.

Since that post, I've gotten a couple of requests (such as this one) for an ISP template written in Jython, so that users whose ExecuteScript processors use Jython scripts can benefit from the ISP performance gains. Ask and ye shall receive :) The following Jython script is meant to be pasted into an InvokeScriptedProcessor's Script Body property, and a comment in it indicates where to add your imports and your ExecuteScript code:

#////////////////////////////////////////////////////////////
#// imports go here
#////////////////////////////////////////////////////////////
from org.apache.nifi.processor import Processor,Relationship
from java.lang import Throwable

class E():
    def __init__(self):
        pass
    def executeScript(self,session, context, log, REL_SUCCESS, REL_FAILURE):
        #////////////////////////////////////////////////////////////
        #// Replace 'pass' with your code
        #////////////////////////////////////////////////////////////
        pass
#end class

class JythonProcessor(Processor):   
    REL_SUCCESS = Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    REL_FAILURE = Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    log = None
    e = E()
    def initialize(self,context):
        self.log = context.logger
    def getRelationships(self):
        return set([self.REL_SUCCESS, self.REL_FAILURE])
    def validate(self,context):
        pass
    def onPropertyModified(self,descriptor, oldValue, newValue):
        pass
    def getPropertyDescriptors(self):
        return []
    def getIdentifier(self):
        return None    
    def onTrigger(self,context, sessionFactory):
        session = sessionFactory.createSession()
        try:
            self.e.executeScript(session, context, self.log, self.REL_SUCCESS, self.REL_FAILURE)
            session.commit()
        except Throwable, t:
            self.log.error('{} failed to process due to {}; rolling back session', [self, t])
            session.rollback(True)
            raise t
#end class

processor = JythonProcessor()

Like the Groovy version, you just need to add your imports to the top of the file and paste your ExecuteScript Jython code into the executeScript() method, replacing the "pass" line. As always, please let me know whether and how well this works for you, and if you have any comments, questions, or suggestions. The script is also available as a Gist. Cheers!
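As a rough illustration of that last step, here is a minimal sketch of what the E class might look like once a simple ExecuteScript body has been pasted in. The attribute name 'processed.by' and its value are made up for the example, and FakeSession is a hypothetical stand-in (not part of NiFi) that only exists so the method can be dry-run outside of NiFi; inside ISP, the template passes in the real session and relationships.

```python
class E():
    def __init__(self):
        pass
    def executeScript(self, session, context, log, REL_SUCCESS, REL_FAILURE):
        # Pasted ExecuteScript body, indented to sit inside the method
        flowFile = session.get()
        if flowFile is None:
            return
        # 'processed.by' is a made-up attribute name for illustration
        flowFile = session.putAttribute(flowFile, 'processed.by', 'isp-jython')
        session.transfer(flowFile, REL_SUCCESS)
#end class

class FakeSession(object):
    # Hypothetical stand-in for the NiFi session, for dry runs only.
    # FlowFiles are modeled as plain dicts of attributes.
    def __init__(self, flowFiles):
        self.queue = list(flowFiles)
        self.transferred = []
    def get(self):
        if self.queue:
            return self.queue.pop(0)
        return None
    def putAttribute(self, flowFile, key, value):
        updated = dict(flowFile)
        updated[key] = value
        return updated
    def transfer(self, flowFile, relationship):
        self.transferred.append((flowFile, relationship))
#end class

# Dry run: plain strings stand in for the Relationship objects
session = FakeSession([{'filename': 'a.txt'}])
e = E()
e.executeScript(session, None, None, 'success', 'failure')
# A second call finds the queue empty and returns without transferring
e.executeScript(session, None, None, 'success', 'failure')
```

Note that the pasted code can use return to bail out early when there is no FlowFile, since it now lives inside a method rather than at the top level of a script.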

5 comments:

  1. Hi Matt. Thank you very much for the code, I will try it in different ways and I will give you good feedback. Thanks for your time.

    1. Hello good morning Matt. After several adaptations were made to the Jython code, it was inserted into your template. As a result, data can be processed faster and bottlenecks avoided. Really thank you very much for your help, your template is clear and effective. Greetings

  2. Hi Matt, thanks for the helpful posts. I have a question. I added a global variable to my Python file that counts how many times a function is called for a specific method (simply put, a counter). When the InvokeScriptedProcessor is stopped, I expect it to write the final value of the counter to a file for me. But it always appears to be empty outside of the onTrigger method! When I print it inside onTrigger, it shows the counter, but outside of the class it's always empty. The point is that I only want the final result, which is why I want it when the processor is stopped.
    I tried adding the counter as an attribute of the class and updating it each time from the global variable, but again it appears empty. What is a good way to return such data from onTrigger when the InvokeScriptedProcessor is stopped?

    1. I don't think the script engine keeps track of global variables, so you would want the counter to be a member of the class that implements the Processor interface (i.e. the one with onTrigger). Note that when the script is reloaded (if any properties change on the main dialog, including the code itself), a new instance of the class will be created, so the member variable holding the count will be re-initialized. If you need to keep track of data in between runs of onTrigger, you can also use the processor's State Management capabilities. Check out part 3 of my ExecuteScript Cookbook (the same technique applies to InvokeScriptedProcessor): https://community.hortonworks.com/content/kbentry/77739/executescript-cookbook-part-3.html
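
The member-variable behavior described above can be sketched in plain Python. This is only an illustration under stated assumptions: CountingProcessor is a hypothetical class standing in for the one that implements Processor, and the second instantiation simulates what a script reload does. It does not touch any NiFi API.

```python
class CountingProcessor(object):
    # Hypothetical stand-in for the class that implements Processor
    def __init__(self):
        # Instance member: kept between onTrigger calls on this instance
        self.count = 0
    def onTrigger(self, context, sessionFactory):
        self.count += 1
#end class

processor = CountingProcessor()
for _ in range(3):
    processor.onTrigger(None, None)
count_before_reload = processor.count

# Simulated reload: the script body runs again and builds a new instance,
# so the counter starts over
processor = CountingProcessor()
count_after_reload = processor.count
```

Because the count is lost whenever a new instance is created, anything that has to survive a reload needs to live somewhere else, which is where State Management comes in.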
