Wednesday, August 10, 2016

Executing Remote Commands in NiFi with ExecuteScript, Groovy, and Sshoogr

There are already some processors in Apache NiFi for executing commands, such as ExecuteProcess and ExecuteStreamCommand. These allow execution of remote scripts by calling the operating system's "ssh" command with various parameters (such as what remote command(s) to execute when the SSH session is established). Often this is accomplished by writing a bash script with a number of ssh commands (or a single ssh command with a long list of remote commands at the end).

As a different approach, you can use ExecuteScript or InvokeScriptedProcessor, along with a third-party library for executing remote commands with SSH. For this post, I will be using Groovy as the scripting language and Sshoogr, which is a very handy SSH DSL for Groovy.

Sshoogr can be installed via SDK Manager or downloaded (from Maven Central for example). The key is to have all the libraries (Sshoogr itself and its dependencies) in a directory somewhere. Then you point to the directory in ExecuteScript's Module Directory property, and those JARs will be available to the Groovy script.

In order to make this example fairly flexible, I chose a few conventions. First, the commands to be executed are expected to be on individual lines of an incoming flow file. Next, the configuration parameters (such as hostname, username, password, and port) are added to ExecuteScript as dynamic properties. Note that the password property is not sensitive in this case; if you want that, use InvokeScriptedProcessor and explicitly add the properties (specifying password as sensitive).

For Sshoogr to work (at least for this example), it is expected that the RSA key for the remote node is in the NiFi user's ~/.ssh/known_hosts file. Because of OS and other differences, sometimes the SSH connection will fail due to strict host key checking, so in the script we will disable that in Sshoogr.

Now to the script.  We start with the standard code for requiring an input flow file, and then use session.read() to get each line into a list called commands:
def flowFile = session.get()
if(!flowFile) return

def commands = []
// Read in one command per line
session.read(flowFile, {inputStream ->
    inputStream.eachLine { line -> commands << line  }
} as InputStreamCallback)
Then we disable strict host key checking:
options.trustUnknownHosts = true
Then we use the Sshoogr DSL to execute the commands from the flow file:
def result = null
remoteSession {

   host = sshHostname.value
   username = sshUsername.value
   password = sshPassword.value
   port = Integer.parseInt(sshPort.value)
        
   result = exec(showOutput: true, failOnError: false, command: commands.join('\n'))
}
Here we are using the aforementioned ExecuteScript dynamic properties (sshHostname, sshUsername, sshPassword, sshPort). Later in the post, a screenshot below shows them after having been added to the ExecuteScript configuration.

The exec() method is the powerhouse of Sshoogr. In this case we want the output of the commands (to put in an outgoing flowfile). Also I've set "failOnError" to false just so all commands will be attempted, but it is likely that you will want this to be true such that if a command fails, none of the following commands will be attempted. The final parameter in this example is "command". If you don't use the "named-parameter" (aka Map) version of exec() and just want to execute the list of commands (without setting "showOutput" for example), then exec() will take a Collection:
exec(commands)
In our case though, the exec() method will turn the "command" parameter into a string, so instead of just passing in the List "commands", we need to turn it back into a line-by-line string, so we use join('\n').

The next part of the script is to overwrite the incoming flow file, or if we could've created a new one with the original as a parent. If using ExecuteScript and creating a new flow file, be sure to remove the original. If using InvokeScriptedProcessor, you could define an "original" relationship and route the incoming flow file to that.
flowFile = session.write(flowFile, { outputStream ->
    result?.output?.eachLine { line ->
       outputStream.write(line.bytes)
       outputStream.write('\n'.bytes)
    }
} as OutputStreamCallback)
Finally, using result.exitStatus, we determine whether to route the flow file to success or failure:
session.transfer(flowFile, result?.exitStatus ? REL_FAILURE : REL_SUCCESS)
The full script for this example is as follows:
import static com.aestasit.infrastructure.ssh.DefaultSsh.*

def flowFile = session.get()
if(!flowFile) return

def commands = []
// Read in one command per line
session.read(flowFile, {inputStream ->
    inputStream.eachLine { line -> commands << line  }
} as InputStreamCallback)

options.trustUnknownHosts = true
def result = null
remoteSession {

   host = sshHostname.value
   username = sshUsername.value
   password = sshPassword.value
   port = Integer.parseInt(sshPort.value)
        
   result = exec(showOutput: true, failOnError: false, command: commands.join('\n'))
}

flowFile = session.write(flowFile, { outputStream ->
    result?.output?.eachLine { line ->
       outputStream.write(line.bytes)
       outputStream.write('\n'.bytes)
    }
} as OutputStreamCallback)

session.transfer(flowFile, result?.exitStatus ? REL_FAILURE : REL_SUCCESS)
Moving now to the ExecuteScript configuration:

Here I am connecting to my Hortonworks Data Platform (HDP) Sandbox to run Hadoop commands and such, the default settings are shown and you can see I've pasted the script into Script Body, and my Module Directory is pointing at my Sshoogr download folder. I added this to a simple flow that generates a flow file, fills it with commands, runs them remotely, then logs the output returned from the session:

For this example I ran the following two commands:
hadoop fs -ls /user
echo "Hello World!" > here_i_am.txt
In the log (after LogAttribute runs with payload included), I see the following:

And on the sandbox I see the file has been created with the expected content:

As you can see, this post described how to use ExecuteScript with Groovy and Sshoogr to execute remote commands coming from flow files. I created a Gist of the template, but it was created with an beta version of Apache NiFi 1.0 so it may not load into earlier versions (0.x). However all the ingredients are in this post so it should be easy to create your own flow, please let me know how/if it works for you :)

Cheers!

Thursday, June 9, 2016

Testing ExecuteScript processor scripts

I've been getting lots of questions about how to develop/debug scripts that go into the ExecuteScript processor in NiFi. One way to do this is to add a unit test to the nifi-scripting-processors submodule, and set the Script File property to your test script. However for this you basically need the full NiFi source.

To make things easier, I basically took a pared-down copy of the ExecuteScript processor (and its helper classes), added nifi-mock as a dependency, and slapped a command-line interface on it. This way with a single JAR you can run your script inside a dummy flow containing ExecuteScript.

The result is version 1.1.1 of the NiFi Script Tester utility, on GitHub and Bintray.

Basically it runs your script as a little unit test, and you can pipe stdin to become a flowfile, or point it at a directory and it will send every file as a flowfile, stuff like that. If your script doesn't need a flowfile, you can run it without specifying an input dir or piping in stdin, it will run once even without input.  The usage is as follows:
Usage: java -jar nifi-script-tester-<version>-all.jar [options] <script file>
 Where options may include:
   -success            Output information about flow files that were transferred to the success relationship. Defaults to true
   -failure            Output information about flow files that were transferred to the failure relationship. Defaults to false
   -no-success         Do not output information about flow files that were transferred to the success relationship. Defaults to false
   -content            Output flow file contents. Defaults to false
   -attrs              Output flow file attributes. Defaults to false
   -all-rels           Output information about flow files that were transferred to any relationship. Defaults to false
   -all                Output content, attributes, etc. about flow files that were transferred to any relationship. Defaults to false
   -input=<directory>  Send each file in the specified directory as a flow file to the script
   -modules=<paths>    Comma-separated list of paths (files or directories) containing script modules/JARs

