Tuesday, March 14, 2017

NiFi ExecuteScript Cookbook

Hello All! Just wanted to write a quick post here to let you know about a series of articles I have written about ExecuteScript support for Apache NiFi, with discussions of how to accomplish various tasks, along with examples in the supported scripting languages. I posted them on Hortonworks Community Connection (HCC). Full disclosure: I am a Hortonworks employee :)


Lua and ExecuteScript in NiFi (revisited)

I recently fielded a question about using Lua (actually, LuaJ) in NiFi's ExecuteScript processor to manipulate flow files. I had written a basic article on using LuaJ with ExecuteScript, but that example only shows how to create new flow files; it does not address accepting incoming flow files or manipulating their data.

To rectify that I answered the question with the following example script:

flowFile = session:get()
if flowFile == nil then
  return
end

-- Create a proxy (anonymous implementation) of NiFi's StreamCallback interface
local writecb =
luajava.createProxy("org.apache.nifi.processor.io.StreamCallback", {
    process = function(inputStream, outputStream)
      local isr = luajava.newInstance('java.io.InputStreamReader', inputStream)
      local br = luajava.newInstance('java.io.BufferedReader', isr)
      local line = br:readLine()
      while line ~= nil do
         -- Do stuff to each line here
         outputStream:write(line:reverse())
         line = br:readLine()
         if line ~= nil then
           outputStream:write('\n')
         end
      end
    end
})
flowFile = session:putAttribute(flowFile, "lua.attrib", "my attribute value")
flowFile = session:write(flowFile, writecb)
session:transfer(flowFile, REL_SUCCESS)

Readers of my last LuaJ post will recognize the approach: using luajava.createProxy() to create what amounts to an anonymous implementation of a NiFi callback interface, then providing (i.e., "overriding") the requisite interface method (in this case, the process() method).

The first difference here is that I'm using the StreamCallback class instead of the OutputStreamCallback class from my previous example. You may recall that OutputStreamCallback only allows you to write to a flow file, whereas StreamCallback is for overwriting existing flow file content: it makes both the input stream of the current version of the flow file and the output stream for the next version available in the process() method.

You may also recall that my scripting examples often use the Apache Commons IOUtils class to read the entire flow file content in as a string, then manipulate it after the fact. However, LuaJ has a bug where it uses only the system classloader, and thus won't have access to the additional classes bundled with the scripting NAR. So for this example I wrap the incoming InputStream in an InputStreamReader, then a BufferedReader, so I can proceed line by line. I reverse each line, and if there are lines remaining, I add the newline back to the output stream.

If you are simply reading in the content of a flow file, and won't be overwriting that flow file content, you can use InputStreamCallback instead of StreamCallback. InputStreamCallback's process() method gives you only the input stream of the incoming flow file. Some people use a session.read() with an InputStreamCallback to handle the incoming flow file(s), then later use a session.write() with an OutputStreamCallback, rather than a single session.write() with a StreamCallback as is shown above. The common use case for the alternate approach is when you send the original flow file to an "original" relationship, but also write out new flow files based on the content of the incoming one(s). Many "Split" processors do this; a minimal Groovy sketch of that pattern follows.
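Here is what that read-then-write pattern can look like in Groovy (a minimal sketch; the reverse logic is just a placeholder, and note that ExecuteScript has no "original" relationship, so the original flow file must still be transferred or removed):
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.get()
if (!flowFile) return

def content = ''
// Read the incoming content without modifying the flow file
session.read(flowFile, { inputStream ->
    content = inputStream.text
} as InputStreamCallback)

// Create a new flow file with the original as its parent, then write to it
def newFlowFile = session.create(flowFile)
newFlowFile = session.write(newFlowFile, { outputStream ->
    outputStream.write(content.reverse().bytes)
} as OutputStreamCallback)

session.transfer(newFlowFile, REL_SUCCESS)
session.transfer(flowFile, REL_SUCCESS)  // no "original" relationship in ExecuteScript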

Anyway, I hope this example is informative and shows how to use Lua scripting in NiFi to perform custom logic.  As always, I welcome all comments, questions, and suggestions.  Cheers!

Friday, January 6, 2017

Inspecting the NAR classloading hierarchy

I've noticed on the NiFi mailing lists and in various places that users sometimes attempt to modify their NiFi installations by adding JARs to the lib/ folder, adding custom and/or external NARs that don't come with the NiFi distribution, etc. This can sometimes lead to classloading issues, which are often difficult for a user to debug. If the same changes are not made across a NiFi cluster, more trouble can ensue.

For this reason, it might be helpful to understand the way NARs are loaded in NiFi. When a NAR is loaded by NiFi, a NarClassLoader is created for it. A NarClassLoader is a URLClassLoader that contains all the JAR dependencies needed by that NAR, such as third-party libraries, NiFi utilities, etc. If the NAR definition includes a parent NAR, then the NarClassLoader's parent is the NarClassLoader for the parent NAR. This allows all NARs with the same parent to have access to the same classes, which alleviates certain classloader issues when NARs and utilities need to talk to each other. One pervasive example is the specification of an "API NAR" such as "nifi-standard-services-api-nar", which enables the child NARs to use the same API classes/interfaces.

All NARs (like all child ClassLoaders in Java) have the following class loaders in their parent chain (listed from top to bottom):
  1. Bootstrap class loader
  2. Extension class loader
  3. System class loader

You can consult the Wikipedia page on Java's ClassLoader for more information on these class loaders, but in the NiFi context just know that the System class loader (aka Application ClassLoader) includes all the JARs from the lib/ folder (but not the lib/bootstrap folder) under the NiFi distribution directory.
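On Java 8, where the system class loader is itself a URLClassLoader, a quick sanity check of that classpath from any Groovy script looks like this (a sketch):
// List the JARs visible to the system (application) class loader;
// on a stock NiFi install these are the JARs under lib/
def sysCl = ClassLoader.systemClassLoader
if (sysCl instanceof URLClassLoader) {
    sysCl.URLs.each { println it }
}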

To help in debugging classloader issues, either on a standalone node or a cluster, I wrote a simple flow using ExecuteScript with Groovy to send out one flow file per NAR, whose contents show the classloader chain (noting which JARs belong to which URLClassLoader) in the form:
<classloader_object>
     <path_to_jar_file>
     <path_to_jar_file>
     <path_to_jar_file>
     ...
<classloader_object>
     <path_to_jar_file>
     <path_to_jar_file>
     <path_to_jar_file>
     ...

The classloaders are listed from top to bottom, so the first will always be the extension classloader, followed by the system classloader, etc. (The bootstrap class loader is represented as null in Java, so it does not appear in the output.) The NarClassLoader for the given NAR will be at the bottom.

The script is as follows:

import java.net.URLClassLoader
import org.apache.nifi.nar.NarClassLoaders
import org.apache.nifi.processor.io.OutputStreamCallback

NarClassLoaders.instance.extensionClassLoaders.each { c ->

    // Build the chain from the NAR's classloader up through its parents
    def chain = []
    while (c) {
        chain << c
        c = c.parent
    }

    def flowFile = session.create()
    flowFile = session.write(flowFile, { outputStream ->
        // Print from the top of the chain down, indenting each classloader's URLs
        chain.reverseEach { cl ->
            outputStream.write("${cl.toString()}\n".bytes)
            if (cl instanceof URLClassLoader) {
                cl.getURLs().each {
                    outputStream.write("\t${it.toString()}\n".bytes)
                }
            }
        }
    } as OutputStreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
}

The script iterates over all the "Extension Class Loaders" (aka the classloader for each NAR), builds a chain of classloaders starting with the child and adding all the parents, then iterates the list in reverse, printing each classloader object's name followed by a tab-indented list of any URLs (e.g., JARs) included in that classloader.

This can be used in a NiFi flow, perhaps using LogAttribute or PutFile to display the results of each NAR's classloader hierarchy.

Note that these are the classloaders that correspond to a NAR, not the classloaders that belong to instances of processors packaged in the NAR. I will tackle runtime information about the classloader chain associated with a processor instance in another blog post :)
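In the meantime, a rough way to peek at a runtime chain is from inside a running ExecuteScript instance itself; this sketch just logs the ancestry of the script's own classloader:
// Walk up from the script's classloader and log each ancestor
def cl = this.class.classLoader
while (cl) {
    log.info(cl.toString())
    cl = cl.parent
}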

Please let me know if you find this useful. As always, suggestions, questions, and improvements are welcome. Cheers!

Wednesday, August 10, 2016

Executing Remote Commands in NiFi with ExecuteScript, Groovy, and Sshoogr

There are already some processors in Apache NiFi for executing commands, such as ExecuteProcess and ExecuteStreamCommand. These allow execution of remote scripts by calling the operating system's "ssh" command with various parameters (such as what remote command(s) to execute when the SSH session is established). Often this is accomplished by writing a bash script with a number of ssh commands (or a single ssh command with a long list of remote commands at the end).

As a different approach, you can use ExecuteScript or InvokeScriptedProcessor, along with a third-party library for executing remote commands with SSH. For this post, I will be using Groovy as the scripting language and Sshoogr, which is a very handy SSH DSL for Groovy.

Sshoogr can be installed via SDKMAN! or downloaded (from Maven Central, for example). The key is to have all the libraries (Sshoogr itself and its dependencies) in a directory somewhere. Then you point ExecuteScript's Module Directory property at that directory, and those JARs will be available to the Groovy script.

In order to make this example fairly flexible, I chose a few conventions. First, the commands to be executed are expected to be on individual lines of an incoming flow file. Next, the configuration parameters (such as hostname, username, password, and port) are added to ExecuteScript as dynamic properties. Note that the password property is not sensitive in this case; if you want that, use InvokeScriptedProcessor and explicitly add the properties (specifying password as sensitive).

For Sshoogr to work (at least for this example), it is expected that the RSA key for the remote node is in the NiFi user's ~/.ssh/known_hosts file. Because of OS and other differences, the SSH connection can sometimes fail due to strict host key checking, so in the script we will disable it via Sshoogr's options.

Now to the script.  We start with the standard code for requiring an input flow file, and then use session.read() to get each line into a list called commands:
def flowFile = session.get()
if(!flowFile) return

def commands = []
// Read in one command per line
session.read(flowFile, {inputStream ->
    inputStream.eachLine { line -> commands << line  }
} as InputStreamCallback)
Then we disable strict host key checking:
options.trustUnknownHosts = true
Then we use the Sshoogr DSL to execute the commands from the flow file:
def result = null
remoteSession {

   host = sshHostname.value
   username = sshUsername.value
   password = sshPassword.value
   port = Integer.parseInt(sshPort.value)
        
   result = exec(showOutput: true, failOnError: false, command: commands.join('\n'))
}
Here we are using the aforementioned ExecuteScript dynamic properties (sshHostname, sshUsername, sshPassword, sshPort); a screenshot later in the post shows them after they have been added to the ExecuteScript configuration.

The exec() method is the powerhouse of Sshoogr. In this case we want the output of the commands (to put into an outgoing flow file). Also, I've set "failOnError" to false just so all commands will be attempted; it is likely, though, that you will want it set to true, so that if a command fails, none of the following commands is attempted. The final parameter in this example is "command". If you don't use the "named-parameter" (aka Map) version of exec(), and just want to execute the list of commands (without setting "showOutput", for example), then exec() will take a Collection:
exec(commands)
In our case, though, the named-parameter version of exec() expects the "command" parameter to be a string, so instead of passing in the List "commands" directly, we turn it back into a line-by-line string using join('\n').

The next part of the script overwrites the incoming flow file; alternatively, we could have created a new flow file with the original as a parent (a sketch of that variant appears after the transfer code below). If using ExecuteScript and creating a new flow file, be sure to remove the original. If using InvokeScriptedProcessor, you could instead define an "original" relationship and route the incoming flow file to that.
flowFile = session.write(flowFile, { outputStream ->
    result?.output?.eachLine { line ->
       outputStream.write(line.bytes)
       outputStream.write('\n'.bytes)
    }
} as OutputStreamCallback)
Finally, using result.exitStatus, we determine whether to route the flow file to success or failure:
session.transfer(flowFile, result?.exitStatus ? REL_FAILURE : REL_SUCCESS)
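As mentioned above, if you'd rather create a new flow file with the original as a parent, the tail end of the script might look like this instead (a minimal sketch; remember that ExecuteScript requires you to remove or transfer the original):
// Create a child flow file so provenance records the original as its parent
def resultFlowFile = session.create(flowFile)
resultFlowFile = session.write(resultFlowFile, { outputStream ->
    result?.output?.eachLine { line ->
        outputStream.write(line.bytes)
        outputStream.write('\n'.bytes)
    }
} as OutputStreamCallback)
session.remove(flowFile)  // no "original" relationship in ExecuteScript
session.transfer(resultFlowFile, result?.exitStatus ? REL_FAILURE : REL_SUCCESS)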
The full script for this example is as follows:
import static com.aestasit.infrastructure.ssh.DefaultSsh.*

def flowFile = session.get()
if(!flowFile) return

def commands = []
// Read in one command per line
session.read(flowFile, {inputStream ->
    inputStream.eachLine { line -> commands << line  }
} as InputStreamCallback)

options.trustUnknownHosts = true
def result = null
remoteSession {

   host = sshHostname.value
   username = sshUsername.value
   password = sshPassword.value
   port = Integer.parseInt(sshPort.value)
        
   result = exec(showOutput: true, failOnError: false, command: commands.join('\n'))
}

flowFile = session.write(flowFile, { outputStream ->
    result?.output?.eachLine { line ->
       outputStream.write(line.bytes)
       outputStream.write('\n'.bytes)
    }
} as OutputStreamCallback)

session.transfer(flowFile, result?.exitStatus ? REL_FAILURE : REL_SUCCESS)
Moving now to the ExecuteScript configuration:

Here I am connecting to my Hortonworks Data Platform (HDP) Sandbox to run Hadoop commands and such. The default settings are shown; you can see I've pasted the script into the Script Body property, and my Module Directory property is pointing at my Sshoogr download folder. I added this to a simple flow that generates a flow file, fills it with commands, runs them remotely, then logs the output returned from the session:

For this example I ran the following two commands:
hadoop fs -ls /user
echo "Hello World!" > here_i_am.txt
In the log (after LogAttribute runs with payload included), I see the following:

And on the sandbox I see the file has been created with the expected content:

This post described how to use ExecuteScript with Groovy and Sshoogr to execute remote commands coming from flow files. I created a Gist of the template, but it was created with a beta version of Apache NiFi 1.0, so it may not load into earlier (0.x) versions. However, all the ingredients are in this post, so it should be easy to create your own flow. Please let me know how/if it works for you :)

Cheers!

Thursday, June 9, 2016

Testing ExecuteScript processor scripts

I've been getting lots of questions about how to develop/debug scripts that go into the ExecuteScript processor in NiFi. One way to do this is to add a unit test to the nifi-scripting-processors submodule and set the Script File property to your test script. However, for this you basically need the full NiFi source.
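For reference, such a test might look roughly like the following sketch, using nifi-mock's TestRunner (the script path is hypothetical, and this assumes the ExecuteScript class is on the test classpath):
import org.apache.nifi.processors.script.ExecuteScript
import org.apache.nifi.util.TestRunners

def runner = TestRunners.newTestRunner(new ExecuteScript())
runner.setProperty('Script Engine', 'Groovy')
runner.setProperty('Script File', '/path/to/my_script.groovy')  // hypothetical path
runner.enqueue('test content'.bytes)  // becomes the incoming flow file
runner.run()
runner.assertAllFlowFilesTransferred('success', 1)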

To make things easier, I took a pared-down copy of the ExecuteScript processor (and its helper classes), added nifi-mock as a dependency, and slapped a command-line interface on it. This way, with a single JAR, you can run your script inside a dummy flow containing ExecuteScript.

The result is version 1.1.1 of the NiFi Script Tester utility, on GitHub and Bintray.

It runs your script as a little unit test: you can pipe stdin in to become a flow file, or point it at a directory and it will send every file in as a flow file, and so on. If your script doesn't need a flow file, you can run it without specifying an input directory or piping in stdin; it will run once even without input. The usage is as follows:
Usage: java -jar nifi-script-tester-<version>-all.jar [options] <script file>
 Where options may include:
   -success            Output information about flow files that were transferred to the success relationship. Defaults to true
   -failure            Output information about flow files that were transferred to the failure relationship. Defaults to false
   -no-success         Do not output information about flow files that were transferred to the success relationship. Defaults to false
   -content            Output flow file contents. Defaults to false
   -attrs              Output flow file attributes. Defaults to false
   -all-rels           Output information about flow files that were transferred to any relationship. Defaults to false
   -all                Output content, attributes, etc. about flow files that were transferred to any relationship. Defaults to false
   -input=<directory>  Send each file in the specified directory as a flow file to the script
   -modules=<paths>    Comma-separated list of paths (files or directories) containing script modules/JARs

As a basic example, let's say I am in the nifi-script-tester Git repo, and I've built using "gradle shadowJar" so my JAR is in build/libs. I can run one of the basic unit tests like so:
java -jar build/libs/nifi-script-tester-1.1.1-all.jar src/test/resources/test_basic.js
Which gives the following output:
2016-06-09 14:20:49,787 INFO  [pool-1-thread-1]                nifi.script.ExecuteScript - ExecuteScript[id=9a507773-86ae-4c33-957f-1c0270302a0e] hello
Flow Files transferred to success: 0
This shows minimal output: just the logging from the framework and the default summary statistic of "Flow Files transferred to success". Instead, let's try a more comprehensive example, where I pipe in a JSON file, run my Groovy script from the JSON-to-JSON conversion post, and display all the attributes, contents, and statistics from the run:
cat src/test/resources/input_files/jolt.json | java -jar build/libs/nifi-script-tester-1.1.1-all.jar -all src/test/resources/test_json2json.groovy
This gives the following (much more verbose) output:
Flow file FlowFile[0,14636536169108_translated.json,283B]
---------------------------------------------------------
FlowFile Attributes
Key: 'entryDate'
	Value: 'Thu Jun 09 14:24:50 EDT 2016'
Key: 'lineageStartDate'
	Value: 'Thu Jun 09 14:24:50 EDT 2016'
Key: 'fileSize'
	Value: '283'
FlowFile Attribute Map Content
Key: 'path'
	Value: 'target'
Key: 'filename'
	Value: '14636536169108_translated.json'
Key: 'uuid'
	Value: '84d8d290-fbf9-4d57-aaf7-fd050da40d9f'
---------------------------------------------------------
{
    "Range": 5,
    "Rating": "3",
    "SecondaryRatings": {
        "metric": {
            "Id": "metric",
            "Range": 5,
            "Value": 6
        },
        "quality": {
            "Id": "quality",
            "Range": 5,
            "Value": 3
        }
    }
}

Flow Files transferred to success: 1

Flow Files transferred to failure: 0
There are options to suppress or select various things such as relationships, attributes, flow file contents, etc. As a final example, let's look at the Hazelcast example from my previous post, to see how to add paths (files and directories) to the script tester, in the same way you'd set the Module Directory property in the ExecuteScript processor:

java -jar ~/git/nifi-script-tester/build/libs/nifi-script-tester-1.1.1-all.jar -attrs -modules=/Users/mburgess/Downloads/hazelcast-3.6/lib hazelcast.groovy
And the output:

Jun 09, 2016 2:31:01 PM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_dev][3.6] is STARTING
Jun 09, 2016 2:31:01 PM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_dev][3.6] is STARTED
Jun 09, 2016 2:31:01 PM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_dev][3.6] is CLIENT_CONNECTED
Jun 09, 2016 2:31:01 PM com.hazelcast.client.spi.impl.ClientMembershipListener
INFO:

Members [1] {
	Member [172.17.0.2]:5701
}

Jun 09, 2016 2:31:01 PM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_dev][3.6] is SHUTTING_DOWN
Jun 09, 2016 2:31:01 PM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_dev][3.6] is SHUTDOWN
Flow file FlowFile[0,15008187914245.mockFlowFile,0B]
---------------------------------------------------------
FlowFile Attributes
Key: 'entryDate'
	Value: 'Thu Jun 09 14:31:01 EDT 2016'
Key: 'lineageStartDate'
	Value: 'Thu Jun 09 14:31:01 EDT 2016'
Key: 'fileSize'
	Value: '0'
FlowFile Attribute Map Content
Key: 'path'
	Value: 'target'
Key: 'hazelcast.customers.nifi'
	Value: '[name:Apache NiFi, email:nifi@apache.org, blog:nifi.apache.org]'
Key: 'filename'
	Value: '15008187914245.mockFlowFile'
Key: 'hazelcast.customers.mattyb149'
	Value: '[name:Matt Burgess, email:mattyb149@gmail.com, blog:funnifi.blogspot.com]'
Key: 'uuid'
	Value: '0a7c788e-0aef-40e4-b9a6-426f877dbfbe'
---------------------------------------------------------

Flow Files transferred to success: 1
This script would not work without the Hazelcast JARs, so this shows how the "-modules" option is used to add them to the classpath for testing.

The nifi-script-tester only supports JavaScript and Groovy at the moment; including Jython (for example) would increase the JAR's size by 500% :/ Right now it's only 8.6 MB, so a little big but not too bad.

Anyway, I hope this helps. Please let me know how/if it works for you!

Cheers,
Matt

Wednesday, May 25, 2016

Validating JSON in NiFi with ExecuteScript

My last post alluded to a Groovy script for ExecuteScript that would use JSON Schema Validator to validate incoming flow files (in JSON format) against a JSON Schema. The purpose of that post was to show how to use Groovy Grape to get the JSON Schema Validator dependencies loaded dynamically by the script (versus downloading the JARs and adding them to the Module Directory property).  For this post I want to show the actual JSON validation script (as I presented it on the Apache NiFi "users" mailing list).

I'll use the schema as it was presented to me on the mailing list:
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": { "type": "object"}
  }
}
This shows that the incoming flow file should contain a JSON object, that the object must have certain fields (the "required" values), and which types its values may/must have (the "properties" entries). For this script I'll hard-code the schema, but I'll talk a bit at the end about how this could be done dynamically for a better user experience.

Since the schema itself is JSON, we use org.json.JSONObject and friends to read in the schema, then use org.everit.json.schema.loader.SchemaLoader to load it into a Schema. We can read in the flow file with session.read(), passing in a closure cast to InputStreamCallback (see my previous post for details), and call schema.validate(). If the JSON is not valid, validate() will throw a ValidationException; if it does, I set a "valid" variable to false, then route to success or failure depending on whether the incoming flow file validated against the schema. The original script is as follows:
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

flowFile = session.get()
if(!flowFile) return

jsonSchema = """
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": { "type": "object"}
  }
}
"""

boolean valid = true
session.read(flowFile, { inputStream ->
   jsonInput = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
   JSONObject rawSchema = new JSONObject(new JSONTokener(new ByteArrayInputStream(jsonSchema.bytes)))
   Schema schema = SchemaLoader.load(rawSchema)
   try {
      schema.validate(new JSONObject(jsonInput))
   } catch(ve) {
      log.error("Doesn't adhere to schema", ve)
      valid = false
   }
} as InputStreamCallback)

session.transfer(flowFile, valid ? REL_SUCCESS : REL_FAILURE)

This is a pretty basic script; there are things we could do to improve the capability:
  • Move the schema load out of the session.read() method, since it doesn't require the input
  • Allow the user to specify the schema via a dynamic property (see the sketch below)
  • Do better exception handling and error message reporting
A worthwhile improvement (one that would include all of these) is to turn the script into a proper Processor and use it in an InvokeScriptedProcessor. That way you could have a custom set of relationships and properties, making it easy for the user to configure and use.
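For instance, the dynamic-property improvement might look like the following sketch, assuming you've added a dynamic property named jsonSchema to ExecuteScript (the property name is hypothetical; the imports are the same as in the script above):
// Load the schema from a dynamic property instead of hard-coding it
def schemaText = jsonSchema.value
JSONObject rawSchema = new JSONObject(new JSONTokener(schemaText))
Schema schema = SchemaLoader.load(rawSchema)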

Of course, the best solution is probably to implement it in Java and contribute it to Apache NiFi under the Jira case NIFI-1893 :)

Cheers!

Tuesday, May 24, 2016

Using Groovy @Grab with ExecuteScript

As we've seen in a previous post, it is possible to add dependencies/modules to your ExecuteScript classpath by using the Module Directory property in the processor configuration dialog. However, many third-party libraries have (sometimes lots of) transitive dependencies, and downloading all of them and/or setting up your Module Directory can become a pain.

The Groovy world has a solution for such a thing, using Groovy Grape and specifically the @Grab annotation. This instructs Groovy to download the artifact(s) and their dependencies into the Grape cache. It works much like Maven or Ivy, and in fact it is based on Ivy. For this reason, ExecuteScript (when using the Groovy engine) needs the Apache Ivy JAR.

You may be asking why the Ivy JAR is not included with the scripting NAR that contains the ExecuteScript and InvokeScriptedProcessor processors, or whether we can use the Module Directory property to point at the Ivy JAR and be on our way. I haven't been able to verify this in the Groovy code, but by experimentation it appears that Grape uses the application classloader, and not the current thread context's classloader, to find the Ivy classes it needs. This means the Ivy JAR needs to be on the "original" NiFi classpath (i.e., in the lib/ folder of NiFi) rather than in the NAR or specified in the Module Directory property.

You can download the Apache Ivy JAR from the Apache Ivy website (I tested with Ivy 2.4.0), place it in your NiFi distribution under lib/, then restart NiFi.

Now we can get to the script part :) Let's say we want to write a Groovy script for ExecuteScript to validate an incoming flow file in JSON format against a JSON Schema. There is a good JSON Schema Validator library for this on GitHub. In this case, the library only has a couple of dependencies, and the only one not already available to ExecuteScript is org.json:json, so it's not a real pain to use Module Directory for this; but I wanted to keep it simple to show the general idea. We use @Grab to get this dependency (and its transitive dependencies), then import the classes we will end up using to do the JSON Schema validation:
@Grab(group='org.everit.json', module='org.everit.json.schema', version='1.3.0')
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

// Rest of code here
If you don't already have all the artifacts in your Grape cache, then the very first time this script runs it will be much slower, as it has to download each artifact and its transitive dependencies. Of course, this means your NiFi instance has to be able to get out to the internet, and Grape will need to be configured properly for your machine. In many cases the default settings are fine, but check out the Grape documentation as well as the default grapeConfig.xml settings (note that this is an Ivy settings file). NOTE: The default location for the Grape cache is under the user's home directory, at ~/.groovy/grapes. If you are running NiFi as a particular user, you will want to ensure that user has permission to write to that directory.

An alternative to the possibly-long-download solution is to pre-install the grapes on the machine manually. If Groovy is installed on the machine itself (versus just the engine supplied with ExecuteScript), you can do the following (see the docs for more details):
grape install <groupid> <artifactid> [<version>]
to install the grapes into the cache. Then ExecuteScript with @Grab will be faster out of the gate.
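For example, to pre-fetch the JSON Schema Validator artifact grabbed above:
grape install org.everit.json org.everit.json.schema 1.3.0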

Hopefully this post has offered a way to make prototyping new capabilities with ExecuteScript and Groovy a little faster and easier.

Cheers!