Tuesday, November 7, 2017

InvokeScriptedProcessor template revisited (with Jython)

In a previous post, I provided a Groovy template that lets NiFi users port their ExecuteScript Groovy scripts to the faster InvokeScriptedProcessor (ISP) processor. ISP is faster because its script is reloaded only when the code or another configuration property changes, whereas ExecuteScript evaluates the script every time the processor is invoked.
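As a rough illustration of that difference, here is a toy model in plain Python. It is not how NiFi implements either processor: `SCRIPT`, `ToyProcessor`, `execute_script_style`, and `isp_style` are hypothetical names invented for this sketch, and `exec` merely stands in for the script engine.

```python
# Toy model only: plain Python standing in for NiFi's script engine.
SCRIPT = """
class ToyProcessor(object):
    def onTrigger(self, flowfile):
        return flowfile.upper()

processor = ToyProcessor()
"""

# Counts how often the script body is evaluated under each style.
evaluations = {"execute_script": 0, "isp": 0}


def execute_script_style(flowfiles):
    """Evaluate the whole script body again for every invocation."""
    results = []
    for ff in flowfiles:
        namespace = {}
        exec(SCRIPT, namespace)  # script evaluated on each trigger
        evaluations["execute_script"] += 1
        results.append(namespace["processor"].onTrigger(ff))
    return results


def isp_style(flowfiles):
    """Evaluate the script once, then reuse the resulting processor object."""
    namespace = {}
    exec(SCRIPT, namespace)  # script evaluated a single time
    evaluations["isp"] += 1
    processor = namespace["processor"]
    return [processor.onTrigger(ff) for ff in flowfiles]
```

Calling each function once over the same three inputs returns identical results, but `evaluations` records three script evaluations for the first style and only one for the second.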

Since that post, I've gotten a couple of requests (such as this one) for an ISP template written in Jython, so that users who have ExecuteScript processors running Jython scripts can benefit from the ISP performance gains. Ask and ye shall receive :) The following Jython script is meant to be pasted into an InvokeScriptedProcessor's Script Body property, and it contains comments indicating where to add imports and the ExecuteScript code:

#////////////////////////////////////////////////////////////
#// imports go here
#////////////////////////////////////////////////////////////
from org.apache.nifi.processor import Processor,Relationship
from java.lang import Throwable

class E():
    def __init__(self):
        pass
    def executeScript(self,session, context, log, REL_SUCCESS, REL_FAILURE):
        #////////////////////////////////////////////////////////////
        #// Replace 'pass' with your code
        #////////////////////////////////////////////////////////////
        pass
#end class

class JythonProcessor(Processor):   
    REL_SUCCESS = Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    REL_FAILURE = Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    log = None
    e = E()
    def initialize(self,context):
        self.log = context.logger
    def getRelationships(self):
        return set([self.REL_SUCCESS, self.REL_FAILURE])
    def validate(self,context):
        pass
    def onPropertyModified(self,descriptor, oldValue, newValue):
        pass
    def getPropertyDescriptors(self):
        return []
    def getIdentifier(self):
        return None    
    def onTrigger(self,context, sessionFactory):
        session = sessionFactory.createSession()
        try:
            self.e.executeScript(session, context, self.log, self.REL_SUCCESS, self.REL_FAILURE)
            session.commit()
        except Throwable, t:
            self.log.error('{} failed to process due to {}; rolling back session', [self, t])
            session.rollback(True)
            raise t
#end class

processor = JythonProcessor()

As with the Groovy version, you just need to add your imports to the top of the file and paste your ExecuteScript Jython code into the executeScript() method, replacing the "pass" line. As always, please let me know how/if this works for you, and if you have any comments, questions, or suggestions. The script is also available as a Gist. Cheers!
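For example, a simple ExecuteScript body that adds an attribute and routes the FlowFile to success might look like the following once pasted into executeScript(). This is only a sketch: `FakeSession` and the dict-based FlowFiles are hypothetical stand-ins so the snippet can be tried outside NiFi, and the `greeting` attribute is made up. Only the body of executeScript() would go into the template.

```python
class E(object):
    def executeScript(self, session, context, log, REL_SUCCESS, REL_FAILURE):
        # Body as it might be pasted from an ExecuteScript processor
        flowFile = session.get()
        if flowFile is None:
            return  # nothing queued, nothing to do
        flowFile = session.putAttribute(flowFile, 'greeting', 'hello')
        session.transfer(flowFile, REL_SUCCESS)


class FakeSession(object):
    """Hypothetical stand-in for a NiFi session; FlowFiles are plain dicts."""

    def __init__(self, queued):
        self.queued = list(queued)
        self.transferred = []

    def get(self):
        # Return the next queued FlowFile, or None when the queue is empty
        return self.queued.pop(0) if self.queued else None

    def putAttribute(self, flowFile, key, value):
        # Return an updated copy rather than mutating the original
        updated = dict(flowFile)
        updated[key] = value
        return updated

    def transfer(self, flowFile, relationship):
        self.transferred.append((flowFile, relationship))
```

In the real template, session.commit() and the rollback on failure are handled by onTrigger(), so the pasted body does not need to do either itself.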

9 comments:

  1. Hi Matt. Thank you very much for the code, I will try it in different ways and I will give you good feedback. Thanks for your time.

    Replies
    1. Hello good morning Matt. After several adaptations were made to the Jython code, it was inserted into your template. As a result, data can be processed faster and bottlenecks avoided. Really thank you very much for your help, your template is clear and effective. Greetings

  2. Hi Matt, thanks for the helpful posts. I have a question. I added a global variable to my Python file that counts how many times a function is used for a specific method (simply put, a counter). When the InvokeScriptedProcessor is stopped, I expect it to write the final value of the counter to a file for me. But it always appears to be empty outside of the onTrigger method! When I print it inside onTrigger, it shows the counter, but outside of the class it's always empty. The point is that I want only the final result, which is why I want it when the processor is stopped.
    I tried adding the counter as an attribute of the class and updating it each time from the global variable, but again it appears empty. What is a good way to return such data from onTrigger when the InvokeScriptedProcessor is stopped?

    Replies
    1. I don't think the script engine keeps track of global variables, so you would want the counter to be a member of the class that implements the Processor interface (i.e. the one with onTrigger). Note that when the script is reloaded (if any properties change on the main dialog, including the code itself), a new instance of the class will be created, so the member variable holding the count will be re-initialized. If you need to keep track of data between runs of onTrigger, you can also use the processor's State Management capabilities; check out part 3 of my ExecuteScript Cookbook (the same technique applies to InvokeScriptedProcessor): https://community.hortonworks.com/content/kbentry/77739/executescript-cookbook-part-3.html
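A minimal sketch of the member-variable behavior described in this reply, in plain Python so it can be tried outside NiFi. `CountingProcessor` is a hypothetical stand-in for the class that implements Processor, and re-creating it by hand only imitates what a script reload would do.

```python
class CountingProcessor(object):
    """Hypothetical stand-in for the class that implements Processor."""

    def __init__(self):
        self.count = 0  # lives only as long as this instance does

    def onTrigger(self, context, sessionFactory):
        self.count += 1


processor = CountingProcessor()
for _ in range(3):
    processor.onTrigger(None, None)
count_before_reload = processor.count

# Imitate a script reload: a fresh instance is built, so the count starts over.
processor = CountingProcessor()
count_after_reload = processor.count
```

The count survives across calls to onTrigger() on the same instance but is lost when a new instance replaces it, which is why data that must outlive a reload belongs in State Management.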

  3. Hi Matt
    I'm curious: is there a way to access dynamic properties and their values from ISP, just like you did in "ExecuteScript Cookbook part 3"?
    I tried calling my_property.getValue() both from the executeScript method and outside it... and it always failed.
    I can't use the ExecuteScript processor since I want to manage more than two relationships.

    Thank you for the articles, I really love them

    Replies
    1. After doing some research, I sorted the issue out this way:
      context.getProperty('my_property').getValue() # calling from executeScript() section using passed context

      Anyway, I'd like to see how to handle properties in a better way, for example when we add them with PropertyDescriptor.Builder() and want to validate them.

      There is already a great post here (for Groovy users): http://funnifi.blogspot.com/2016/02/writing-reusable-scripted-processors-in.html

  4. I hope that this is the right place for my question, so here it goes:

    I want to consume data from a running process that exposes a SOAP service interface. The standard processors in NiFi do not support SOAP. Given the available skills in our organization, I was looking at Python and the Zeep module, which can act as a SOAP client.

    My first question is whether a SOAP client for NiFi exists. If so, I can use that.

    My second question is how to use a Python module in the ScriptedProcessor. I have not seen instructions on how to make modules available for import in Python, and I don't know whether pure Python modules can be used in the ScriptedProcessor.

    My last question for now is about the difference between the InvokeScriptedProcessor and the ExecuteScript processor.