As a basic example, let's say I am in the nifi-script-tester Git repo, and I've built using "gradle shadowJar" so my JAR is in build/libs. I can run one of the basic unit tests like so:
java -jar build/libs/nifi-script-tester-1.1.1-all.jar src/test/resources/test_basic.js
Which gives the following output:
2016-06-09 14:20:49,787 INFO  [pool-1-thread-1]                nifi.script.ExecuteScript - ExecuteScript[id=9a507773-86ae-4c33-957f-1c0270302a0e] hello
Flow Files transferred to success: 0
This shows minimal output, just the logging from the framework and the default summary statistic of "Flow Files transferred to success".  Instead let's try a more comprehensive example, where I pipe in a JSON file, run my Groovy script from the JSON-to-JSON conversion post, and display all the attributes and contents and statistics from the run:
cat src/test/resources/input_files/jolt.json | java -jar build/libs/nifi-script-tester-1.1.1-all.jar -all src/test/resources/test_json2json.groovy
This gives the following (much more verbose) output:
Flow file FlowFile[0,14636536169108_translated.json,283B]
---------------------------------------------------------
FlowFile Attributes
Key: 'entryDate'
	Value: 'Thu Jun 09 14:24:50 EDT 2016'
Key: 'lineageStartDate'
	Value: 'Thu Jun 09 14:24:50 EDT 2016'
Key: 'fileSize'
	Value: '283'
FlowFile Attribute Map Content
Key: 'path'
	Value: 'target'
Key: 'filename'
	Value: '14636536169108_translated.json'
Key: 'uuid'
	Value: '84d8d290-fbf9-4d57-aaf7-fd050da40d9f'
---------------------------------------------------------
{
    "Range": 5,
    "Rating": "3",
    "SecondaryRatings": {
        "metric": {
            "Id": "metric",
            "Range": 5,
            "Value": 6
        },
        "quality": {
            "Id": "quality",
            "Range": 5,
            "Value": 3
        }
    }
}

Flow Files transferred to success: 1

Flow Files transferred to failure: 0
There are options to suppress or select various things such as relationships, attributes, flowfile contents, etc.  As a final example let's look at the Hazelcast example from my previous post, to see how to add paths (files and directories) to the script tester, in the same way you'd set the Module Path property in the ExecuteScript processor:

java -jar ~/git/nifi-script-tester/build/libs/nifi-script-tester-1.1.1-all.jar -attrs -modules=/Users/mburgess/Downloads/hazelcast-3.6/lib hazelcast.groovy
And the output:

Jun 09, 2016 2:31:01 PM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_dev][3.6] is STARTING
Jun 09, 2016 2:31:01 PM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_dev][3.6] is STARTED
Jun 09, 2016 2:31:01 PM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_dev][3.6] is CLIENT_CONNECTED
Jun 09, 2016 2:31:01 PM com.hazelcast.client.spi.impl.ClientMembershipListener
INFO:

Members [1] {
	Member [172.17.0.2]:5701
}

Jun 09, 2016 2:31:01 PM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_dev][3.6] is SHUTTING_DOWN
Jun 09, 2016 2:31:01 PM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_dev][3.6] is SHUTDOWN
Flow file FlowFile[0,15008187914245.mockFlowFile,0B]
---------------------------------------------------------
FlowFile Attributes
Key: 'entryDate'
	Value: 'Thu Jun 09 14:31:01 EDT 2016'
Key: 'lineageStartDate'
	Value: 'Thu Jun 09 14:31:01 EDT 2016'
Key: 'fileSize'
	Value: '0'
FlowFile Attribute Map Content
Key: 'path'
	Value: 'target'
Key: 'hazelcast.customers.nifi'
	Value: '[name:Apache NiFi, email:nifi@apache.org, blog:nifi.apache.org]'
Key: 'filename'
	Value: '15008187914245.mockFlowFile'
Key: 'hazelcast.customers.mattyb149'
	Value: '[name:Matt Burgess, email:mattyb149@gmail.com, blog:funnifi.blogspot.com]'
Key: 'uuid'
	Value: '0a7c788e-0aef-40e4-b9a6-426f877dbfbe'
---------------------------------------------------------

Flow Files transferred to success: 1
This script would not work without the Hazelcast JARs, so this shows how the "-modules" option is used to add them to the classpath for testing.

The nifi-script-tester only supports Javascript and Groovy at the moment; including Jython (for example) would increase the JAR's size by 500% :/ Right now its only 8.6 MB, so a little big but not too bad.

Anyway hope this helps, please let me know how/if it works for you!

Cheers,
Matt

Wednesday, May 25, 2016

Validating JSON in NiFi with ExecuteScript

My last post alluded to a Groovy script for ExecuteScript that would use JSON Schema Validator to validate incoming flow files (in JSON format) against a JSON Schema. The purpose of that post was to show how to use Groovy Grape to get the JSON Schema Validator dependencies loaded dynamically by the script (versus downloading the JARs and adding them to the Module Directory property).  For this post I want to show the actual JSON validation script (as I presented it on the Apache NiFi "users" mailing list).

I'll use the schema as it was presented to me on the mailing list:
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": { "type": "object"}
  }
}
This shows that the incoming flow file should contain a JSON object, that it needs to have certain fields (the "required" values), and the types of the values it may/must contain (the "properties" entries).  For this script I'll hard-code this schema, but I'll talk a bit at the end about how this can be done dynamically for a better user experience.

Since the schema itself is JSON, we use org.json.JSONObject and such to read in the schema. Then we use org.everit.json.schema.SchemaLoader to load in a Schema. We can read in the flow file with session.read, passing in a closure cast to InputStreamCallback, see my previous post for details, and call schema.validate(). If the JSON is not valid, validate() will throw a ValidationExecption. If it does, I set a "valid" variable to false, then route to SUCCESS or FAILURE depending on whether the incoming flow file was validated against the schema.  The original script is as follows:
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

flowFile = session.get()
if(!flowFile) return

jsonSchema = """
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": { "type": "object"}
  }
}
"""

boolean valid = true
session.read(flowFile, { inputStream ->
   jsonInput = org.apache.commons.io.IOUtils.toString(inputStream,
java.nio.charset.StandardCharsets.UTF_8)
   JSONObject rawSchema = new JSONObject(new JSONTokener(new
ByteArrayInputStream(jsonSchema.bytes)))
   Schema schema = SchemaLoader.load(rawSchema)
   try {
      schema.validate(new JSONObject(jsonInput))
    } catch(ve) {
      log.error("Doesn't adhere to schema", ve)
      valid = false
    }
  } as InputStreamCallback)

session.transfer(flowFile, valid ? REL_SUCCESS : REL_FAILURE)

This is a pretty basic script, there are things we could do to improve the capability:
  • Move the schema load out of the session.read() method, since it doesn't require the input
  • Allow the user to specify the schema via a dynamic property
  • Do better exception handling and error message reporting
A worthwhile improvement (that would include all of these) is to turn the script into a proper Processor and put it in an InvokeScriptedProcessor. That way you could have a custom set of relationships, properties, to make it easy for the user to configure and use.

Of course, the best solution is probably to implement it in Java and contribute it to Apache NiFi under the Jira case NIFI-1893 :)

Cheers!

Tuesday, May 24, 2016

Using Groovy @Grab with ExecuteScript

As we've seen in a previous post, it is possible to add dependencies/modules to your ExecuteScript classpath by using the Module Directory property in the processor configuration dialog.  However many third-party libraries have (sometimes lots of) transitive dependencies, and downloading all of them and/or setting up your Module Directory can become a pain.

The Groovy world has a solution for such a thing, using Groovy Grape and specifically the @Grab annotation. This instructs Groovy to download the artifact(s) and their dependencies into the Grape cache. It works much like Maven or Ivy, and in fact it is based on Ivy. For this reason, ExecuteScript (when using the Groovy engine) needs the Apache Ivy JAR.

You may be asking why the Ivy JAR is not included with the scripting NAR that has the InvokeScriptProcessor and ExecuteScript processors in it, or if we can use the Module Directory to point at the Ivy JAR and be on our way.  I haven't been able to verify in the Groovy code, but by experimentation it appears that Grape uses the application classloader, and not the current thread context's class loader, to find the Ivy classes it needs. This means the Ivy JAR needs to be in the "original" NiFi classpath (i.e. in the lib/ folder of NiFi) rather than in the NAR or specified in the Module Directory property.

