Tuesday, March 14, 2017

NiFi ExecuteScript Cookbook

Hello All!  Just wanted to write a quick post here to let you know about a series of articles I have written about ExecuteScript support for Apache NiFi, with discussions of how to do various tasks, and examples in various supported scripting languages. I posted them on Hortonworks Community Connection (HCC).  Full disclosure: I am a Hortonworks employee :)

Lua and ExecuteScript in NiFi (revisited)

I recently fielded a question about using Lua (actually, LuaJ) in NiFi's ExecuteScript processor to manipulate flow files. I had written a basic article on using LuaJ with ExecuteScript, but that example only shows how to create new flow files, it does not address accepting incoming flow files or manipulating the data.

To rectify that I answered the question with the following example script:

flowFile = session:get()
if flowFile == nil then

local writecb =
luajava.createProxy("org.apache.nifi.processor.io.StreamCallback", {
    process = function(inputStream, outputStream)
      local isr = luajava.newInstance('java.io.InputStreamReader', inputStream)
      local br = luajava.newInstance('java.io.BufferedReader', isr)
      local line = br:readLine()
      while line ~= nil do
         -- Do stuff to each line here
         line = br:readLine()
         if line ~= nil then
flowFile = session:putAttribute(flowFile, "lua.attrib", "my attribute value")
flowFile = session:write(flowFile, writecb)
session:transfer(flowFile, REL_SUCCESS)

Readers of my last LuaJ post will recognize the approach, using luajava.createProxy() to basically create an anonymous class instance of a NiFi Callback class, then providing (aka "overriding") the requisite interface method (in this case, the "process" method).

The first difference here is that I'm using the StreamCallback class instead of the OutputStreamCallback class in my previous example. You may recall that OutputStreamCallback only allows you to write to a flow file, whereas StreamCallback is for overwriting existing flow file content, by making both the input stream of the current version of the flow file, as well as the output stream for the next version of the flow file, available in the process() method.

You may also recall that often my scripting examples use Apache Commons' IOUtils class to read the
entire flow file content in as a string, then manipulate it after the fact. However LuaJ has a bug where it only uses the system classloader and thus won't have access to the additional classes provided to the scripting NAR.  So for this example I am wrapping the incoming InputStream in an InputStreamReader, then a BufferedReader so I can proceed line-by-line.  I reverse each line, and if there are lines remaining, I add the newline back to the output stream.

If you are simply reading in the content of a flow file, and won't be overwriting that flow file content, you can use InputStreamCallback instead of StreamCallback. InputStreamCallback's process() method gives you only the input stream of the incoming flow file. Some people use a session.read() with an InputStreamCallback to handle the incoming flow file(s), then later use a session.write() with an OutputStreamCallback, rather than a single session.write() with a StreamCallback as is shown above. The common use case for the alternate approach is for when you send the original flow file to an "original" relationship, but also write out new flow files based on the content of the incoming one(s). Many "Split" processors do this.

Anyway, I hope this example is informative and shows how to use Lua scripting in NiFi to perform custom logic.  As always, I welcome all comments, questions, and suggestions.  Cheers!

Friday, January 6, 2017

Inspecting the NAR classloading hierarchy

I've noticed on the NiFi mailing lists and in various places that users sometimes attempt to modify their NiFi installations by adding JARs to the lib/ folder, adding various custom and/or external NARs that don't come with the NiFi distribution, etc.  This can sometimes lead to issues with classloading, which is often difficult for a user to debug. If the same changes are not made across a NiFi cluster, more trouble can ensue.

For this reason, it might be helpful to understand the way NARs are loaded in NiFi. When a NAR is loaded by NiFi, a NarClassLoader is created for it. A NarClassLoader is an URLClassLoader that contains all the JAR dependencies needed by that NAR, such as third-party libraries, NiFi utilities, etc.  If the NAR definition includes a parent NAR, then the NarClassLoader's parent is the NarClassLoader for the parent NAR.  This allows all NARs with the same parent to have access to the same classes, which alleviates certain classloader issues when talking between NARs / utilities. One pervasive example is the specification of an "API NAR" such as "nifi-standard-services-api-nar", which enables the child NARs to use the same API classes/interfaces.

All NARs (and all child ClassLoaders in Java) have the following class loaders in their parent chain (listed from top to bottom):
  1. Bootstrap class loader
  2. Extensions class loader
  3. System class loader

You can consult the Wiki page for Java ClassLoader for more information on these class loaders, but in the NiFi context just know that the System class loader (aka Application ClassLoader) includes all the JARs from the lib/ folder (but not the lib/bootstrap folder) under the NiFi distribution directory.

To help in debugging classloader issues, either on a standalone node or a cluster, I wrote a simple flow using ExecuteScript with Groovy to send out a flow file per NAR, whose contents include the classloader chain (including which JARs belong to which URLClassLoader) in the form:

The classloaders are listed from top to bottom, so the first will always be the extensions classloader, followed by the system classloader, etc.  The NarClassLoader for the given NAR will be at the bottom.

The script is as follows:

import java.net.URLClassLoader
import org.apache.nifi.nar.NarClassLoaders

NarClassLoaders.instance.extensionClassLoaders.each { c ->

def chain = []
while(c) {
  chain << c
  c = c.parent

def flowFile = session.create()
flowFile = session.write(flowFile, {outputStream ->
  chain.reverseEach { cl ->
    if(cl instanceof URLClassLoader) {
      cl.getURLs().each {
} as OutputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)

The script iterates over all the "Extension Class Loaders" (aka the classloader for each NAR), builds a chain of classloaders starting with the child and adding all the parents, then iterates the list in reverse, printing the classloader object name followed by a tab-indented list of any URLs (JARs, e.g.) included in the classloader.

This can be used in a NiFi flow, perhaps using LogAttribute or PutFile to display the results of each NAR's classloader hierarchy.

Note that these are the classloaders that correspond to a NAR, not the classloaders that belong to instances of processors packaged in the NAR.  For runtime information about the classloader chain associated with a processor instance, I will tackle that in another blog post :)

Please let me know if you find this useful, As always suggestions, questions, and improvements are welcome.  Cheers!