Showing posts with label invokescriptedprocessor. Show all posts
Showing posts with label invokescriptedprocessor. Show all posts

Wednesday, February 24, 2016

Writing Reusable Scripted Processors in NiFi

This blog has quite a few posts about the various things you can do with the new (as of NiFi 0.5.0) scripting processors. Most are about ExecuteScript and how to use it to do per-flowfile things like replace content, use external modules to add functionality, etc.  However most are specific for the task at hand, and aren't really general-use scripts.

We could use dynamic properties (explained in the Developer's Guide and in an earlier post), as they are passed into ExecuteScript as variables. However the user of the processor would have to know which properties to add and fill in, and there's no good way to get that information to the user (at least with ExecuteScript).

However, InvokeScriptedProcessor lets you provide a scripted implementation of a full Processor instance. This means you can define your own properties and relationships, along with documentation and validation of them.  Your script could provide capabilities that depend on the way the user of the processor configures the processor, without having to interact with the script at all!

I'll illustrate this below, but I think the coolest point is: A template with a single InvokeScriptedProcessor (that contains a working script) can be dragged onto the canvas and basically acts like dragging your custom processor onto the canvas! When the user opens the dialog, they will see the properties/relationships you added, and they will be validated just like the normal ones (script language, body, etc.) that come with the processor.

The scripted processor needs only implement the Processor interface, which in turn extends AbstractConfigurableComponent. A basic Groovy skeleton with a class including a set of overridden interface methods looks like this:

class MyProcessor implements Processor {

    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [] as Set }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
      // do stuff
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        return null
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [] as List }

    @Override
    String getIdentifier() { return 'MyProcessor-InvokeScriptedProcessor' }
}

processor = new MyProcessor()

Note that the class must implement Processor and declare a variable named "processor" that contains an instance of the class. This is the convention required by the InvokeScriptedProcessor.

IMPORTANT: Although you may find in NiFi code that many processors extend either AbstractProcessor or AbstractSessionFactoryProcessor, your script will most likely NOT work if it extends one of these classes. This is due to the validate() method of these classes being declared final, and the basic implementation will expect the set of Supported Property Descriptors to include the ones that come with the InvokeScriptedProcessor (like Script File), but will only use the list that your scripted processor provides. There might be a hack to get around this but even if possible, it's not likely worth it.

Moving on, let's say we want to create a reusable scripted processor that works like GenerateFlowFile but allows the user to provide the content of the flow file as well as the value of its "filename" attribute.  Moreover, maybe the content could include NiFi Expression Language (EL) constructs like ${hostname()}. Since the content may have something like EL statements but the user might not want them evaluated as such, we should let the user decide whether to evaluate the content for EL statements before writing to the flow file.  Lastly, this is a "generate" processor so we only need a "success" relationship; "failure" doesn't really make sense here. Having said that, it will be important to catch all Exceptions that your code can throw; wrap each in a ProcessException and re-throw, so the framework can handle it correctly.

So the list of things to do:

  1. Add a "success" relationship and return it in (in a Set) from getRelationships()
  2. Add a "File Content" property to contain the intended content of the flow file (may include EL)
  3. Add a "Evaluate Expressions in Content" property for the user to indicate whether to evaluate the content for EL
  4. Add an optionally-set "Filename" property to override the default "filename" attribute.
  5. When the processor is triggered, create a flow file, write the content (after possibly evaluating EL), and possibly set the filename attribute

Here is some example Groovy code to do just that:
class GenerateFlowFileWithContent implements Processor {

    def REL_SUCCESS = new Relationship.Builder()
            .name('success')
            .description('The flow file with the specified content and/or filename was successfully transferred')
            .build();

    def CONTENT = new PropertyDescriptor.Builder()
            .name('File Content').description('The content for the generated flow file')
            .required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build()
    
    def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
            .name('Evaluate Expressions in Content').description('Whether to evaluate NiFi Expression Language constructs within the content')
            .required(true).allowableValues('true','false').defaultValue('false').build()
            
    def FILENAME = new PropertyDescriptor.Builder()
            .name('Filename').description('The name of the flow file to be stored in the filename attribute')
            .required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
    
    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [REL_SUCCESS] as Set }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
      try {
        def session = sessionFactory.createSession()
        def flowFile = session.create()
        
        def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
        def contentProp = context.getProperty(CONTENT)
        def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
        def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
        
        flowFile = session.write(flowFile, { outStream ->
                outStream.write(content.getBytes("UTF-8"))
            } as OutputStreamCallback)
        
        if(filename != null) { flowFile = session.putAttribute(flowFile, 'filename', filename) }
        // transfer
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
      } catch(e) {
          throw new ProcessException(e)
      }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch(name) {
            case 'File Content': return CONTENT
            case 'Evaluate Expressions in Content': return CONTENT_HAS_EL
            case 'Filename': return FILENAME
            default: return null
        }
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor>> getPropertyDescriptors() { return [CONTENT, CONTENT_HAS_EL, FILENAME] as List }

    @Override
    String getIdentifier() { return 'GenerateFlowFile-InvokeScriptedProcessor' }
    
}