You can download the Apache Ivy JAR here (I tested with Ivy 2.4.0), and place it in your NiFi distribution under lib/, then restart NiFi.

Now we can get to the script part :)  Let's say we want to write a Groovy script for ExecuteScript to validate an incoming flow file in JSON format against a JSON Schema. There is a good JSON Schema Validator library for this on Github. In this case, the library only has a couple of dependencies, and the only one not available to ExecuteScript is org.json:json, so it's not a real pain to use Module Directory for this, but I wanted to keep it simple to show the general idea.  We use @Grab to get this dependency (and its transitive dependencies), then import the classes we will end up using to do the JSON Schema validation:
@Grab(group='org.everit.json', module='org.everit.json.schema', version='1.3.0')
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

// Rest of code here
If you don't already have all the artifacts in your Grapes cache, then the very first time this script runs, it will be much slower as it has to download each artifact and its transitive dependencies. Of course, this means your NiFi instance has to be able to get out to the internet, and Grape will need to be configured properly for your machine. In many cases the default settings are fine, but check out the Grape documentation as well as the default grapeConfig.xml settings (note that this is an Ivysettings file). NOTE: The default location for the Grapes cache is under the user's home directory in ~/.groovy/grapes. If you are running NiFi as a certain user, then you will want to ensure the user has permission to write to that directory.

An alternative to the possibly-long-download solution is to pre-install the grapes on the machine manually. If Groovy is installed on the machine itself (versus the one supplied with ExecuteScript) you can do the following (see the doc for more details):
grape install <groupid> <artifactid> [<version>]
to install the grapes into the cache. Then ExecuteScript with @Grab will be faster out of the gate.

Hopefully this post has offered a way to make prototyping new capabilities with ExecuteScript and Groovy a little faster and easier.

Cheers!

Wednesday, April 20, 2016

Using Lua with ExecuteScript in NiFi

My other blog posts have covered most of the languages supported by the ExecuteScript processor in Apache NiFi, except JRuby and LuaJ.  This is because I myself am not very familiar with either language, at least enough to do something like the JSON-to-JSON transformation example.

However they deserve some love too :) So for this post I'll do a very basic example using LuaJ to do some simple operations on a flow file.

Since ExecuteScript already binds the ProcessSession object to the variable "session", it is pretty straightforward to perform operations such as creating a flow file:
flowFile = session:create()

and adding an attribute to a flow file:
flowFile = session:putAttribute(flowFile, "lua.attrib", "Hello from Lua!")

However, it is slightly more complicated to create the equivalent of an anonymous class, such as a concrete implementation of the flow file IO interfaces like InputStreamCallback, OutputStreamCallback, and StreamCallback, which are used in methods like session.read() and session.write().  Luckily these interfaces each have a single method process(), and LuaJ allows us to create a "proxy object" from a Java interface, assigning a function to a variable named process:
local writecb =
  luajava.createProxy("org.apache.nifi.processor.io.OutputStreamCallback", {
    process = function(outputStream)
      outputStream:write("This is flow file content from Lua")
    end
})

This is an example of an OutputStreamCallback implementation, but the same approach works for InputStreamCallback and StreamCallback.

So my final (albeit simple) script to create a flow file, write some content, add an attribute, then transfer to success looks like this:
local writecb
  luajava.createProxy("org.apache.nifi.processor.io.OutputStreamCallback", {
    process = function(outputStream)
      outputStream:write("This is flow file content from Lua")
    end
})
flowFile = session:create()
flowFile = session:putAttribute(flowFile, "lua.attrib", "Hello from Lua!")
flowFile = session:write(flowFile, writecb)
session:transfer(flowFile, REL_SUCCESS)

If you are using LuaJ in ExecuteScript, I'd like to hear about how and what you are doing with it :)

Cheers!

Friday, April 15, 2016

Inspecting your NiFi DistributedMapCacheServer with Groovy

The DistributedMapCacheServer in Apache NiFi is a Controller Service that allows you to store key/value pairs for use by clients (such as NiFi's DistributedMapCacheClient service), see the official documentation for more details.

To create one, go to Controller Services (on the management toolbar on the right), create a new Controller Service, and select type DistributedMapCacheServer. Once named and saved, you can click the edit button (pencil icon) and set up the hostname and port and such:


Once the properties are saved, you can click the Enable button (lightning-bolt icon) and it will be ready for use by some NiFi Processors.

The DistributedMapCacheServer is mainly used for lookups and for keeping user-defined state. The PutDistributedMapCache and FetchDistributedMapCache processors are good for the latter. Other processors make use of the server to keep track of things like which files they have already processed. DetectDuplicate, ListFile, ListFileTransfer, GetHBase, and others make use of the cache server(s) too.

So it is certainly possible to have a NiFi dataflow with FetchDistributedMapCache scheduled for some period of time (say 10 seconds) connected to a LogAttribute processor or something to list the current contents of the desired keys in the cache. For this post I wanted to show how to inspect the cache from outside NiFi for two reasons. The first is to illustrate the use case of working with the cache without using NiFi components (good for automated population or monitoring of the cache from the outside), and also to show the very straightforward protocol used to get values from the cache. Putting data in is equally as simple, perhaps I'll add a follow-on post for that.

The DistributedMapCacheServer opens a port at the configured value (see dialog above), it expects a TCP connection and then various commands serialized in specific ways.  The first task, once connected to the server, is to negotiate the communications protocol version to be used for all future client-server operations.  To do this, we need the following:

  • Client sends the string "NiFi" as bytes to the server
  • Client sends the protocol version as an integer (4-bytes)


If you are using an output stream to write these values, make sure you flush the stream after these steps, to ensure they are sent to the server, so that the server can respond. The server will respond with one of three codes:

  • RESOURCE_OK (20): The server accepted the client's proposal of protocol name/version
  • DIFFERENT_RESOURCE_VERSION (21): The server accepted the client's proposal of protocol name but not the version
  • ABORT (255): The server aborted the connection


Once we get a RESOURCE_OK, we may continue on with our communications. If instead we get DIFFERENT_RESOURCE_VERSION, then the client needs to read in an integer containing the server's preferred protocol version. If the client can proceed using this version (or another version lower than the server's preference), it should re-negotiate the version by sending the new client-preferred version as an integer (note you do not need to send the "NiFi" again, the name has already been accepted).
If the client and server cannot agree on the protocol version, the client should disconnect from the server. If some error occurs on the server and it aborts the connection, the ABORT status code will be returned, and a message error can be obtained by the client (before disconnect) by reading in a string of UTF-8 bytes.

So let's see all this working in a simple example written in Groovy. Here is the script I used:
def protocolVersion = 1
def keys = ['entry', 'filename']

s = new Socket('localhost', 4557)

s.withStreams { input, output ->
  def dos = new DataOutputStream(output)
  def dis = new DataInputStream(input)
  
  // Negotiate handshake/version
  dos.write('NiFi'.bytes)
  dos.writeInt(protocolVersion)
  dos.flush()  
 
  status = dis.read()
  while(status == 21) {
     protocolVersion = dis.readInt()
     dos.writeInt(protocolVersion)
     dos.flush()
     status = dis.read()
  }
  
  // Get entries
  keys.each {
      key = it.getBytes('UTF-8')
      dos.writeUTF('get')
      def baos = new ByteArrayOutputStream()
      baos.write(key)
      dos.writeInt(baos.size())  
      baos.writeTo(dos)
      dos.flush()
      def length = dis.readInt()
      def bytes = new byte[length]
      dis.readFully(bytes)
      println "$it = ${new String(bytes)}"
  }
  
  // Close 
  dos.writeUTF("close");
  dos.flush();
}

I have set the protocol version to 1, which at present is the only accepted version. But you can set it higher to see the protocol negotiation work.

Also I have the variable "keys" with a list of the keys to look up in the cache. There is no mechanism at present for retrieving all the keys in the cache. This is probably for simplicity and to avoid denial-of-service type stuff if there are tons and tons of keys.  For our example, it will fetch the value for each key and print out the key/value pair.

