Wednesday, February 10, 2016

ExecuteScript processor - Replacing Flow File Content

In my last post, I introduced the Apache NiFi ExecuteScript processor, including some basic features and a very simple use case that just updated a flow file attribute. However, NiFi already has a large number of processors that can do a great deal of processing on flow files, including updating attributes, replacing content using regular expressions, etc. Where the ExecuteScript processor shines is in use cases that cannot be satisfied with the current set of processors.

As an example, this blog post will present the ubiquitous Word Count example, where a text file (The Tell-Tale Heart by Edgar Allan Poe) is read in, split on non-alphanumeric characters, then each word's frequency in the corpus is calculated. The incoming flow file's contents are replaced with lines of "word: frequency" for each unique word/term in the corpus.

The previous post included a discussion on how to ensure your script will get a valid flow file (namely, returning if session.get() does not return a flow file object). It also illustrated how to use session.putAttribute() to add/update an attribute, and the importance of keeping the latest reference to the flow file object.  This post will focus on Groovy code to replace the content of an incoming flow file.

A very concise way to replace flow file content (at least in Groovy) is to leverage ProcessSession's write() method that takes a StreamCallback object. The StreamCallback will get an InputStream (from the incoming flow file) and an OutputStream (where the new content should go). The best part is that the StreamCallback interface has a single method, so with Groovy we can just use closure coercion instead of creating an explicit implementation of the interface.  Here's what such a skeleton looks like:

flowFile = session.write(flowFile, {inputStream, outputStream ->
   // Read incoming flow file content with inputStream
   // ... other stuff...
   // Write outgoing flow file content with outputStream
} as StreamCallback)
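To see closure coercion in isolation, here is a minimal sketch that runs outside NiFi. DemoStreamCallback and rewrite() are hypothetical stand-ins I made up for StreamCallback and session.write(); they only mimic the shape of the real API (one method taking an InputStream and an OutputStream) and are not part of NiFi.

```groovy
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for NiFi's StreamCallback, declared locally so the
// sketch runs without NiFi on the classpath.
interface DemoStreamCallback {
    void process(InputStream inputStream, OutputStream outputStream) throws IOException
}

// Hypothetical helper playing the role of session.write(): it hands the
// callback the old content and returns whatever the callback wrote.
byte[] rewrite(byte[] content, DemoStreamCallback callback) {
    def input = new ByteArrayInputStream(content)
    def output = new ByteArrayOutputStream()
    callback.process(input, output)
    return output.toByteArray()
}

// Closure coercion: the closure becomes the interface's single method.
def upperCased = rewrite('hello nifi'.getBytes(StandardCharsets.UTF_8), { inputStream, outputStream ->
    def text = inputStream.getText('UTF-8')
    outputStream.write(text.toUpperCase().getBytes(StandardCharsets.UTF_8))
} as DemoStreamCallback)

println new String(upperCased, StandardCharsets.UTF_8)
```

The closure's two parameters line up with the interface method's parameters, which is all the "as" coercion needs.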

If you need to read the entire flow file into a String (which you should avoid, since very large files may not fit in memory), you can import Apache Commons IO's IOUtils class and use IOUtils.toString(InputStream, Charset). See the full example below.

My example reads the entire text in to keep the code simple, but for a real script you might want to look at StreamTokenizer or something else to pull words out one at a time. Once the corpus is read in, it is split on whitespace and punctuation characters, and the resulting words are converted to lowercase so that capitalization differences (e.g. "The" vs. "the") don't produce separate counts. The word count map is then updated, and a string output is generated with inject(). This is another place where the code could be more efficient (using map.each() or something), but I was trying to keep the body of the session.write() closure concise. The string output is written to the OutputStream; after the write() has completed, the filename attribute is set and the file is sent to "success".

The example code for the ExecuteScript processor is as follows:

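import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*

def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
   def wordCount = [:]

   // Read the whole text, split on punctuation/whitespace, and lowercase each word
   def tellTaleHeart = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
   def words = tellTaleHeart.split(/(!|\?|-|\.|\"|:|;|,|\s)+/)*.toLowerCase()

   // Tally the occurrences of each word
   words.each { word ->
      def currentWordCount = wordCount.get(word)
      if(!currentWordCount) {
         wordCount.put(word, 1)
      }
      else {
         wordCount.put(word, currentWordCount + 1)
      }
   }

   // Build one "word: frequency" line per entry, then replace the content
   def outputMapString = wordCount.inject("", {result, entry -> result + "${entry.key}: ${entry.value}\n"})
   outputStream.write(outputMapString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)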

flowFile = session.putAttribute(flowFile, 'filename', 'telltale_heart_wordcount')
session.transfer(flowFile, REL_SUCCESS)
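For larger files, the streaming approach mentioned earlier might look something like the sketch below. It is not what the template uses. It reads one line at a time with a BufferedReader (rather than StreamTokenizer) and writes the results with map.each() instead of inject(). The countWords name and the sample sentence are my own, and in-memory byte array streams stand in for the flow file content so the snippet runs on its own.

```groovy
import java.nio.charset.StandardCharsets

// Counts words one line at a time and writes "word: frequency" lines.
// The delimiters mirror the regex used in the script above.
def countWords = { InputStream inputStream, OutputStream outputStream ->
    def wordCount = [:].withDefault { 0 }   // missing words start at zero
    def reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    String line
    while ((line = reader.readLine()) != null) {
        line.split(/(!|\?|-|\.|\"|:|;|,|\s)+/).each { token ->
            // Skip the empty token produced when a line starts with a delimiter
            if (token) wordCount[token.toLowerCase()] += 1
        }
    }
    def writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)
    wordCount.each { word, count -> writer.write(word + ': ' + count + '\n') }
    writer.flush()   // flush, but leave closing the stream to the caller
}

// Stand-in streams (assumption: in NiFi these would come from session.write())
def input = new ByteArrayInputStream('True! Nervous, very, very nervous.'.getBytes(StandardCharsets.UTF_8))
def output = new ByteArrayOutputStream()
countWords(input, output)
print output.toString('UTF-8')
```

Only the word count map is held in memory here, not the whole text. Since the closure takes an InputStream and an OutputStream, the same body should drop into the session.write() call in place of the one above, though I have only sketched it against in-memory streams.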

The self-contained template is available as a Gist (here); it includes the full text and a PutFile processor to write out the word count file in a directory relative to the NiFi instance.