Wednesday, February 24, 2016

Writing Reusable Scripted Processors in NiFi

This blog has quite a few posts about the various things you can do with the new (as of NiFi 0.5.0) scripting processors. Most are about ExecuteScript and how to use it to do per-flowfile things like replace content, use external modules to add functionality, etc. However, most are specific to the task at hand and aren't really general-use scripts.

We could use dynamic properties (explained in the Developer's Guide and in an earlier post), as they are passed into ExecuteScript as variables. However the user of the processor would have to know which properties to add and fill in, and there's no good way to get that information to the user (at least with ExecuteScript).
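For illustration, suppose a user added a dynamic property named "myProperty" to ExecuteScript (the property name here is hypothetical); the script would see it as a bound PropertyValue variable, something like this sketch:

```groovy
// Sketch: "myProperty" is a hypothetical dynamic property the user would have
// to know to add; ExecuteScript binds it into the script as a PropertyValue.
def flowFile = session.get()
if (!flowFile) return

// Evaluate any Expression Language in the property value against the flow file
def value = myProperty.evaluateAttributeExpressions(flowFile).value
flowFile = session.putAttribute(flowFile, 'my.property.value', value ?: '')
session.transfer(flowFile, REL_SUCCESS)
```

This works, but nothing in the processor's dialog tells the user that "myProperty" is expected, which is exactly the gap InvokeScriptedProcessor fills.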

However, InvokeScriptedProcessor lets you provide a scripted implementation of a full Processor instance. This means you can define your own properties and relationships, along with documentation and validation for them.  Your script can provide capabilities that depend on how the user configures the processor, without the user having to touch the script at all!

I'll illustrate this below, but I think the coolest point is: A template with a single InvokeScriptedProcessor (that contains a working script) can be dragged onto the canvas and basically acts like dragging your custom processor onto the canvas! When the user opens the dialog, they will see the properties/relationships you added, and they will be validated just like the normal ones (script language, body, etc.) that come with the processor.

The scripted processor need only implement the Processor interface, which in turn extends the ConfigurableComponent interface. A basic Groovy skeleton with a class including a set of overridden interface methods looks like this:

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException

class MyProcessor implements Processor {

    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [] as Set }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
      // do stuff
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        return null
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [] as List }

    @Override
    String getIdentifier() { return 'MyProcessor-InvokeScriptedProcessor' }
}

processor = new MyProcessor()

Note that the script must define a class that implements Processor, and must declare a variable named "processor" that contains an instance of that class. This is the convention required by InvokeScriptedProcessor.

IMPORTANT: Although you will find that many NiFi processors extend AbstractProcessor or AbstractSessionFactoryProcessor, your script will most likely NOT work if it extends one of these classes. This is because the validate() method of these classes is declared final, and its implementation expects the set of supported property descriptors to include the ones that come with InvokeScriptedProcessor (such as Script File), yet it will only consult the list your scripted processor provides. There may be a hack to get around this, but even if one exists, it's probably not worth it.

Moving on, let's say we want to create a reusable scripted processor that works like GenerateFlowFile but allows the user to provide the content of the flow file as well as the value of its "filename" attribute.  Moreover, the content could include NiFi Expression Language (EL) constructs like ${hostname()}. Since the content may contain EL-like statements that the user might not want evaluated, we should let the user decide whether to evaluate the content for EL statements before writing it to the flow file.  Lastly, this is a "generate" processor, so we only need a "success" relationship; "failure" doesn't really make sense here. Having said that, it is important to catch any Exception your code can throw, wrap it in a ProcessException, and re-throw, so the framework can handle it correctly.

So the list of things to do:

  1. Add a "success" relationship and return it (in a Set) from getRelationships()
  2. Add a "File Content" property to contain the intended content of the flow file (may include EL)
  3. Add an "Evaluate Expressions in Content" property for the user to indicate whether to evaluate the content for EL
  4. Add an optionally-set "Filename" property to override the default "filename" attribute
  5. When the processor is triggered, create a flow file, write the content (after possibly evaluating EL), and possibly set the filename attribute

Here is some example Groovy code to do just that:
import org.apache.nifi.components.*
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GenerateFlowFileWithContent implements Processor {

    def REL_SUCCESS = new Relationship.Builder()
            .name('success')
            .description('The flow file with the specified content and/or filename was successfully transferred')
            .build();

    def CONTENT = new PropertyDescriptor.Builder()
            .name('File Content').description('The content for the generated flow file')
            .required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build()
    
    def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
            .name('Evaluate Expressions in Content').description('Whether to evaluate NiFi Expression Language constructs within the content')
            .required(true).allowableValues('true','false').defaultValue('false').build()
            
    def FILENAME = new PropertyDescriptor.Builder()
            .name('Filename').description('The name of the flow file to be stored in the filename attribute')
            .required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
    
    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [REL_SUCCESS] as Set }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
      try {
        def session = sessionFactory.createSession()
        def flowFile = session.create()
        
        def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
        def contentProp = context.getProperty(CONTENT)
        def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
        def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
        
        flowFile = session.write(flowFile, { outStream ->
                outStream.write(content.getBytes("UTF-8"))
            } as OutputStreamCallback)
        
        if(filename != null) { flowFile = session.putAttribute(flowFile, 'filename', filename) }
        // transfer
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
      } catch(e) {
          throw new ProcessException(e)
      }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch(name) {
            case 'File Content': return CONTENT
            case 'Evaluate Expressions in Content': return CONTENT_HAS_EL
            case 'Filename': return FILENAME
            default: return null
        }
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [CONTENT, CONTENT_HAS_EL, FILENAME] as List }

    @Override
    String getIdentifier() { return 'GenerateFlowFile-InvokeScriptedProcessor' }
    
}

processor = new GenerateFlowFileWithContent()

When this is entered into the Script Body of an InvokeScriptedProcessor, with the language set to Groovy and then applied (by clicking Apply on the dialog), then when the dialog is reopened you should see the relationships set to only "success" and the properties added to the config dialog:


At this point you can save the single processor as a template, calling it perhaps GenerateFlowFileWithContent.  Now it is a template that is basically reusable as a processor.  Try dragging it onto the canvas and entering some values, then wiring it to another processor such as PutFile to see it in action:


Once the success relationship has been satisfied, that instance should be good to go:


Hopefully this has illustrated the power and flexibility of InvokeScriptedProcessor, and how it can be used to create reusable processor templates with custom logic, without having to construct and deploy a NAR. The example template is available as a Gist (here); as always I welcome all comments, questions, and suggestions.

Cheers!

Tuesday, February 23, 2016

ExecuteScript Explained - Split fields and NiFi API with Groovy

There was a question on Twitter about being able to split fields in a flow file based on a delimiter, and selecting the desired columns. There are a few ways to do this in NiFi, but I thought I'd illustrate how to do it using the ExecuteScript processor (new in NiFi 0.5.0).

The approach from the NiFi side is very similar to my previous post on replacing flow file content:
def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
   // Read incoming flow file content with inputStream
   // ... other stuff...
   // Write outgoing flow file content with OutputStream
} as StreamCallback)
Before we get to the "split text on delimiter" part, I'll explain a little bit more about what's going on above in terms of the NiFi API and Groovy.

The script is evaluated when the ExecuteScript processor is triggered. The conditions under which the processor may be triggered are listed in the Developer's Guide here.  The "session" object is a ProcessSession instance, and the get() call will return a flow file if one is available. It is possible for get() to return null: this can happen if there are no incoming connections and the processor has been scheduled to run, or if a flow file was available in the queue but another task claimed it after this task was triggered.  In our example, we only want to work on existing flow files, so we call session.get() and return from the script if no flow file was available.

The meat of the script is the session.write() call. It is based on one of the ProcessSession API's write methods:
FlowFile write(FlowFile source, StreamCallback writer) 
We use this to pass the incoming flow file in, along with a callback to overwrite the content. It returns a FlowFile because FlowFile objects are immutable, so the reference returned by write() is a reference to the latest version of that flowfile (with the content updated). The latest reference to a modified flowfile is the one that is expected to be transferred. You can see this in the skeleton above:
flowFile = session.write(flowFile, ...
The framework will invoke a method on the StreamCallback, passing in an InputStream (associated with the incoming flowfile's content), and an OutputStream (where you write the new content). This is StreamCallback's single method:
void process(InputStream in, OutputStream out) throws IOException
This is where Groovy's closure coercion feature really shines. Without it, you would implement a class (or anonymous class) and declare that method:
def callback = new StreamCallback() {
  void process(InputStream in, OutputStream out) throws IOException {
     doStuff()
  }
}
If the interface has a single method, you can create a closure and use the "as" keyword to coerce it into the single method of the interface, like so:
def callback = { doStuff() } as StreamCallback
In the skeleton above, I didn't create a variable to hold the callback, I just passed it into the write() method.  As far as boilerplate code goes, the above skeleton is not too bad :)
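To see the same coercion outside of NiFi, here's a tiny standalone Groovy example (not from the original post) that coerces a closure into java.util.Comparator, another single-method interface:

```groovy
// Coerce a closure into a single-method interface (java.util.Comparator):
// the closure body becomes the implementation of compare()
def byLength = { String a, String b -> a.length() <=> b.length() } as Comparator

def words = ['pelican', 'ox', 'heron']
words.sort(byLength)   // List.sort(Comparator) sorts in place
assert words == ['ox', 'heron', 'pelican']
```

The same "as StreamCallback" trick in the skeleton is just this pattern applied to NiFi's callback interface.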

The example I'll use (based on the Tweet I saw) has a flowfile with the following contents:
a1|b1|c1|d1
a2|b2|c2|d2
a3|b3|c3|d3
The desired output is the middle two columns, delimited by a space instead of the bar:
b1 c1
b2 c2
b3 c3
Here's the script I ended up with:
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
   inputStream.eachLine { line ->
      def cols = line.tokenize('|')
      outputStream.write("${cols[1]} ${cols[2]}\n".toString().getBytes(StandardCharsets.UTF_8))
   }
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
Notice there's no error handling; stay tuned for more posts :)  Here's a screenshot of a test flow that puts the sample data in and writes the expected output to a file:



The test flow template is available as a Gist (here). I hope this was helpful, as always I welcome all comments, questions, and suggestions.

Cheers!

Friday, February 19, 2016

ExecuteScript - Extract text & metadata from PDF

This post is about using Apache NiFi, its ExecuteScript processor, and Apache PDFBox to extract text and metadata from PDF files. It is similar to a previous post of mine, which used the Module Directory property to include external JARs. But this is a good use case as well, so I thought I'd write a bit about it. This one will be short and sweet, but the aforementioned post has more details :)

In my example, I'm using the GetFile processor to find all PDFs in a directory. They are sent to an ExecuteScript processor, which uses PDFBox and PDFTextStripper (and other classes) to extract the text into the flowfile content, and adds metadata as attributes. The resulting script is here:

import org.apache.pdfbox.pdmodel.*
import org.apache.pdfbox.util.*

def flowFile = session.get()
if(!flowFile) return

def s = new PDFTextStripper()
def doc, info

flowFile = session.write(flowFile, { inputStream, outputStream ->
    doc = PDDocument.load(inputStream)
    info = doc.getDocumentInformation()
    s.writeText(doc, new OutputStreamWriter(outputStream))
} as StreamCallback)
flowFile = session.putAttribute(flowFile, 'pdf.page.count', "${doc.getNumberOfPages()}")
flowFile = session.putAttribute(flowFile, 'pdf.title', "${info.getTitle()}")
flowFile = session.putAttribute(flowFile, 'pdf.author', "${info.getAuthor()}")
flowFile = session.putAttribute(flowFile, 'pdf.subject', "${info.getSubject()}")
flowFile = session.putAttribute(flowFile, 'pdf.keywords', "${info.getKeywords()}")
flowFile = session.putAttribute(flowFile, 'pdf.creator', "${info.getCreator()}")
flowFile = session.putAttribute(flowFile, 'pdf.producer', "${info.getProducer()}")
flowFile = session.putAttribute(flowFile, 'pdf.date.creation', "${info.getCreationDate()}")
flowFile = session.putAttribute(flowFile, 'pdf.date.modified', "${info.getModificationDate()}")
flowFile = session.putAttribute(flowFile, 'pdf.trapped', "${info.getTrapped()}")
session.transfer(flowFile, REL_SUCCESS)


I then put the file's text contents out (using PutFile) and also logged the metadata attributes. The flow looks like this:

The template is available as a Gist (here). Please let me know if you find it useful, and/or if you have comments, questions, or suggestions. Cheers!

Thursday, February 11, 2016

ExecuteScript - JSON-to-JSON conversion

Given the cool UI and data provenance features of NiFi, performing complex JSON-to-JSON conversion should be really simple, and once NIFI-361 is implemented it will be.  Fortunately, in the meantime, some of this can be achieved with the scripting processors (available as of Apache NiFi 0.5.0). As an example, I will present an ExecuteScript processor that performs the same conversion as the demo at http://jolt-demo.appspot.com/, with an extra metric added to really show the JSON-to-JSON transformation.

This is not at all a jab against JOLT; to the contrary, NIFI-361 will likely use JOLT as it is a powerful transformation DSL. This is more about the spirit of the ExecuteScript processor, as it is a great way to rapidly enable new features and/or data transformations.

I chose Groovy as the scripting language for ExecuteScript for two reasons: one, I am most familiar with it :) and two, you can do clean JSON-to-JSON conversions with JsonSlurper and JsonBuilder.
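As a quick standalone taste of those two classes (independent of NiFi; the JSON here is made up for illustration):

```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

// JsonSlurper parses JSON into maps/lists; JsonBuilder emits new JSON
def input = new JsonSlurper().parseText('{"name":"nifi","tags":["flow","data"]}')
def builder = new JsonBuilder()
builder {
    project input.name.toUpperCase()
    tagCount input.tags.size()
}
// Round-trip the builder output to verify the reshaped structure
def roundTrip = new JsonSlurper().parseText(builder.toString())
assert roundTrip.project == 'NIFI'
assert roundTrip.tagCount == 2
```

The jolt-demo conversion below is the same idea, just with a more interesting reshaping.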

To emulate the jolt-demo, I start with a flow file containing the following JSON:

{
   "rating": {
      "primary": {
         "value": 3
      },
      "quality": {
         "value": 3
      },
      "metric": {
         "value": 6
      }
   }
}

Using the jolt-demo transform, the following should be output:

{
    "Range": 5,
    "Rating": "3",
    "SecondaryRatings": {
        "metric": {
            "Id": "metric",
            "Range": 5,
            "Value": 6
        },
        "quality": {
            "Id": "quality",
            "Range": 5,
            "Value": 3
        }
    }
}

Using the same rules (translated into JsonBuilder format), the script is:

import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get();
if (flowFile == null) {
    return;
}
def slurper = new groovy.json.JsonSlurper()

flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def obj = slurper.parseText(text)
        def builder = new groovy.json.JsonBuilder()
        builder.call {
            'Range' 5
            'Rating' "${obj.rating.primary.value}"
            'SecondaryRatings' {
                obj.rating.findAll {it.key != "primary"}.each {k,v ->
                    "$k" {
                         'Id' "$k"
                         'Range' 5
                         'Value' v.value
                    }
                }
            }
        }
        outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').tokenize('.')[0]+'_translated.json')
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)


The template is available as a Gist (here), please let me know if you have any questions, comments, or suggestions for making the scripting processors even better!

Cheers!

Wednesday, February 10, 2016

InvokeScriptedProcessor - Hello World!

In some of my previous posts I outlined some use cases for the ExecuteScript processor in NiFi (starting with 0.5.0). Now let's get to the real powerhouse of the scripting additions, the InvokeScriptedProcessor... processor :)

ExecuteScript is a glorified onTrigger() call where you can work with incoming flow files, create new ones, add attributes, etc. It is meant for the "per-flowfile" paradigm, where the other aspects of the Processor API do not apply.  But perhaps you want a full-fledged Processor local to your cluster and want to avoid the overhead of building a NAR, submitting the code, etc. etc.  With InvokeScriptedProcessor, you can use Javascript, Groovy, Jython, Lua, or JRuby to create a Processor implementation. InvokeScriptedProcessor will delegate methods such as getPropertyDescriptors, getRelationships, onTrigger, etc. to the scripted processor.  This has more power than ExecuteScript because custom properties and relationships can be defined, plus more methods than onTrigger() can be implemented.

One main difference between ExecuteScript and InvokeScriptedProcessor is that REL_SUCCESS and REL_FAILURE are the only two relationships available to ExecuteScript and are passed in automatically. For InvokeScriptedProcessor, all relationships (and all other Processor interface methods) must be defined by the scripted processor, and a variable named "processor" must be defined and point to a valid instance of the scripted processor.

Below is an example of a bare-bones scripted Processor that expects input from a CSV-formatted flow file (coming from the random user generation site https://randomuser.me/; the query is http://api.randomuser.me/0.6/?format=csv&nat=us&results=100). It splits each line on commas and takes the third and fourth values (indexes 2 and 3: first and last name, respectively), then outputs the capitalized first name followed by the capitalized last name:


import org.apache.nifi.components.*
import org.apache.nifi.logging.ProcessorLog
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback

class GroovyProcessor implements Processor {

    def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
    ProcessorLog log

    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    @Override
    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        try {

            def session = sessionFactory.createSession()
            def flowFile = session.get()
            if (!flowFile) return
            def selectedColumns = ''
            flowFile = session.write(flowFile,
                    { inputStream, outputStream ->
                        String line
                        final BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
                        line = inReader.readLine()
                        String[] header = line?.split(',')
                        selectedColumns = "${header[2]},${header[3]}"
                        while ((line = inReader.readLine()) != null) {
                            String[] cols = line.split(',')
                            outputStream.write("${cols[2].capitalize()} ${cols[3].capitalize()}\n".getBytes('UTF-8'))
                        }
                    } as StreamCallback)

            flowFile = session.putAttribute(flowFile, "selected.columns", selectedColumns)
            flowFile = session.putAttribute(flowFile, "filename", "split_cols_invoke.txt")
            // transfer
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            throw new ProcessException(e)
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return null }

    @Override
    String getIdentifier() { return null }
}

processor = new GroovyProcessor()

Besides the relationship differences, there are a couple of other noteworthy differences between ExecuteScript and InvokeScriptedProcessor (plus the latter's need to implement Processor API methods):

1) ExecuteScript handles the session.commit() for you, but InvokeScriptedProcessor does not.
2) ExecuteScript provides a "session" variable, whereas InvokeScriptedProcessor's onTrigger() method must call sessionFactory.createSession() itself.
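Those two differences give the scripted onTrigger() a characteristic shape; here's a minimal sketch (assuming the surrounding class implements Processor as above):

```groovy
// Sketch: session lifecycle inside a scripted processor's onTrigger().
// Unlike ExecuteScript, the script creates and commits the session itself.
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
    def session = sessionFactory.createSession()   // difference 2: no "session" variable provided
    try {
        def flowFile = session.get()
        if (!flowFile) return
        // ... work with the flow file ...
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()                           // difference 1: commit is not done for you
    } catch (e) {
        session.rollback(true)
        throw new ProcessException(e)
    }
}
```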

A future post will cover the other Processor API methods, specifically those that add properties and/or relationships. This is meant as an introduction to writing a Processor in a scripting language using InvokeScriptedProcessor.

The template for the above processor (and the associated incoming data) is available as a Gist (here). As always I welcome all comments, questions, and suggestions.

Cheers!

ExecuteScript - Using Modules

This is the third post in a series of blogs about the ExecuteScript processor (new as of Apache NiFi 0.5.0). The first two dealt with such concepts as sessions, attributes, and replacing content in flow files.  This post is about using the "Module Directory" property to bring in additional dependencies not bundled with the ExecuteScript NiFi ARchive (NAR).  Specifically for this example we will create a Hazelcast client in both Groovy and JavaScript that will read in entries from a Map stored in Hazelcast. And to illustrate the ability to create flow files, we won't be expecting any incoming flow files; instead, we'll create a new flow file and set attributes corresponding to each entry in the map.


One of the most powerful aspects of ExecuteScript is the ability to leverage JARs that aren't already part of the scripting NAR. This means you can bring in capabilities that aren't already built in to the various scripting languages and/or NiFi itself. To illustrate, I have a template that will use Hazelcast JARs to create a client and read in a stored "customers" map.

To seed the map, I have a Groovy script that will store two entries into the customers map. It's available as a Gist (here), and points at my Docker container running Hazelcast.  It will put the following entries into a Hazelcast map called "customers":

'mattyb149': ['name': 'Matt Burgess', 'email': 'mattyb149@gmail.com', 'blog': 'funnifi.blogspot.com']
'nifi': ['name': 'Apache NiFi', 'email': 'nifi@apache.org', 'blog': 'nifi.apache.org']

To get the Hazelcast JARs, I downloaded and unzipped Hazelcast 3.6 (latest download here) into my Downloads folder. Then I needed to tell the ExecuteScript processors to find the appropriate JARs, so I put the following into the Module Directory for the ExecuteScript processors:

/Users/mburgess/Downloads/hazelcast-3.6/lib/hazelcast-3.6.jar,/Users/mburgess/Downloads/hazelcast-3.6/lib/hazelcast-client-3.6.jar

The Module Directory property takes a comma-separated list of files and folders pointing to external dependencies. For example, for Jython if you want to bring in extra *.py files, point at the folder that contains them. For Groovy and Javascript, you can point at a folder or files, and if a folder is designated, all JARs in that folder will be available.

The Groovy script to create a new flow file and use the Hazelcast client to read in the customer map is as follows:

import com.hazelcast.client.*
import com.hazelcast.client.config.*
import com.hazelcast.core.*

HazelcastInstance client
try {
   ClientConfig clientConfig = new ClientConfig();
   clientConfig.getGroupConfig().setName("dev").setPassword("dev-pass");
   clientConfig.getNetworkConfig().addAddress("192.168.99.100", "192.168.99.100:32780");

   client = HazelcastClient.newHazelcastClient(clientConfig)
   flowFile = session.create()
   client.getMap("customers").each { k, v ->
      flowFile = session.putAttribute(flowFile, "hazelcast.customers.$k" as String, v as String)
   }
   session.transfer(flowFile, REL_SUCCESS)
}
catch(e) {
   log.error("Something went wrong with Hazelcast", e)
   session.transfer(flowFile, REL_FAILURE)
} finally {
   client?.shutdown()
}

For the Javascript version:

var clientConfig = new com.hazelcast.client.config.ClientConfig();
clientConfig.getGroupConfig().setName("dev").setPassword("dev-pass");
clientConfig.getNetworkConfig().addAddress("192.168.99.100", "192.168.99.100:32780");

var client = com.hazelcast.client.HazelcastClient.newHazelcastClient(clientConfig)
flowFile = session.create()
var map = client.getMap("customers")
for each (var e in map.keySet()) {
  flowFile = session.putAttribute(flowFile, "hazelcast.customers."+e, map.get(e))
}
session.transfer(flowFile, REL_SUCCESS)

The template is available as a Gist (here), and the output in the logs looks something like:
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Wed Feb 10 19:29:34 EST 2016'
Key: 'lineageStartDate'
Value: 'Wed Feb 10 19:29:34 EST 2016'
Key: 'fileSize'
Value: '0'
FlowFile Attribute Map Content
Key: 'filename'
Value: '1234292200327808'
Key: 'hazelcast.customers.mattyb149'
Value: '[name:Matt Burgess, email:mattyb149@gmail.com, blog:funnifi.blogspot.com]'
Key: 'hazelcast.customers.nifi'
Value: '[name:Apache NiFi, email:nifi@apache.org, blog:nifi.apache.org]'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'f51ce2c2-a303-4daa-934d-c2a0639c173c'
--------------------------------------------------

Hopefully this post has successfully illustrated how to create flow files and how to use the Module Directory property to bring in external dependencies.  Please let me know if you have tried this and what your results were.  As always, I welcome all comments, questions, and suggestions :)

Cheers!

ExecuteScript processor - Replacing Flow File Content

In my last post, I introduced the Apache NiFi ExecuteScript processor, including some basic features and a very simple use case that just updated a flow file attribute. However NiFi has a large number of processors that can perform a ton of processing on flow files, including updating attributes, replacing content using regular expressions, etc.  Where the ExecuteScript processor will shine is for use cases that cannot be satisfied with the current set of processors.

As an example, this blog post will present the ubiquitous Word Count example, where a text file (The Tell-Tale Heart by Edgar Allan Poe) is read in, split on non-alphanumeric characters, then each word's frequency in the corpus is calculated. The incoming flow file's contents are replaced with lines of "word: frequency" for each unique word/term in the corpus.


The previous post included a discussion on how to ensure your script will get a valid flow file (namely, returning if session.get() does not return a flow file object). It also illustrated how to use session.putAttribute() to add/update an attribute, and the importance of keeping the latest reference to the flow file object.  This post will focus on Groovy code to replace the content of an incoming flow file.

A very concise way to replace flow file content (at least in Groovy) is to leverage ProcessSession's write() method that takes a StreamCallback object. The StreamCallback will get an InputStream (from the incoming flow file) and an OutputStream (where the new content should go). The best part is that the StreamCallback interface has a single method, so with Groovy we can just use closure coercion instead of creating an explicit implementation of the interface.  Here's what such a skeleton looks like:

flowFile = session.write(flowFile, {inputStream, outputStream ->
   // Read incoming flow file content with inputStream
   // ... other stuff...
   // Write outgoing flow file content with OutputStream
} as StreamCallback)

If you need to read the entire flow file into a String (which you should avoid in case you get very large files), you can import org.apache.commons.io.IOUtils and use IOUtils.toString(InputStream, Charset). See the full example below.

My example reads the entire text in to keep the code simple, but for a real script you might want to look at StreamTokenizer or something similar to pull words out one at a time.  Once the corpus is read in, the words are split on whitespace and other non-alphanumeric characters, then lowercased so that capitalization differences don't skew the count. The word count map is updated, and a string output is generated with inject(). This is another place where the code could be more efficient (using map.each() or similar), but I was trying to keep the body of the session.write() closure concise.  The string output is written to the OutputStream; after the write() has completed, the filename attribute is set and the file is sent to "success".

The example code for the ExecuteScript processor is as follows:

import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
   def wordCount = [:]

   def tellTaleHeart = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
   def words = tellTaleHeart.split(/(!|\?|-|\.|\"|:|;|,|\s)+/)*.toLowerCase()

   words.each { word ->
      def currentWordCount = wordCount.get(word)
      if(!currentWordCount) {
         wordCount.put(word, 1)
      }
      else {
         wordCount.put(word, currentWordCount + 1)
      }
   }

   def outputMapString = wordCount.inject("", {result, entry -> result += "${entry.key}: ${entry.value}\n"})
  
   outputStream.write(outputMapString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

flowFile = session.putAttribute(flowFile, 'filename', 'telltale_heart_wordcount')
session.transfer(flowFile, REL_SUCCESS)
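The splitting and counting logic is easy to try outside NiFi. Here's a plain-Java sketch of the same steps (split on the same punctuation/whitespace class, lowercase, tally, then build the output with a StringBuilder loop, which is the more efficient alternative to string concatenation in inject() mentioned above); the class and method names are mine, not part of any NiFi API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCount {

    // Split on the same punctuation/whitespace class as the Groovy example,
    // lowercase each word, and tally occurrences in insertion order.
    public static Map<String, Integer> count(String text) {
        Map<String, Integer> wordCount = new LinkedHashMap<>();
        for (String word : text.split("(!|\\?|-|\\.|\"|:|;|,|\\s)+")) {
            wordCount.merge(word.toLowerCase(), 1, Integer::sum);
        }
        return wordCount;
    }

    // Build the "word: count" output with a StringBuilder instead of
    // repeated string concatenation.
    public static String format(Map<String, Integer> wordCount) {
        StringBuilder sb = new StringBuilder();
        wordCount.forEach((word, n) ->
                sb.append(word).append(": ").append(n).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(format(count("True! nervous, very very nervous")));
    }
}
```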

The self-contained template is a Gist (here); it includes the full text and a PutFile processor that writes the word count file to a directory relative to the NiFi instance.

ExecuteScript processor - Hello World!

In Apache NiFi 0.5.0, a few new processors were added, two of which allow the user to write scripts to do custom processing.  This post talks about one of them: ExecuteScript.


The ExecuteScript processor is intended as a scriptable "onTrigger" method, meaning that when the processor is scheduled to run, your script will be executed. As of 0.5.0, the available script engines are ECMAScript (JavaScript), Jython, JRuby, Groovy, and Lua. For this blog, almost all examples will be in Groovy, but templates exist for the other languages as well.

To allow for the most flexibility, only a handful of objects are passed into the script as variables:

session: This is a reference to the ProcessSession assigned to the processor. The session allows you to perform operations on flow files such as create(), putAttribute(), and transfer(). We'll get to an example below.

context: This is a reference to the ProcessContext for the processor. It can be used to retrieve processor properties, relationships, and the StateManager (see the NiFi docs for the uses of StateManager, also new in 0.5.0).

log: This is a reference to the ProcessorLog for the processor. Use it to log messages to NiFi, such as log.info('Hello world!')

REL_SUCCESS: This is a reference to the "success" relationship defined for the processor. It could also be inherited by referencing the static member of the parent class (ExecuteScript), but some engines such as Lua do not allow for referencing static members, so this is a convenience variable. It also saves having to use the fully-qualified name for the relationship.

REL_FAILURE: This is a reference to the "failure" relationship defined for the processor. As with REL_SUCCESS, it is provided as a convenience variable, since some engines (such as Lua) cannot reference static members of the parent class, and it saves having to use the fully-qualified name for the relationship.

Dynamic Properties:  Any dynamic properties defined in ExecuteScript are passed to the script engine as variables holding the string values of those properties. This means you must be aware of the variable naming rules for the chosen script engine. For example, Groovy does not allow periods (.) in variable names, so don't use something like "my.property" as a dynamic property name.
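Since Groovy variable names follow the same rules as Java identifiers, the JDK's javax.lang.model.SourceVersion can be used to check whether a dynamic property name will be usable as a script variable. A small standalone sketch (the helper name is mine):

```java
import javax.lang.model.SourceVersion;

public class PropertyNameCheck {

    // A dynamic property name is only usable as a Groovy variable if it is
    // a syntactically valid Java-style identifier (no periods, no leading
    // digits) and not a reserved keyword.
    public static boolean usableAsVariable(String name) {
        return SourceVersion.isIdentifier(name) && !SourceVersion.isKeyword(name);
    }

    public static void main(String[] args) {
        System.out.println(usableAsVariable("myProperty"));  // true
        System.out.println(usableAsVariable("my.property")); // false
    }
}
```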

Usage:

The script is not required to work with the session, context, flow files, or anything else. In fact, a one-line Groovy script that simply logs that the processor is running looks like this:

log.info("Hello from Groovy!")

However such scripts are probably not that interesting :) Most scripts will want to interact with the session and flow files in some way, either by adding attributes, replacing content, or even creating new flow files.

You may have noticed that incoming flow files are not passed into the script automatically. This is because ExecuteScript can be used without any input, usually to generate flow files to pass into the remainder of the flow. To support both cases, the ProcessSession is supplied and the script is responsible for retrieving and handling any flow files. This can result in some boilerplate code, but the trade-off in flexibility and power is well worth it.

If your script only wants to handle incoming flow files, then you can simply return if the session has no flow file available for processing. In Groovy:

def flowFile = session.get()
if (!flowFile) return
// Remainder of script

If you are acting on a flow file, there are two major things to remember:

1) Keep track of the latest version of the flow file reference. This means if you act on a flow file, such as adding an attribute, you should replace the old reference with the one returned by the session method. For example:

flowFile = session.putAttribute(flowFile, 'my-property', 'my-value')

2) The script must transfer the flow file(s) that are retrieved and/or created. Unless an error condition occurred, transfer like so:

session.transfer(flowFile, REL_SUCCESS)

If an error has occurred (i.e. your script has caught an exception), you can route the flow file to failure:

session.transfer(flowFile, REL_FAILURE)

Putting this all together, here is an example script that updates the "filename" attribute (a core flow file attribute that exists on every flow file):

def flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'filename', 'myfile')
session.transfer(flowFile, REL_SUCCESS)

I have created a standalone NiFi template (ExecuteScriptHelloWorldGroovy) that will generate a JSON file, then call the above script to update the filename attribute, then log that attribute.


That's all for this introduction to ExecuteScript. Check the NiFi docs for more information about configuring the ExecuteScript processor, and stay tuned for more blog posts about ExecuteScript and other cool NiFi things :)

Cheers!