Next you can see the creation of the socket, using the same port as was configured for the server (4557). Then Groovy has some nice additions to the Socket class, to offer an InputStream and OutputStream for that socket to your closure. Since we'll be dealing with bytes, strings, and integers, I thought a DataInputStream and DataOutputStream would be easiest (also this is how the DistributedMapCacheClient works).

The next two sections perform the protocol version negotiation as described above. Then for each key we write the string "get" followed by the key name as bytes. That is the entirety of the "get" operation :)

The server responds with the length of the key's value (in bytes). We read in the length as an integer, then read in a byte array containing the value. For my case I know the keys are strings, I simply create a String from the bytes and print out the key value pair. To my knowledge the only kinds of serialized values used by the NiFi DistributedMapCacheClient are a byte array, String, and CacheValue (used exclusively by the DetectDuplicate processor).

Once we're done reading key/value pairs, we write and flush the string "close" to tell the server our transaction is complete. I did not expressly close the socket connection, that is done by withStreams() which closes the streams when the closure is finished.

That's all there is to it! This might not be a very common use case, but it was fun to learn about some of the intra-NiFi protocols, and being able to get some information out of the system using different methods :)

Cheers!


Friday, April 8, 2016

SQL in NiFi with ExecuteScript

There is a good amount of support for interacting with Relational Database Management systems (RDBMS) in Apache NiFi:

  • Database Connection Pool controller service: A shared resource for processors to get connections to an RDBMS
  • ExecuteSQL: A processor to execute SELECT queries against an RDBMS
  • PutSQL: A processor to execute statements (INSERT, UPDATE, e.g.) against an RDBMS
  • QueryDatabaseTable: A processor to perform incremental fetching from an RDBS table

I will have a blog soon describing the configuration and use of the QueryDatabaseTable processor, which was added in Apache NiFi 0.6.0.

To set up a Database Connection Pool controller service, refer to this User Guide section. I have configured mine for a local PostgreSQL instance:


NOTE: I named it 'PostgresConnectionPool', that will enter into the script config later.

Back to the title of this post :)  The scripting processors don't know about (or have a dependency on) the Database Connection Pool controller service instances or even the API (DBCPService interface, e.g.). This would often preclude our code from accessing the service to get a database connection.

However, DBCPService has a single method getConnection(), which returns a java.sql.Connection. This class is part of Java proper, and is all we really need from the service; we can talk JDBC from there.  One of the great things about Groovy is dynamic method invocation, meaning I can call a method on an object if I know the method is there, even if I don't know what Class is the type of the object.  We'll get to that shortly.

To work with a Controller Service from ExecuteScript, we need to get a reference to a ControllerServiceLookup from the process context. In Groovy, this looks like:
def lookup = context.controllerServiceLookup
Now that we have a lookup, we use it to locate the service we want. If we know the controller service we want (and it won't change, get deleted/recreated, etc.), we can get the identifier (Id below):


 However, for this example I wanted the user to be able to specify the name of the controller service (in this case PostgresConnectionPool), not the identifier. For that we need to get all controller service identifiers, then find the one whose name equals PostgresConnectionPool.

I used a dynamic property in the ExecuteScript config dialog to let the user set the name of the desired Database Connection Pool controller service:


ExecuteScript will create a variable for each dynamic property, and bind a PropertyValue object to it. This is so you can access the value of the property as the correct type (String, integer, etc.). In our case it's a String so we can use the PropertyValue.getValue() method. In Groovy it's as simple as:
def dbServiceName = databaseConnectionPoolName.value
Now that we have a ControllerServiceLookup and the name of the service we want to find, we can use the APIs to iterate over the services until we find one whose name is the one we're looking for:
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
Now we have the identifier of the service we want, and can use lookup.getControllerService(dbcpServiceId) to get a reference to the ControllerService itself.  Note we haven't referred to this service as a DBCPService, because the script (and the processor) do not have access to that class. However as I said before, we know the method (getConnection) we want to call and we have access to the return type of that method. So in Groovy you can just invoke it:
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
I used the safe-reference operator (?.) but to go further you will want to ensure conn is not null, and report an error if it is.

Now that we have a Connection object, we could use the JDBC API (java.sql.*) to issue queries, go through the ResultSet's rows, then get each column's name and value, etc. etc. However Groovy has an excellent object called groovy.sql.Sql that does all this with Groovy idioms.  For example, to issue a query 'select * from users' and iterate over the rows (with their row number), you have:
def sql = new Sql(conn)
sql.rows('select * from users').eachWithIndex { row, idx ->
    // Do stuff for each row here
}
In my case, I want to find the column names the first time, and output them as CSV headers. Then for all rows I want to comma-separate the values and output:
if(idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) }
out.write((row.values().join(',') + "\n").getBytes())
All that's left to do is to set the filename attribute to the one specified by the filename variable (see the Configure Processor dialog above), and transfer the new flow file.  The entire script looks like this:

import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql

def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { 
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
try {
flowFile = session.create()
flowFile = session.write(flowFile, {out -> 
    def sql = new Sql(conn)
    sql.rows('select * from users').eachWithIndex { row, idx ->
        if(idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) }
        out.write((row.values().join(',') + "\n").getBytes())
    }
  } as OutputStreamCallback)
  flowFile = session.putAttribute(flowFile, 'filename', filename.value)
  session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Scripting error', e)
    session.transfer(flowFile, REL_FAILURE)
}
conn?.close()

This script probably needs a little work before you'd want to use it, to check whether the Controller Service was found, to quote any row value that has a comma in it, etc. But I tried to keep it brief to illustrate the concepts, which are the fluent NiFI API and the cool idioms of Groovy :) For my example table, the script produces a valid CSV file:


This processor is available as a template on Gist (here), as always I welcome all questions, comments, and suggestions.

Cheers!

Monday, March 14, 2016

ExecuteScript - JSON-to-JSON Revisited (with Jython)

I've received some good comments about a couple of previous blog posts on using the ExecuteScript processor in NiFi (0.5.0+) to perform JSON-to-JSON transformations. One post used Groovy and the other used Javascript.

Since then I've received some requests for a Jython example for ExecuteScript, so I figured I'd do the same use case again (JSON to JSON) so folks can see the differences in the languages when performing the same operations :)

The approach has been covered in detail in the other posts, so I will talk a bit about the Jython-specific stuff and then get right to the code.

One major caveat here is that I don't know Python :)  I learned enough to get the script working, but please let me know how to better do any of this. I stopped touching the script once it worked, so it's very possible there are unnecessary imports, classes, etc.

The first thing to do is to bring in the Jython libraries you will need, as well as importing the Java and NiFi classes to be used:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

I didn't need to import java.lang.String as Jython does type coercion. I probably couldn't call getBytes() on that string unless Jython knew to coerce the object to a Java String, but that's ok because we can call bytearray("myString".encode('utf-8')) to achieve the same results.

The next task was to create a StreamCallback object for use in session.write(). I created a Jython class for this and overrode the interface method:
class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
        # ...

After that, I read in and parsed the JSON text with IOUtils then json.loads(), then performed all the operations on the various parts of the object/dictionary. Finally I generated a new JSON string with json.dumps(), encoded to UTF-8, got the byte array, and wrote it to the output stream.

The resulting script is as follows:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    newObj = {
          "Range": 5,
          "Rating": obj['rating']['primary']['value'],
          "SecondaryRatings": {}
        }
    for key, value in obj['rating'].iteritems():
      if key != "primary":
        newObj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}
              
    outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8'))) 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)

The template is available as a Gist (here). I welcome all suggestions on how to make this better, and please share any scripts you come up with!

Cheers!

Wednesday, March 2, 2016

ExecuteScript - JSON-to-JSON Revisited (with Javascript)

