Tuesday, February 23, 2016

ExecuteScript Explained - Split fields and NiFi API with Groovy

There was a question on Twitter about splitting the fields in a flow file on a delimiter and selecting the desired columns. There are a few ways to do this in NiFi, but I thought I'd illustrate how to do it using the ExecuteScript processor (new in NiFi 0.5.0).

The approach from the NiFi side is very similar to my previous post on replacing flow file content:
def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
   // Read incoming flow file content with inputStream
   // ... other stuff...
   // Write outgoing flow file content with OutputStream
} as StreamCallback)
Before we get to the "split text on delimiter" part, I'll explain a little bit more about what's going on above in terms of the NiFi API and Groovy.

The script is evaluated when the ExecuteScript processor is triggered. The conditions under which the processor may be triggered are listed in the Developer's Guide here. The "session" object is a ProcessSession instance, and the get() call returns a flow file if one is available. It is possible for get() to return null. This can happen if there are no incoming connections and the processor has been scheduled to run. It can also happen if a flow file was available in the queue but another task claimed it after this task was triggered. In our example, we only want to work on existing flow files, so we call session.get() and return from the script if no flow file was available.

The meat of the script is the session.write() call. It is based on one of the ProcessSession API's write methods:
FlowFile write(FlowFile source, StreamCallback writer) 
We use this to pass the incoming flow file in, along with a callback to overwrite the content. It returns a FlowFile because FlowFile objects are immutable, so the reference returned by write() is a reference to the latest version of that flowfile (with the content updated). The latest reference to a modified flowfile is the one that is expected to be transferred. You can see this in the skeleton above:
flowFile = session.write(flowFile, ...
The framework will invoke a method on the StreamCallback, passing in an InputStream (associated with the incoming flowfile's content), and an OutputStream (where you write the new content). This is StreamCallback's single method:
void process(InputStream in, OutputStream out) throws IOException
This is where Groovy's closure coercion feature really shines. Instead of implementing a class (or anonymous class) and declaring that method:
def callback = new StreamCallback() {
  // "in" is a reserved word in Groovy, so the parameters are renamed here
  void process(InputStream inputStream, OutputStream outputStream) throws IOException {
     doStuff()
  }
}
If the interface has a single method, you can create a closure and use the "as" keyword to coerce it to the interface; the closure body then serves as the implementation of that single method, like so:
def callback = { doStuff() } as StreamCallback
In the skeleton above, I didn't create a variable to hold the callback; I just passed it into the write() method. As far as boilerplate code goes, the above skeleton is not too bad :)
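To see the two forms side by side outside NiFi, here is a minimal sketch that can be pasted into a plain Groovy console. StreamCallbackLike is a hypothetical stand-in I made up so the example has no NiFi dependency; it only mirrors the shape of StreamCallback's single method. The run closure is likewise just a helper for this illustration.

```groovy
// Hypothetical stand-in for NiFi's StreamCallback (same single-method shape),
// declared here only so the sketch runs without NiFi on the classpath
interface StreamCallbackLike {
    void process(InputStream inputStream, OutputStream outputStream) throws IOException
}

// Anonymous-class form: the method is declared explicitly
def verbose = new StreamCallbackLike() {
    void process(InputStream inputStream, OutputStream outputStream) throws IOException {
        outputStream.write(inputStream.getText('UTF-8').toUpperCase().getBytes('UTF-8'))
    }
}

// Closure-coercion form: the closure body becomes the body of process()
def concise = { inputStream, outputStream ->
    outputStream.write(inputStream.getText('UTF-8').toUpperCase().getBytes('UTF-8'))
} as StreamCallbackLike

// Illustration-only helper: feeds a string through a callback and returns the result
def run = { StreamCallbackLike cb, String text ->
    def out = new ByteArrayOutputStream()
    cb.process(new ByteArrayInputStream(text.getBytes('UTF-8')), out)
    out.toString('UTF-8')
}

// Both callbacks behave the same way
assert run(verbose, 'abc') == run(concise, 'abc')
```

The coerced closure is a real instance of the interface, so it can be handed to any method that expects one, which is what the skeleton does with session.write().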

The example I'll use (based on the Tweet I saw) has a flowfile with the following contents:
a1|b1|c1|d1
a2|b2|c2|d2
a3|b3|c3|d3
The desired output is the middle two columns, delimited by a space instead of the bar:
b1 c1
b2 c2
b3 c3
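As a rough sketch of the transformation on its own, independent of NiFi, the column selection can be tried in a plain Groovy console first. The selectColumns closure and its parameters are hypothetical names used only for this illustration; the column indexes are zero-based.

```groovy
// Illustration-only helper: keeps the fields at the given zero-based indexes
// and joins them with a single space
def selectColumns = { String text, String delimiter, List<Integer> columns ->
    text.readLines().collect { line ->
        def fields = line.tokenize(delimiter)
        columns.collect { fields[it] }.join(' ')
    }.join('\n')
}

def input = 'a1|b1|c1|d1\na2|b2|c2|d2\na3|b3|c3|d3'
def result = selectColumns(input, '|', [1, 2])
println result
```

One thing to keep in mind: tokenize() skips empty fields, so a row such as a1||c1|d1 yields three tokens rather than four and the columns shift left. If blank columns matter in your data, splitting on a quoted pattern (for example line.split(/\|/)) keeps the interior empty fields.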
Here's the script I ended up with:
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
   // Read the content as UTF-8 (not the platform default) so non-ASCII text survives
   inputStream.eachLine('UTF-8') { line ->
      def a = line.tokenize('|')
      outputStream.write("${a[1]} ${a[2]}\n".toString().getBytes(StandardCharsets.UTF_8))
   }
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
Notice there's no error handling; stay tuned for more posts :) Here's a screenshot for a test flow that puts the sample data in and writes the expected output to a file:



The test flow template is available as a Gist (here). I hope this was helpful. As always, I welcome all comments, questions, and suggestions.

Cheers!

6 comments:

  1. Hi Matt,
    how can I add a condition, e.g. add 00 to $1 if it starts with 0?

  2. Hi, not an expert, but after the line a = line.tokenize('|')
    I presume you can add an if statement that checks the value and adds 00 (or something else) if the value matches.
    Guru Java developers might know a faster way inside the outputStream.write statement. I think it is more elegant to give it a separate line and an explanation in your code for maintainability.

  3. Hi Matt,
    I tried to import the template into NiFi 1.2.0, but I got the error "Specified template is not in valid format". Could you please help me see where I went wrong?

    Replies
    1. That template is pretty old, and there were many changes between when it was written and 1.2.0, so it's probably too old to be loaded into 1.2.0, sorry! You really only need the script section, though. You can rebuild that flow from scratch using the same processors and just paste the script into the ExecuteScript body.

  4. Thanks for the script, it's very good. However, I noticed it doesn't work with UTF-8 characters in the data file: once we write the flow file, text such as Arabic isn't encoded correctly. Any ideas?

  5. I'm not able to read/refer to an attribute inside the StreamCallback. I wanted to compare it against content in JSON. It's a Groovy script I'm talking about. Any suggestions?
