In a previous post, I provided a template in Groovy that would allow NiFi users to port their ExecuteScript Groovy scripts into the faster InvokeScriptedProcessor (ISP) processor. ISP is faster than ExecuteScript because the script is only reloaded when the code or other config changes, versus ExecuteScript which evaluates the script each time the processor is invoked.
Since that post, I've gotten a couple of requests (such as this one) for an ISP template written in Jython, so users that have ExecuteScript processors using Jython scripts can benefit from the ISP performance gains. Ask and ye shall receive :) The following Jython script is meant to be pasted into an InvokeScriptedProcessor's Script Body property, and there is a comment indicating where to add imports and the ExecuteScript code:
#////////////////////////////////////////////////////////////
#// imports go here
#////////////////////////////////////////////////////////////
from org.apache.nifi.processor import Processor,Relationship
from java.lang import Throwable

class E():
    def __init__(self):
        pass

    def executeScript(self,session, context, log, REL_SUCCESS, REL_FAILURE):
        #////////////////////////////////////////////////////////////
        #// Replace 'pass' with your code
        #////////////////////////////////////////////////////////////
        pass
#end class

class JythonProcessor(Processor):
    REL_SUCCESS = Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    REL_FAILURE = Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    log = None
    e = E()

    def initialize(self,context):
        self.log = context.logger

    def getRelationships(self):
        return set([self.REL_SUCCESS, self.REL_FAILURE])

    def validate(self,context):
        pass

    def onPropertyModified(self,descriptor, oldValue, newValue):
        pass

    def getPropertyDescriptors(self):
        return []

    def getIdentifier(self):
        return None

    def onTrigger(self,context, sessionFactory):
        session = sessionFactory.createSession()
        try:
            # Run the ported ExecuteScript code, then commit the session
            self.e.executeScript(session, context, self.log, self.REL_SUCCESS, self.REL_FAILURE)
            session.commit()
        except Throwable, t:
            self.log.error('{} failed to process due to {}; rolling back session', [self, t])
            # Roll back and penalize the FlowFiles (Python's True, not Java's true)
            session.rollback(True)
            raise t
#end class

processor = JythonProcessor()
Like the Groovy version, you just need to add your imports to the top of the file, and paste your ExecuteScript Jython code into the executeScript() method, replacing the "pass" line. As always, please let me know how/if this works for you, and if you have any comments, questions, or suggestions. The script is available as a Gist also. Cheers!
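To make the "replace the pass line" step concrete, here is a minimal sketch of what class E might look like once a simple ExecuteScript body has been pasted in. The attribute name 'greeting' and its value are hypothetical, chosen only for illustration; session.get(), session.putAttribute() and session.transfer() are the same ProcessSession calls an ExecuteScript script would normally use.

```python
class E():
    def __init__(self):
        pass

    def executeScript(self, session, context, log, REL_SUCCESS, REL_FAILURE):
        # Typical ExecuteScript body: fetch one FlowFile, bail out if none is queued
        flowFile = session.get()
        if flowFile is None:
            return
        # 'greeting' / 'hello' are hypothetical values for illustration only
        flowFile = session.putAttribute(flowFile, 'greeting', 'hello')
        session.transfer(flowFile, REL_SUCCESS)
```

Note that the body does not call session.commit() itself; in the template, onTrigger() commits after executeScript() returns and rolls back if a Throwable is raised.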
Hi Matt. Thank you very much for the code, I will try it in different ways and I will give you good feedback. Thanks for your time.
Hello good morning Matt. After several adaptations were made to the Jython code, it was inserted into your template. As a result, data can be processed faster and bottlenecks avoided. Really thank you very much for your help, your template is clear and effective. Greetings
awesome, thanks Matt!
Hi Matt, thanks for the helpful posts. I've got a question. I added a global variable to my Python file that counts how many times a specific function is called (simply put, a counter). When the InvokeScriptedProcessor is stopped, I expect it to write the final value of the counter to a file for me. But the counter always appears to be empty outside of the onTrigger method! When I print it inside onTrigger, it shows the count, but outside of the class it's always empty. The point is that I only want the final result, which is why I want it when the processor is stopped.
I tried adding the counter as an attribute of the class and updating it each time from the global variable, but again it appears empty. What is a good way to return such data from onTrigger when the InvokeScriptedProcessor is stopped?
I don't think the script engine keeps track of global variables, so you would want it to be a member of the class that implements the Processor interface (i.e. the one with onTrigger). Note that when the script is reloaded (if any properties change on the main dialog, including the code itself), a new instance of the class will be created, so the member variable holding the count will be re-initialized. If you need to keep track of data in between runs of onTrigger, you can also use the processor's State Management capabilities. Check out part 3 of my ExecuteScript Cookbook (the same technique applies to InvokeScriptedProcessor): https://community.hortonworks.com/content/kbentry/77739/executescript-cookbook-part-3.html
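As a rough illustration of the member-variable suggestion, the counter could live on the E instance that the template's JythonProcessor already holds as a class member. This is only a sketch: the attribute name 'invocation.count' is hypothetical, and it assumes the processor runs with a single concurrent task, since the increment is not synchronized.

```python
class E():
    def __init__(self):
        # Lives as long as this script instance; reset whenever the script is reloaded
        self.count = 0

    def executeScript(self, session, context, log, REL_SUCCESS, REL_FAILURE):
        flowFile = session.get()
        if flowFile is None:
            return
        self.count += 1
        # 'invocation.count' is a hypothetical attribute name for illustration
        flowFile = session.putAttribute(flowFile, 'invocation.count', str(self.count))
        session.transfer(flowFile, REL_SUCCESS)
```

Because the count is re-initialized on every script reload, anything that must survive a reload or restart still belongs in State Management rather than in a member variable.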
Hi Matt
I'm curious, is there a way to access dynamic properties and their values from ISP just like you did in "ExecuteScript Cookbook part 3"?
I tried both, calling my_property.getValue() from executeScript method and outside it... and always failed.
I can't use ExecuteScript processor since I want to manage more than two relationships.
Thank you for the articles, I really love them
After doing some research, I sorted the issue out this way:
context.getProperty('my_property').getValue() # calling from executeScript() section using passed context
Anyway, I'd like to see how to handle properties in a better way, say if we add them with PropertyDescriptor.Builder() and want to run some validations against them.
There is already a great post here (for Groovy users): http://funnifi.blogspot.com/2016/02/writing-reusable-scripted-processors-in.html
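For reference, the context.getProperty('my_property').getValue() call from the comment above could sit inside the template's executeScript() roughly as follows. This is a sketch under assumptions: 'my_property' is the commenter's property name, the output attribute 'my_property.value' is hypothetical, and routing to failure when the property is missing or unset is just one possible choice.

```python
class E():
    def __init__(self):
        pass

    def executeScript(self, session, context, log, REL_SUCCESS, REL_FAILURE):
        flowFile = session.get()
        if flowFile is None:
            return
        # Look the property up by name on the ProcessContext passed into onTrigger
        prop = context.getProperty('my_property')
        value = prop.getValue() if prop is not None else None
        if value is None:
            # Assumption: treat a missing or unset property as a failure
            session.transfer(flowFile, REL_FAILURE)
            return
        # 'my_property.value' is a hypothetical attribute name for illustration
        flowFile = session.putAttribute(flowFile, 'my_property.value', value)
        session.transfer(flowFile, REL_SUCCESS)
```

The None checks are defensive; whether they are needed depends on how the property is defined on the processor.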
I hope that this is the right place for my question, so here it goes:
I want to consume data from a running process that exposes a SOAP service interface. The standard processors in NiFi do not support SOAP. Given the available skills in our organization, I was looking at Python and the Zeep module, which can act as a SOAP client.
My first question is whether a SOAP client for NiFi exists. If so, I can use that.
My second question is how to use a Python module in the ScriptedProcessor. I have not seen instructions on how to make modules available for import in Python, and I don't know if pure Python modules can be used in the ScriptedProcessor.
My last question for now is about the difference between the InvokeScriptedProcessor and the ExecuteScript processor.