A question came in on the Apache NiFi users group about doing things with ExecuteScript using something other than Groovy (specifically, Javascript). I put a fairly trivial example inside a previous post, but it doesn't cover one of the most important features, overwriting flow file content.

One major difference between Groovy and Javascript here is that you will want to get a reference to Java objects using Java.type(), in order to create new objects, invoke static methods, etc. I use the following in the script below:

var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

Also, the Javascript engine has something very much like closure coercion in Groovy. If you are creating a new anonymous object from a class/interface, and that class/interface has a single method, you can provide a Javascript function (with the appropriate arguments) in the object constructor. So to create an implementation of StreamCallback (used in session.write() to overwrite the contents of a flowfile), you have:

new StreamCallback(function(inputStream, outputStream) { /* Do stuff */ })

With that in mind, I recreated the JSON-to-JSON example from my previous post, this time using Javascript as the language.  The script is as follows:

var flowFile = session.get();
if (flowFile != null) {

  var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback")
  var IOUtils = Java.type("org.apache.commons.io.IOUtils")
  var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

  flowFile = session.write(flowFile,
    new StreamCallback(function(inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        var obj = JSON.parse(text)
        var newObj = {
          "Range": 5,
          "Rating": obj.rating.primary.value,
          "SecondaryRatings": {}
        }
        for(var key in obj.rating){
          var attrName = key;
          var attrValue = obj.rating[key];
          if(attrName != "primary") {
            newObj.SecondaryRatings[attrName] = {"id": attrName, "Range": 5, "Value": attrValue.value};
          }
        }
        outputStream.write(JSON.stringify(newObj, null, '\t').getBytes(StandardCharsets.UTF_8)) 
    }))
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)
}

The template is available a Gist (here), please let me know how/if this works for you.  Cheers!

UPDATE: This script works with Nashorn (Java 8's Javascript engine), but doesn't work if you're using Java 7 (Rhino).  Here's an equivalent script for Java 7 / Rhino:

importClass(org.apache.commons.io.IOUtils)
importClass(java.nio.charset.StandardCharsets)
importClass(org.apache.nifi.processor.io.StreamCallback)
var flowFile = session.get();
if (flowFile != null) {

flowFile = session.write(flowFile,
        new StreamCallback() {process : function(inputStream, outputStream) {
            var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
     var obj = JSON.parse(text)
            var newObj = {
  "Range": 5,
  "Rating": obj.rating.primary.value,
  "SecondaryRatings": {}
     }
     for(var key in obj.rating){
             var attrName = key;
             var attrValue = obj.rating[key];
    if(attrName != "primary") {
     newObj.SecondaryRatings[attrName] = {"id": attrName, "Range": 5, "Value": attrValue.value};
  }
            }
    outputStream.write(new java.lang.String(JSON.stringify(newObj, null, '\t')).getBytes(StandardCharsets.UTF_8)) 
        }})
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)
}


Wednesday, February 24, 2016

Writing Reusable Scripted Processors in NiFi

This blog has quite a few posts about the various things you can do with the new (as of NiFi 0.5.0) scripting processors. Most are about ExecuteScript and how to use it to do per-flowfile things like replace content, use external modules to add functionality, etc.  However most are specific for the task at hand, and aren't really general-use scripts.

We could use dynamic properties (explained in the Developer's Guide and in an earlier post), as they are passed into ExecuteScript as variables. However the user of the processor would have to know which properties to add and fill in, and there's no good way to get that information to the user (at least with ExecuteScript).

However, InvokeScriptedProcessor lets you provide a scripted implementation of a full Processor instance. This means you can define your own properties and relationships, along with documentation and validation of them.  Your script could provide capabilities that depend on the way the user of the processor configures the processor, without having to interact with the script at all!

I'll illustrate this below, but I think the coolest point is: A template with a single InvokeScriptedProcessor (that contains a working script) can be dragged onto the canvas and basically acts like dragging your custom processor onto the canvas! When the user opens the dialog, they will see the properties/relationships you added, and they will be validated just like the normal ones (script language, body, etc.) that come with the processor.

The scripted processor needs only implement the Processor interface, which in turn extends AbstractConfigurableComponent. A basic Groovy skeleton with a class including a set of overridden interface methods looks like this:

class MyProcessor implements Processor {

    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [] as Set }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
      // do stuff
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        return null
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [] as List }

    @Override
    String getIdentifier() { return 'MyProcessor-InvokeScriptedProcessor' }
}

processor = new MyProcessor()

Note that the class must implement Processor and declare a variable named "processor" that contains an instance of the class. This is the convention required by the InvokeScriptedProcessor.

IMPORTANT: Although you may find in NiFi code that many processors extend either AbstractProcessor or AbstractSessionFactoryProcessor, your script will most likely NOT work if it extends one of these classes. This is due to the validate() method of these classes being declared final, and the basic implementation will expect the set of Supported Property Descriptors to include the ones that come with the InvokeScriptedProcessor (like Script File), but will only use the list that your scripted processor provides. There might be a hack to get around this but even if possible, it's not likely worth it.

Moving on, let's say we want to create a reusable scripted processor that works like GenerateFlowFile but allows the user to provide the content of the flow file as well as the value of its "filename" attribute.  Moreover, maybe the content could include NiFi Expression Language (EL) constructs like ${hostname()}. Since the content may have something like EL statements but the user might not want them evaluated as such, we should let the user decide whether to evaluate the content for EL statements before writing to the flow file.  Lastly, this is a "generate" processor so we only need a "success" relationship; "failure" doesn't really make sense here. Having said that, it will be important to catch all Exceptions that your code can throw; wrap each in a ProcessException and re-throw, so the framework can handle it correctly.

So the list of things to do:

  1. Add a "success" relationship and return it in (in a Set) from getRelationships()
  2. Add a "File Content" property to contain the intended content of the flow file (may include EL)
  3. Add a "Evaluate Expressions in Content" property for the user to indicate whether to evaluate the content for EL
  4. Add an optionally-set "Filename" property to override the default "filename" attribute.
  5. When the processor is triggered, create a flow file, write the content (after possibly evaluating EL), and possibly set the filename attribute

Here is some example Groovy code to do just that:
class GenerateFlowFileWithContent implements Processor {

    def REL_SUCCESS = new Relationship.Builder()
            .name('success')
            .description('The flow file with the specified content and/or filename was successfully transferred')
            .build();

    def CONTENT = new PropertyDescriptor.Builder()
            .name('File Content').description('The content for the generated flow file')
            .required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build()
    
    def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
            .name('Evaluate Expressions in Content').description('Whether to evaluate NiFi Expression Language constructs within the content')
            .required(true).allowableValues('true','false').defaultValue('false').build()
            
    def FILENAME = new PropertyDescriptor.Builder()
            .name('Filename').description('The name of the flow file to be stored in the filename attribute')
            .required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
    
    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [REL_SUCCESS] as Set }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
      try {
        def session = sessionFactory.createSession()
        def flowFile = session.create()
        
        def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
        def contentProp = context.getProperty(CONTENT)
        def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
        def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
        
        flowFile = session.write(flowFile, { outStream ->
                outStream.write(content.getBytes("UTF-8"))
            } as OutputStreamCallback)
        
        if(filename != null) { flowFile = session.putAttribute(flowFile, 'filename', filename) }
        // transfer
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
      } catch(e) {
          throw new ProcessException(e)
      }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch(name) {
            case 'File Content': return CONTENT
            case 'Evaluate Expressions in Content': return CONTENT_HAS_EL
            case 'Filename': return FILENAME
            default: return null
        }
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor>> getPropertyDescriptors() { return [CONTENT, CONTENT_HAS_EL, FILENAME] as List }

    @Override
    String getIdentifier() { return 'GenerateFlowFile-InvokeScriptedProcessor' }
    
}

processor = new GenerateFlowFileWithContent()

When this is entered into the Script Body of an InvokeScriptedProcessor, with the language set to Groovy and then applied (by clicking Apply on the dialog), then when the dialog is reopened you should see the relationships set to only "success" and the properties added to the config dialog:


