Wednesday, February 10, 2016

InvokeScriptedProcessor - Hello World!

In previous posts I outlined some use cases for the ExecuteScript processor in NiFi (available as of 0.5.0). Now let's get to the real powerhouse of the scripting additions, the InvokeScriptedProcessor... processor :)

ExecuteScript is a glorified onTrigger() call where you can work with incoming flow files, create new ones, add attributes, etc. It is meant for the "per-flowfile" paradigm, where the other aspects of the Processor API do not apply. But perhaps you want a full-fledged Processor local to your cluster and want to avoid the overhead of building a NAR, submitting the code, and so on. With InvokeScriptedProcessor, you can use JavaScript, Groovy, Jython, Lua, or JRuby to create a Processor implementation. InvokeScriptedProcessor will delegate methods such as getPropertyDescriptors, getRelationships, onTrigger, etc. to the scripted processor. This has more power than ExecuteScript because custom properties and relationships can be defined, and methods other than onTrigger() can be implemented.

One main difference between ExecuteScript and InvokeScriptedProcessor is that REL_SUCCESS and REL_FAILURE are the only two relationships available to ExecuteScript and are passed in automatically. For InvokeScriptedProcessor, all relationships (and all other Processor interface methods) must be defined by the scripted processor, and a variable named "processor" must be defined and point to a valid instance of the scripted processor.

Below is an example of a bare-bones scripted Processor that expects input from a CSV-formatted flow file (coming from the random user generation site https://randomuser.me/; the query is http://api.randomuser.me/0.6/?format=csv&nat=us&results=100). It splits each line on commas, takes the third and fourth values (indexes 2 and 3, the first and last name respectively), and outputs the capitalized first name followed by the capitalized last name:


class GroovyProcessor implements Processor {

    def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();
    def ProcessorLog log

    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    @Override
    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        try {
            def session = sessionFactory.createSession()
            def flowFile = session.get()
            if (!flowFile) return
            def selectedColumns = ''
            flowFile = session.write(flowFile,
                    { inputStream, outputStream ->
                        String line
                        final BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
                        line = inReader.readLine()
                        String[] header = line?.split(',')
                        // header names of the columns written below (indexes 2 and 3)
                        selectedColumns = "${header[2]},${header[3]}"
                        while (line = inReader.readLine()) {
                            String[] cols = line.split(',')
                            outputStream.write("${cols[2].capitalize()} ${cols[3].capitalize()}\n".getBytes('UTF-8'))
                        }
                    } as StreamCallback)

            flowFile = session.putAttribute(flowFile, "selected.columns", selectedColumns)
            flowFile = session.putAttribute(flowFile, "filename", "split_cols_invoke.txt")
            // transfer
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            throw new ProcessException(e)
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return null }

    @Override
    String getIdentifier() { return null }
}

processor = new GroovyProcessor()
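
If you want to try the core transformation without a running NiFi instance, here is a minimal sketch in plain Groovy that applies the logic described above (split on commas, take indexes 2 and 3, capitalize) to an in-memory string. The transform() helper and the sample header and rows are made up for illustration; the real randomuser.me CSV has its own column names and many more columns.

```groovy
// Plain Groovy, no NiFi classes needed.
// transform() is a hypothetical helper, not part of the NiFi API.
def transform(String csv) {
    // Nothing to select from empty or blank input
    if (!csv?.trim()) return [selectedColumns: '', body: '']

    def lines = csv.readLines()
    def header = lines[0].split(',')
    // Header names of the two columns we keep (indexes 2 and 3)
    def selectedColumns = "${header[2]},${header[3]}".toString()

    def body = new StringBuilder()
    lines.drop(1).each { line ->
        if (!line) return                      // skip blank lines
        def cols = line.split(',')
        body << "${cols[2].capitalize()} ${cols[3].capitalize()}\n"
    }
    [selectedColumns: selectedColumns, body: body.toString()]
}

// Made-up sample data standing in for the randomuser.me response
def sample = '''gender,title,first,last
female,ms,jane,doe
male,mr,john,smith
'''

def result = transform(sample)
println result.selectedColumns
print result.body
```

In the scripted processor, the same two pieces end up in different places: the row output becomes the new flow file content, and the header names go into the "selected.columns" attribute.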

Besides the relationship differences (and InvokeScriptedProcessor's need to implement the Processor API methods), there are a couple of other noteworthy differences between ExecuteScript and InvokeScriptedProcessor:

1) ExecuteScript handles the session.commit() for you, but InvokeScriptedProcessor does not.
2) ExecuteScript has a "session" variable, whereas the InvokeScriptedProcessor's onTrigger() method must call sessionFactory.createSession().
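
For comparison, here is a rough sketch of how the same row handling might look as an ExecuteScript (Groovy) script body. It is not a standalone program: it only runs inside the ExecuteScript processor, which supplies the "session" and REL_SUCCESS variables. The explicit import is included on the assumption that it does no harm even if the script engine already provides it.

```groovy
// ExecuteScript (Groovy) script body: runs only inside NiFi's ExecuteScript processor,
// which supplies the `session` and `REL_SUCCESS` variables used below.
import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()          // `session` is passed in; no sessionFactory.createSession()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    def reader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
    reader.readLine()                 // skip the header row
    String line
    while (line = reader.readLine()) {
        def cols = line.split(',')
        outputStream.write("${cols[2].capitalize()} ${cols[3].capitalize()}\n".getBytes('UTF-8'))
    }
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)   // REL_SUCCESS is passed in as well
// no session.commit() here: ExecuteScript handles the commit
```

There is no class, no "processor" variable, and no relationship definition; that is the trade-off for not being able to add custom properties or relationships.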

A future post will cover the other Processor API methods, specifically those that add properties and/or relationships. This post is meant as an introduction to writing a Processor in a scripting language using InvokeScriptedProcessor.

The template for the above processor (and the associated incoming data) is available as a Gist (here). As always I welcome all comments, questions, and suggestions.

Cheers!