processor = new GenerateFlowFileWithContent()

When this is entered into the Script Body of an InvokeScriptedProcessor, with the language set to Groovy and then applied (by clicking Apply on the dialog), then when the dialog is reopened you should see the relationships set to only "success" and the properties added to the config dialog:


At this point you can save the single processor as a template, calling it perhaps GenerateFlowFileWithContent or something.  Now it is a template that is basically reusable as a processor.  Try dragging it onto the canvas and entering some values, then wiring it to some other processor like PutFile (to see if it works):


Once the success relationship has been satisfied, that instance should be good to go:


Hopefully this has illustrated the power and flexibility of InvokeScriptedProcessor, and how it can be used to create reusable processor templates with custom logic, without having to construct and deploy a NAR. The example template is available as a Gist (here); as always I welcome all comments, questions, and suggestions.

Cheers!

Wednesday, February 10, 2016

InvokeScriptedProcessor - Hello World!

In some of my previous posts I outlined some use cases for the ExecuteScript processor in NiFi (starting with 0.5.0). Now let's get to the real powerhouse of the scripting additions, the InvokeScriptedProcessor... processor :)

ExecuteScript is a glorified onTrigger() call where you can work with incoming flow files, create new ones, add attributes, etc. It is meant for the "per-flowfile" paradigm, where the other aspects of the Processor API do not apply.  But perhaps you want a full-fledged Processor local to your cluster and want to avoid the overhead of building a NAR, submitting the code, etc. etc.  With InvokeScriptedProcessor, you can use Javascript, Groovy, Jython, Lua, or JRuby to create a Processor implementation. InvokeScriptedProcessor will delegate methods such as getPropertyDescriptors, getRelationships, onTrigger, etc. to the scripted processor.  This has more power than ExecuteScript because custom properties and relationships can be defined, plus more methods than onTrigger() can be implemented.

One main difference between ExecuteScript and InvokeScriptedProcessor is that REL_SUCCESS and REL_FAILURE are the only two relationships available to ExecuteScript and are passed in automatically. For InvokeScriptedProcessor, all relationships (and all other Processor interface methods) must be defined by the scripted processor, and a variable named "processor" must be defined and point to a valid instance of the scripted processor.

Below is an example of a bare-bones scripted Processor that expects input from a CSV-formatted flow file (coming from the random user generation site https://randomuser.me/, the query is http://api.randomuser.me/0.6/?format=csv&nat=us&results=100. It splits on commas and takes the third and fourth (indexes 2 and 3) values (first and last name, respectively), then outputs the capitalized first name followed by the capitalized last name:


class GroovyProcessor implements Processor {


    def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();
    def ProcessorLog log

    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    @Override

    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        try {

            def session = sessionFactory.createSession()
            def flowFile = session.get()
            if (!flowFile) return
            def selectedColumns = ''
            flowFile = session.write(flowFile,
                    { inputStream, outputStream ->
                        String line
                        final BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
                        line = inReader.readLine()
                        String[] header = line?.split(',')
                        selectedColumns = "${header[1]},${header[2]}"                  
                        while (line = inReader.readLine()) {
                            String[] cols = line.split(',')
                            outputStream.write("${cols[2].capitalize()} ${cols[3].capitalize()}\n".getBytes('UTF-8'))
                        }
                    } as StreamCallback)

     flowFile = session.putAttribute(flowFile, "selected.columns", selectedColumns)
     flowFile = session.putAttribute(flowFile, "filename", "split_cols_invoke.txt")
            // transfer
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            throw new ProcessException(e)
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override

    List<PropertyDescriptor> getPropertyDescriptors() { return null }

    @Override

    String getIdentifier() { return null }
}

processor = new GroovyProcessor()

Besides the relationship differences, there are a couple of other noteworthy differences between ExecuteScript and InvokeScriptedProcessor (plus the latter's need to implement Processor API methods):

1) ExecuteScript handles the session.commit() for you, but InvokeScriptedProcessor does not.
2) ExecuteScript has a "session" variable, where the InvokeScriptedProcessor's onTrigger() method must call sessionFactory.createSession()

A future post will cover the other Processor API methods, specifically those that add properties and/or relationships. This is meant as an introduction to writing a Processor in a scripting language using InvokeScriptedProcessor.

The template for the above processor (and the associated incoming data) is available as a Gist (here). As always I welcome all comments, questions, and suggestions.

Cheers!