At this point you can save the single processor as a template, calling it perhaps GenerateFlowFileWithContent or something.  Now it is a template that is basically reusable as a processor.  Try dragging it onto the canvas and entering some values, then wiring it to some other processor like PutFile (to see if it works):


Once the success relationship has been satisfied, that instance should be good to go:


Hopefully this has illustrated the power and flexibility of InvokeScriptedProcessor, and how it can be used to create reusable processor templates with custom logic, without having to construct and deploy a NAR. The example template is available as a Gist (here); as always I welcome all comments, questions, and suggestions.

Cheers!

Tuesday, February 23, 2016

ExecuteScript Explained - Split fields and NiFi API with Groovy

There was a question on Twitter about being able to split fields in a flow file based on a delimiter, and selecting the desired columns. There are a few ways to do this in NiFi, but I thought I'd illustrate how to do it using the ExecuteScript processor (new in NiFi 0.5.0).

The approach from the NiFi side is very similar to my previous post on replacing flow file content:
def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
   // Read incoming flow file content with inputStream
   // ... other stuff...
   // Write outgoing flow file content with OutputStream
} as StreamCallback)
Before we get to the "split text on delimiter" part, I'll explain a little bit more about what's going on above in terms of the NiFi API and Groovy.

The script is evaluated when the ExecuteScript processor is triggered. The conditions under which the processor may be triggered are listed in the Developer's Guide here.  The "session" object is a ProcessSession instance, and the get() call will return a flow file if it is available. It is possible that get() will return null. This can happen if there are no incoming connections and the processor has been scheduled to run. Also it can happen if there were a flowfile available in the queue but another task has already claimed it after this task has been triggered.  In our example, we only want to work on existing flowfiles, so we perform a session.get() and return from the script if no flow file was available.

The meat of the script is the session.write() call. It is based on one of the ProcessSession API's write methods:
FlowFile write(FlowFile source, StreamCallback writer) 
We use this to pass the incoming flow file in, along with a callback to overwrite the content. It returns a FlowFile because FlowFile objects are immutable, so the reference returned by write() is a reference to the latest version of that flowfile (with the content updated). The latest reference to a modified flowfile is the one that is expected to be transferred. You can see this in the skeleton above:
flowFile = session.write(flowFile, ...
The framework will invoke a method on the StreamCallback, passing in an InputStream (associated with the incoming flowfile's content), and an OutputStream (where you write the new content). This is StreamCallback's single method:
void process(InputStream in, OutputStream out) throws IOException
This is where Groovy's closure coercion feature really shines. Instead of implementing a class (or anonymous class) and declaring that method:
def callback = new StreamCallback() {
  void process(InputStream in, OutputStream out) throws IOException {
     doStuff()
  }
}
If the interface has a single method, you can create a closure and use the "as" keyword to coerce it into the single method of the interface, like so:
def callback = { doStuff() } as StreamCallback
In the skeleton above, I didn't create a variable to hold the callback, I just passed it into the write() method.  As far as boilerplate code goes, the above skeleton is not too bad :)

The example I'll use (based on the Tweet I saw) has a flowfile with the following contents:
a1|b1|c1|d1
a2|b2|c2|d2
a3|b3|c3|d3
The desired output is the middle two columns, delimited by a space instead of the bar:
b1 c1
b2 c2
b3 c3
Here's the script I ended up with:
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
   inputStream.eachLine { line ->
   a = line.tokenize('|')
   outputStream.write("${a[1]} ${a[2]}\n".toString().getBytes(StandardCharsets.UTF_8))
   }
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
Notice there's no error handling, stay tuned for most posts :)  Here's a screenshot for a test flow that puts the sample data in and writes the expected output to a file:



The test flow template is available as a Gist (here). I hope this was helpful, as always I welcome all comments, questions, and suggestions.

Cheers!

Friday, February 19, 2016

ExecuteScript - Extract text & metadata from PDF

This post is about using Apache NiFi, its ExecuteScript processor, and Apache PDFBox to extract text and metadata from PDF files. It is similar to a previous post of mine, using Module Path to include JARs. But this is a good use case as well, so I thought I'd write a bit about it. This one will be short and sweet, but the aforementioned post has more details :)

In my example, I'm using the GetFile processor to find all PDFs in a directory. They are sent to an ExecuteScript processor, which uses PDFBox and PDFTextStripper (and other classes) to extract the text into the flowfile content, and adds metadata as attributes. The resulting script is here:

import org.apache.pdfbox.pdmodel.*
import org.apache.pdfbox.util.*

def flowFile = session.get()
if(!flowFile) return

def s = new PDFTextStripper()
def doc, info

flowFile = session.write(flowFile, {inputStream, outputStream ->
 doc = PDDocument.load(inputStream)
 info = doc.getDocumentInformation()
        s.writeText(doc, new OutputStreamWriter(outputStream))
    } as StreamCallback
)
flowFile = session.putAttribute(flowFile, 'pdf.page.count', "${doc.getNumberOfPages()}")
flowFile = session.putAttribute(flowFile, 'pdf.title', "${info.getTitle()}" )
flowFile = session.putAttribute(flowFile, 'pdf.author',"${info.getAuthor()}" );
flowFile = session.putAttribute(flowFile, 'pdf.subject', "${info.getSubject()}" );
flowFile = session.putAttribute(flowFile, 'pdf.keywords', "${info.getKeywords()}" );
flowFile = session.putAttribute(flowFile, 'pdf.creator', "${info.getCreator()}" );
flowFile = session.putAttribute(flowFile, 'pdf.producer', "${info.getProducer()}" );
flowFile = session.putAttribute(flowFile, 'pdf.date.creation', "${info.getCreationDate()}" );
flowFile = session.putAttribute(flowFile, 'pdf.date.modified', "${info.getModificationDate()}");
flowFile = session.putAttribute(flowFile, 'pdf.trapped', "${info.getTrapped()}" );   
session.transfer(flowFile, REL_SUCCESS)


I then put the file's text contents out (using PutFile) and also logged the metadata attributes. The flow looks like this:

The template is available as a Gist (here). Please let me know if you find it useful, and/or if you have comments, questions, or suggestions. Cheers!

Thursday, February 11, 2016

ExecuteScript - JSON-to-JSON conversion

Given the cool UI and data provenance features of NiFi, performing complex JSON-to-JSON conversion should be really simple, and once NIFI-361 is implemented it will be.  Fortunately, in the meantime, some of this can be achieved with the scripting processors (available as of Apache NiFi 0.5.0). As an example, I will present an ExecuteScript processor that performs the same conversion as the demo at http://jolt-demo.appspot.com/, with an extra metric added to really show the JSON-to-JSON transformation.

This is not at all a jab against JOLT; to the contrary, NIFI-361 will likely use JOLT as it is a powerful transformation DSL. This is more about the spirit of the ExecuteScript processor, as it is a great way to rapidly enable new features and/or data transformations.

I choose Groovy for the scripting language for ExecuteScript for two reasons: one, I am most familiar with it :) and two, you can do clean JSON-to-JSON conversions with JsonSlurper and JsonBuilder.

To emulate the jolt-demo, I start with a flow file containing the following JSON:

{
   "rating": {
      "primary": {
         "value": 3
      },
      "quality": {
         "value": 3
      },
      "metric": {
         "value": 6
      }
   }
}

Using the jolt-demo transform, the following should be output:

{
    "Range": 5,
    "Rating": "3",
    "SecondaryRatings": {
        "metric": {
            "Id": "metric",
            "Range": 5,
            "Value": 6
        },
        "quality": {
            "Id": "quality",
            "Range": 5,
            "Value": 3
        }
    }
}

Using the same rules (translated into JsonBuilder format), the script is:

import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get();
if (flowFile == null) {
    return;
}
def slurper = new groovy.json.JsonSlurper()

flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def obj = slurper.parseText(text)
        def builder = new groovy.json.JsonBuilder()
        builder.call {
            'Range' 5
            'Rating' "${obj.rating.primary.value}"
            'SecondaryRatings' {
                obj.rating.findAll {it.key != "primary"}.each {k,v ->
                    "$k" {
                         'Id' "$k"
                         'Range' 5
                         'Value' v.value
                    }
                }
            }
        }
        outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').tokenize('.')[0]+'_translated.json')
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)


