Thursday, February 1, 2018

InvokeScriptedProcessor template revisited (with Javascript)

I recently got a request on Google+ for a Javascript/Nashorn example of InvokeScriptedProcessor. I've already written articles on an InvokeScriptedProcessor template (for porting from ExecuteScript) in Groovy and Jython, so I thought I'd do the same for Javascript. The template also illustrates the methods you have to override in order to write your own InvokeScriptedProcessor that does more powerful and flexible things.

Javascript/Nashorn is a slightly peculiar animal in terms of its ScriptEngine. In Groovy and Jython you define a class that implements the Processor interface, implement the requisite methods, and assign an instance of that class to the variable "processor". The same is true of Javascript, except that in order to invoke methods on the "processor" object, it has to be an instance of ScriptObject (an internal Nashorn class) or ScriptObjectMirror (a class of the Nashorn scripting API). In our context, the script body/file of InvokeScriptedProcessor is the thing that gets evaluated and cast to a ScriptObjectMirror, which means we need a slightly different approach than just creating a class and setting "processor" to an instance of it. Instead, the script itself has to be castable to a Processor, so it can be a ScriptObjectMirror and a Processor at the same time.

To that end, we declare the methods of the Processor interface (including those it inherits from its parent interfaces) as functions on the main script, and then we set the "processor" variable to "this". Another difference from the other scripting language examples is that you need a variable referring to each Java class (both from Java proper and from the NiFi API) before you can instantiate it. So in the "imports" section you will see a number of Java.type() calls, which return JS references to the Java classes. You will have to do the same if you reference other Java classes in your executeScript() body.

Speaking of which, I kept the same approach as in the earlier templates: there is an obvious place to paste your ExecuteScript code, which makes porting from ExecuteScript to InvokeScriptedProcessor easier. The template follows:

// "imports" go here

var Processor = Java.type("org.apache.nifi.processor.Processor");
var Relationship =  Java.type("org.apache.nifi.processor.Relationship");
var HashSet =  Java.type("java.util.HashSet");
var log = null;
var REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build();
var REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build();

function executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
   // your code goes here
}

function initialize(context) { log = context.logger; }
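function getRelationships() {
    // Expose both relationships so FlowFiles can be transferred to them
    var r = new HashSet();
    r.add(REL_FAILURE);
    r.add(REL_SUCCESS);
    return r;
}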
function validate(context) { return null; }
function getPropertyDescriptor(name) { return null; }
function onPropertyModified(descriptor, oldValue, newValue) { return null; }
function getPropertyDescriptors() { return null; }
function getIdentifier() { return null; }
function onTrigger(context, sessionFactory) {
        var session = sessionFactory.createSession();
        try {
            executeScript(session, context, log, REL_SUCCESS, REL_FAILURE);
            session.commit();
        } catch (t) {
            // Java.to() converts the JS array into the Object[] the logger expects
            log.error("{} failed to process due to {}; rolling back session", Java.to([this, t], "java.lang.Object[]"));
            session.rollback(true);
            throw t;
        }
}

processor = this;
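
As a rough illustration of what might go in the executeScript() body, here is a minimal sketch that stamps an attribute on each incoming FlowFile and routes it to success. The attribute name "processed.by" and its value are made up for the example; session.get(), session.putAttribute() and session.transfer() are the usual ProcessSession calls. It sticks to ES5 syntax, since that is what Nashorn supports by default.

```javascript
// Hypothetical executeScript() body: tag the next FlowFile and pass it along.
// "processed.by" is an illustrative attribute name, not part of the template.
function executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
    var flowFile = session.get();
    if (flowFile == null) {
        return; // nothing in the incoming queue, so nothing to do
    }
    try {
        // putAttribute returns the updated FlowFile reference; keep using that one
        flowFile = session.putAttribute(flowFile, "processed.by", "InvokeScriptedProcessor");
        session.transfer(flowFile, REL_SUCCESS);
    } catch (e) {
        // Report the problem and route the FlowFile to failure instead
        log.error("Could not tag FlowFile; routing to failure: " + e);
        session.transfer(flowFile, REL_FAILURE);
    }
}
```

Because this body only touches the objects passed in, it needs no extra Java.type() calls; anything that instantiates a Java class would need one in the "imports" section.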

As always, please let me know how/if this works for you, and of course comments, questions, and suggestions are welcome.  Cheers!
