For quick, easy, and small scripting tasks in Apache NiFi, ExecuteScript is often a better choice than InvokeScriptedProcessor, as there is little-to-no boilerplate code, relationships and properties are already defined and supported, and some objects relevant to the NiFi API (such as the ProcessSession, ProcessContext, and ComponentLog) are already bound to the script engine as variables that can readily be used by the script.
However, one tradeoff is performance; in ExecuteScript, the script is evaluated each time onTrigger is executed. With InvokeScriptedProcessor, as long as the script (or any of the InvokeScriptedProcessor properties) is not changed, the scripted Processor instance is maintained by the processor, and its methods are simply invoked when parent methods such as onTrigger() are called by the NiFi framework.
To get the best of both worlds, I have put together an InvokeScriptedProcessor script that is configured the same way ExecuteScript is. The "success" and "failure" relationships are provided, the API objects are available, and if you simply paste your ExecuteScript code into the marked spot in the script below, it will behave like a more performant ExecuteScript instance. The code is as follows:
////////////////////////////////////////////////////////////
// imports go here
////////////////////////////////////////////////////////////

class E {
    void executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
        ////////////////////////////////////////////////////////////
        // your code goes here
        ////////////////////////////////////////////////////////////
    }
}

class GroovyProcessor implements Processor {
    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    def ComponentLog log
    def e = new E()

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { null }
    String getIdentifier() { null }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        try {
            e.executeScript(session, context, log, REL_SUCCESS, REL_FAILURE)
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
The boilerplate Processor implementation is at the bottom, and I've left comment blocks where your imports and code go. With some simple cut-and-paste, you should be able to have a pre-evaluated Processor instance that will run your ExecuteScript code faster than before!
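As a minimal sketch of the cut-and-paste step, here is what the E class might look like once a small ExecuteScript-style body has been pasted in. The attribute name 'processed.by' and its value are made up for illustration; the body only uses the session, log, and relationship arguments that the template passes in.

```groovy
class E {
    void executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
        // Same kind of body you would write in ExecuteScript
        def flowFile = session.get()
        if (!flowFile) return   // nothing queued, so there is nothing to do

        try {
            // 'processed.by' is a hypothetical attribute name used for illustration
            flowFile = session.putAttribute(flowFile, 'processed.by', 'InvokeScriptedProcessor')
            session.transfer(flowFile, REL_SUCCESS)
        } catch (Exception ex) {
            log.error('Failed to tag FlowFile', ex)
            session.transfer(flowFile, REL_FAILURE)
        }
    }
}
```

Because the template's onTrigger() commits or rolls back the session itself, the pasted body should not need its own commit call.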
If you give this a try, please let me know how/if it works for you. I am always open to suggestions, improvements, comments, and questions. Cheers!
Matt,
I tried to write a custom Groovy script to filter CSV with a set of conditions and it worked like magic. This gave me more flexibility in using an object-oriented approach and helped me get familiar with which methods/classes to consider if I were to write a custom processor. However, the only problem I observed was that when I try to get a list of fields from the processor as dynamic values, the script does not recognize them. Maybe I am not doing it right; could you please comment on it? Thanks for the fantastic blog and please keep writing!!
Cheers!
Karthik
Are you using ExecuteScript or InvokeScriptedProcessor? If the former, the dynamic properties are bound to the script as variables, whose names are the keys and whose values are PropertyValue objects (see my HCC article: https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html). If you are using InvokeScriptedProcessor, you can get to the dynamic properties via the "context" object in onTrigger(). Here's an example that logs each dynamic property added to the InvokeScriptedProcessor configuration:
context.properties.findAll {k,v -> k.dynamic}.each {k,v -> log.info("found dynamic property $v")}
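Building on that one-liner, a sketch of how the dynamic properties could be gathered into a name-to-value map for lookup inside the script. The helper name dynamicProps and the property name 'fields' are hypothetical; in the template above, this could be a method on the E class that receives the same "context" object.

```groovy
// Hypothetical helper: collect the dynamic properties into a [name: value] map.
// Assumes context.properties is a map of PropertyDescriptor -> String value.
def dynamicProps(context) {
    context.properties.findAll { k, v -> k.dynamic }.collectEntries { k, v -> [(k.name): v] }
}

// Example use inside executeScript(), assuming a dynamic property named 'fields':
//   def fields = dynamicProps(context)['fields']?.split(',')
```

In ExecuteScript no helper like this should be needed, since each dynamic property is already bound as a variable holding a PropertyValue object.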
Hi Matt, I'm looking to do the same with a Python Script. Do you have an example using Python?
Same question for JavaScript
Hello, how can I reuse this template in NiFi? I can see InvokeScriptedProcessor available but it has no relationships. Where should I reuse your processor? Is it a customized one?
Thanks,
Tiago
It has no relationships until you paste in the above code and hit the "Apply" button. When you open the processor dialog again, the relationships you define in the script (in my template, "success" and "failure") will be present.
Hey Matt,
I have a requirement in my project to fetch data from an API and load it into RDBMS databases. How can I create a flow in NiFi?
Thanks,
Manikandan K
nice post, 2019 now and seems it is no longer compatible
Hi
Many thanks for the template. I tried to copy in the function mburgess wrote from https://community.cloudera.com/t5/Support-Questions/NiFi-convert-everything-in-json-to-attributes-not-one-by-one/td-p/192812 to convert all JSON fields into properties.
But InvokeScriptedProcessor starts failing on the import of org.apache.commons.io.IOUtils.
Can you help here, please :-)
Hi, I am trying to use InvokeScriptedProcessor in NiFi 1.14.0 to implement custom logic in Jython. It fails with this error:
sys.path.append('/opt/nifi/nifi-current/ctct-nifi-artifacts/site-packages')import json
^
SyntaxError: no viable alternative at input 'import'
↳ causes: javax.script.ScriptException: SyntaxError: no viable alternative at input 'import' at line number 3 at column number 75
↳ causes: org.apache.nifi.processor.exception.ProcessException: Could not instantiate script engines
My module directory field has path to Python2.7 site-packages and it still does not solve the problem.
Can you please help me out?
Hi, I was facing the same problem. I added a new line before the import statement. For me it was the first line, so now my code starts from line 2, and that seemed to work.