The template is available as a Gist (here), please let me know if you have any questions, comments, or suggestions for make the scripting processors even better!

Cheers!

Wednesday, February 10, 2016

InvokeScriptedProcessor - Hello World!

In some of my previous posts I outlined some use cases for the ExecuteScript processor in NiFi (starting with 0.5.0). Now let's get to the real powerhouse of the scripting additions, the InvokeScriptedProcessor... processor :)

ExecuteScript is a glorified onTrigger() call where you can work with incoming flow files, create new ones, add attributes, etc. It is meant for the "per-flowfile" paradigm, where the other aspects of the Processor API do not apply.  But perhaps you want a full-fledged Processor local to your cluster and want to avoid the overhead of building a NAR, submitting the code, etc. etc.  With InvokeScriptedProcessor, you can use Javascript, Groovy, Jython, Lua, or JRuby to create a Processor implementation. InvokeScriptedProcessor will delegate methods such as getPropertyDescriptors, getRelationships, onTrigger, etc. to the scripted processor.  This has more power than ExecuteScript because custom properties and relationships can be defined, plus more methods than onTrigger() can be implemented.

One main difference between ExecuteScript and InvokeScriptedProcessor is that REL_SUCCESS and REL_FAILURE are the only two relationships available to ExecuteScript and are passed in automatically. For InvokeScriptedProcessor, all relationships (and all other Processor interface methods) must be defined by the scripted processor, and a variable named "processor" must be defined and point to a valid instance of the scripted processor.

Below is an example of a bare-bones scripted Processor that expects input from a CSV-formatted flow file (coming from the random user generation site https://randomuser.me/, the query is http://api.randomuser.me/0.6/?format=csv&nat=us&results=100. It splits on commas and takes the third and fourth (indexes 2 and 3) values (first and last name, respectively), then outputs the capitalized first name followed by the capitalized last name:


class GroovyProcessor implements Processor {


    def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();
    def ProcessorLog log

    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    @Override

    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        try {

            def session = sessionFactory.createSession()
            def flowFile = session.get()
            if (!flowFile) return
            def selectedColumns = ''
            flowFile = session.write(flowFile,
                    { inputStream, outputStream ->
                        String line
                        final BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
                        line = inReader.readLine()
                        String[] header = line?.split(',')
                        selectedColumns = "${header[1]},${header[2]}"                  
                        while (line = inReader.readLine()) {
                            String[] cols = line.split(',')
                            outputStream.write("${cols[2].capitalize()} ${cols[3].capitalize()}\n".getBytes('UTF-8'))
                        }
                    } as StreamCallback)

     flowFile = session.putAttribute(flowFile, "selected.columns", selectedColumns)
     flowFile = session.putAttribute(flowFile, "filename", "split_cols_invoke.txt")
            // transfer
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            throw new ProcessException(e)
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override

    List<PropertyDescriptor> getPropertyDescriptors() { return null }

    @Override

    String getIdentifier() { return null }
}

processor = new GroovyProcessor()

Besides the relationship differences, there are a couple of other noteworthy differences between ExecuteScript and InvokeScriptedProcessor (plus the latter's need to implement Processor API methods):

1) ExecuteScript handles the session.commit() for you, but InvokeScriptedProcessor does not.
2) ExecuteScript has a "session" variable, where the InvokeScriptedProcessor's onTrigger() method must call sessionFactory.createSession()

A future post will cover the other Processor API methods, specifically those that add properties and/or relationships. This is meant as an introduction to writing a Processor in a scripting language using InvokeScriptedProcessor.

The template for the above processor (and the associated incoming data) is available as a Gist (here). As always I welcome all comments, questions, and suggestions.

Cheers!

ExecuteScript - Using Modules

This is the third post in a series of blogs about the ExecuteScript processor (new as of Apache NiFi 0.5.0). The first two dealt with such concepts as sessions, attributes, and replacing content in flow files.  This post is about using the "Module Directory" property to bring in additional dependencies not bundled with the ExecuteScript NiFi ARchive (NAR).  Specifically for this example we will create a Hazelcast client in both Groovy and JavaScript that will read in entries from a Map stored in Hazelcast. And to illustrate the ability to create flow files, we won't be expecting any incoming flow files, instead we'll create a new flow file and set attributes corresponding to each entry in the map.


One of the most powerful aspects of ExecuteScript is to leverage JARs that aren't already part of the scripting NAR. This means you can bring in capabilities that aren't already built-in to the various scripting languages and/or NiFi itself. To illustrate, I have a template that will use Hazelcast JARs to create a client and read in a stored "customer" Map.

To seed the map, I have a Groovy script that will store two entries into the customer map. It's available as a Gist (here), and points at my Docker container running Hazelcast.  It will put the following entries into a Hazelcast map called "customer":

'mattyb149': ['name': 'Matt Burgess', 'email': 'mattyb149@gmail.com', 'blog': 'funnifi.blogspot.com']
'nifi': ['name': 'Apache NiFi', 'email': 'nifi@apache.org', 'blog': 'nifi.apache.org']

To get the Hazelcast JARs, I downloaded and unzipped Hazelcast 3.6 (latest download here) into my Downloads folder. Then I needed to tell the ExecuteScript processors to find the appropriate JARs, so I put the following into the Module Directory for the ExecuteScript processors:

/Users/mburgess/Downloads/hazelcast-3.6/lib/hazelcast-3.6.jar,/Users/mburgess/Downloads/hazelcast-3.6/lib/hazelcast-client-3.6.jar

The Module Directory property takes a comma-separated list of files and folders pointing to external dependencies. For example, for Jython if you want to bring in extra *.py files, point at the folder that contains them. For Groovy and Javascript, you can point at a folder or files, and if a folder is designated, all JARs in that folder will be available.

The Groovy script to create a new flow file and use the Hazelcast client to read in the customer map is as follows:

import com.hazelcast.client.*
import com.hazelcast.client.config.*
import com.hazelcast.core.*

HazelcastInstance client
try {
   ClientConfig clientConfig = new ClientConfig();
   clientConfig.getGroupConfig().setName("dev").setPassword("dev-pass");
   clientConfig.getNetworkConfig().addAddress("192.168.99.100", "192.168.99.100:32780");

   client = HazelcastClient.newHazelcastClient(clientConfig)
   flowFile = session.create()
      client.getMap("customers").each {k,v ->
      flowFile = session.putAttribute(flowFile, "hazelcast.customers.$k" as String, v as String)
   }
   session.transfer(flowFile, REL_SUCCESS)
}
catch(e) {
   log.error("Something went wrong with Hazelcast", e)
   session.transfer(flowFile, REL_FAILURE)
} finally {
   client?.shutdown()
}

For the Javascript version:

var clientConfig = new com.hazelcast.client.config.ClientConfig();
clientConfig.getGroupConfig().setName("dev").setPassword("dev-pass");
clientConfig.getNetworkConfig().addAddress("192.168.99.100", "192.168.99.100:32780");

var client = com.hazelcast.client.HazelcastClient.newHazelcastClient(clientConfig)
flowFile = session.create()
var map = client.getMap("customers")
for each (var e in map.keySet()) {
  flowFile = session.putAttribute(flowFile, "hazelcast.customers."+e, map.get(e))
}
session.transfer(flowFile, REL_SUCCESS)

The template is available as a Gist (here), and the output in the logs looks something like:
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Wed Feb 10 19:29:34 EST 2016'
Key: 'lineageStartDate'
Value: 'Wed Feb 10 19:29:34 EST 2016'
Key: 'fileSize'
Value: '0'
FlowFile Attribute Map Content
Key: 'filename'
Value: '1234292200327808'
Key: 'hazelcast.customers.mattyb149'
Value: '[name:Matt Burgess, email:mattyb149@gmail.com, blog:funnifi.blogspot.com]'
Key: 'hazelcast.customers.nifi'
Value: '[name:Apache NiFi, email:nifi@apache.org, blog:nifi.apache.org]'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'f51ce2c2-a303-4daa-934d-c2a0639c173c'
--------------------------------------------------

Hopefully this post has successfully illustrated how to create flow files and how to use the Module Directory property to bring in external dependencies.  Please let me know if you have tried this and what your results were.  As always, I welcome all comments, questions, and suggestions :)

Cheers!

ExecuteScript processor - Replacing Flow File Content

In my last post, I introduced the Apache NiFi ExecuteScript processor, including some basic features and a very simple use case that just updated a flow file attribute. However NiFi has a large number of processors that can perform a ton of processing on flow files, including updating attributes, replacing content using regular expressions, etc.  Where the ExecuteScript processor will shine is for use cases that cannot be satisfied with the current set of processors.

As an example, this blog post will present the ubiquitous Word Count example, where a text file (The Tell-Tale Heart by Edgar Allan Poe) is read in, split on non-alphanumeric characters, then each word's frequency in the corpus is calculated. The incoming flow file's contents are replaced with lines of "word: frequency" for each unique word/term in the corpus.


The previous post included a discussion on how to ensure your script will get a valid flow file (namely, returning if session.get() does not return a flow file object). It also illustrated how to use session.putAttribute() to add/update an attribute, and the importance of keeping the latest reference to the flow file object.  This post will focus on Groovy code to replace the content of an incoming flow file.

A very concise way to replace flow file content (at least in Groovy) is to leverage ProcessSession's write() method that takes a StreamCallback object. The StreamCallback will get an InputStream (from the incoming flow file) and an OutputStream (where the new content should go). The best part is that the StreamCallback interface has a single method, so with Groovy we can just use closure coercion instead of creating an explicit implementation of the interface.  Here's what such a skeleton looks like:

flowFile = session.write(flowFile, {inputStream, outputStream ->
   // Read incoming flow file content with inputStream
   // ... other stuff...
   // Write outgoing flow file content with OutputStream
} as StreamCallback)

If you need to read the entire flow file into a String (which you should avoid in case you get very large files), you can import org.apache.commons.io.IOUtils and use IOUtils.toString(InputStream, Charset). See the full example below.

My example reads the entire text in, to keep the code simple, but for a real script you might want to look at StreamTokenizer or something else to pull words out one at a time.  Once the corpus is read in, the words are split on whitespace and other non-alphanumeric characters, then turned to lowercase to get a more accurate word count (versus capitalization differences, e.g.). The word count map is then updated, then a string output is generated with inject(). This is another place where the code can be more efficient (using map.each() or something), but I was trying to keep the body of the session.write() closure concise.  The string output is written to the OutputStream, then after the write() has completed, the filename attribute is set and the file is sent to "success".

The example code for the ExecuteScript processor is as follows:

import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
   def wordCount = [:]

   def tellTaleHeart = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
   def words = tellTaleHeart.split(/(!|\?|-|\.|\"|:|;|,|\s)+/)*.toLowerCase()

   words.each { word ->
   def currentWordCount = wordCount.get(word)
   if(!currentWordCount) {
          wordCount.put(word, 1)
   }
   else {
          wordCount.put(word, currentWordCount + 1)
   }
   }

   def outputMapString = wordCount.inject("", {k,v -> k += "${v.key}: ${v.value}\n"})
  
   outputStream.write(outputMapString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

flowFile = session.putAttribute(flowFile, 'filename', 'telltale_heart_wordcount')
session.transfer(flowFile, REL_SUCCESS)

The self-contained template is a Gist (here), it includes the full text and a PutFile to write out the word count file in a directory relative to the NiFi instance.

ExecuteScript processor - Hello World!

In Apache NiFi 0.5.0, a few new processors were added, two of which allow the user to write scripts to do custom processing.  This post talks about one of them: ExecuteScript.


The ExecuteScript processor is intended as a scriptable "onTrigger" method, basically meaning when the processor is scheduled to run, your script will be executed. As of 0.5.0, the available script engines are ECMAScript (Javascript), Jython, JRuby, Groovy, and Lua). For this blog, almost all examples will be in Groovy, but templates exist for other languages as well.

To allow for the most flexibility, only a handful of objects are passed into the script as variables:

session: This is a reference to the ProcessSession assigned to the processor. The session allows you to perform operations on flow files such as create(), putAttribute(), and transfer(). We'll get to an example below.

context: This is a reference to the ProcessContext for the processor. It can be used to retrieve processor properties, relationships, and the StateManager (see NiFi docs for the uses of StateManager, also new in 0.5.0)

log: This is a reference to the ProcessorLog for the processor. Use it to log messages to NiFi, such as log.info('Hello world!')

REL_SUCCESS: This is a reference to the "success" relationship defined for the processor. It could also be inherited by referencing the static member of the parent class (ExecuteScript), but some engines such as Lua do not allow for referencing static members, so this is a convenience variable. It also saves having to use the fully-qualified name for the relationship.

REL_FAILURE: This is a reference to the "failure" relationship defined for the processor. As with REL_SUCCESS, it could also be inherited by referencing the static member of the parent class (ExecuteScript), but some engines such as Lua do not allow for referencing static members, so this is a convenience variable. It also saves having to use the fully-qualified name for the relationship.

Dynamic Properties:  Any dynamic properties defined in ExecuteScript are passed to the script engine as variables set to the string value of the property values. This means you must be aware of the variable naming properties for the chosen script engine. For example, Groovy does not allow periods (.) in variable names, so don't use something like "my.property" as a dynamic property name.

Usage:

The script is not required to work with the session, context, flow files, or anything else. In fact a one-line Groovy script to simply log that you're being run is:

log.info("Hello from Groovy!")

However such scripts are probably not that interesting :) Most scripts will want to interact with the session and flow files in some way, either by adding attributes, replacing content, or even creating new flow files.

You may have noticed that any incoming flow file is not passed into the script. This is because ExecuteScript can be used without any input, usually to generate flow files to pass into the remainder of the flow. To allow both cases, the ProcessSession is supplied and the script is responsible for handling any flow files. This can result in some boilerplate code, but the trade-off for flexibility and power is well worth it.

If your script only wants to handle incoming flow files, then you can simply return if the session has no flow file available for processing. In Groovy:

def flowFile = session.get()
if (!flowFile) return
// Remainder of script

If you are acting on a flow file, there are two major things to remember:

1) Keep track of the latest version of the flow file reference. This means if you act on a flow file, such as adding an attribute, you should replace the old reference with the one returned by the session method. For example:

flowFile = session.putAttribute(flowFile, 'my-property', 'my-value')

2) The script must transfer the flow file(s) that are retrieved and/or created. Unless an error condition occurred, transfer like so:

session.transfer(flowFile, REL_SUCCESS)

If an error has occurred (i.e. your script has caught an exception), you can route the flow file to failure:

session.transfer(flowFile, REL_FAILURE)

Putting this all together, here is an example script that updates the "filename" attribute (a core flow file attribute that exists on every flow file):

def flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'filename', 'myfile')
session.transfer(flowFile, REL_SUCCESS)

I have created a standalone NiFi template (ExecuteScriptHelloWorldGroovy) that will generate a JSON file, then call the above script to update the filename attribute, then log that attribute:


That's all for this introduction to ExecuteScript, check the NiFi docs for more information about configuring the ExecuteScript processor, and stay tuned for more blog posts about ExecuteScript and other cool NiFi things :)

Cheers!