Wednesday, April 20, 2016

Using Lua with ExecuteScript in NiFi

My other blog posts have covered most of the languages supported by the ExecuteScript processor in Apache NiFi, except JRuby and LuaJ.  This is because I myself am not very familiar with either language, at least enough to do something like the JSON-to-JSON transformation example.

However, they deserve some love too :) So for this post I'll do a very basic example using LuaJ to do some simple operations on a flow file.

Since ExecuteScript already binds the ProcessSession object to the variable "session", it is pretty straightforward to perform operations such as creating a flow file:
flowFile = session:create()

and adding an attribute to a flow file:
flowFile = session:putAttribute(flowFile, "lua.attrib", "Hello from Lua!")

However, it is slightly more complicated to create the equivalent of an anonymous class, such as a concrete implementation of the flow file IO interfaces like InputStreamCallback, OutputStreamCallback, and StreamCallback, which are used in methods like session.read() and session.write().  Luckily these interfaces each have a single method process(), and LuaJ allows us to create a "proxy object" from a Java interface, assigning a function to a variable named process:
local writecb =
  luajava.createProxy("org.apache.nifi.processor.io.OutputStreamCallback", {
    process = function(outputStream)
      outputStream:write("This is flow file content from Lua")
    end
})

This is an example of an OutputStreamCallback implementation, but the same approach works for InputStreamCallback and StreamCallback.

So my final (albeit simple) script to create a flow file, write some content, add an attribute, then transfer to success looks like this:
local writecb =
  luajava.createProxy("org.apache.nifi.processor.io.OutputStreamCallback", {
    process = function(outputStream)
      outputStream:write("This is flow file content from Lua")
    end
})
flowFile = session:create()
flowFile = session:putAttribute(flowFile, "lua.attrib", "Hello from Lua!")
flowFile = session:write(flowFile, writecb)
session:transfer(flowFile, REL_SUCCESS)

If you are using LuaJ in ExecuteScript, I'd like to hear about how and what you are doing with it :)

Cheers!

Friday, April 15, 2016

Inspecting your NiFi DistributedMapCacheServer with Groovy

The DistributedMapCacheServer in Apache NiFi is a Controller Service that allows you to store key/value pairs for use by clients (such as NiFi's DistributedMapCacheClient service); see the official documentation for more details.

To create one, go to Controller Services (on the management toolbar on the right), create a new Controller Service, and select the type DistributedMapCacheServer. Once it is named and saved, you can click the edit button (pencil icon) and set up the hostname, port, and other properties:


Once the properties are saved, you can click the Enable button (lightning-bolt icon) and it will be ready for use by some NiFi Processors.

The DistributedMapCacheServer is mainly used for lookups and for keeping user-defined state. The PutDistributedMapCache and FetchDistributedMapCache processors are good for the latter. Other processors make use of the server to keep track of things like which files they have already processed. DetectDuplicate, ListFile, ListFileTransfer, GetHBase, and others make use of the cache server(s) too.

So it is certainly possible to have a NiFi dataflow with FetchDistributedMapCache scheduled for some period of time (say 10 seconds), connected to a LogAttribute processor or the like, to list the current contents of the desired keys in the cache. For this post, though, I wanted to show how to inspect the cache from outside NiFi, for two reasons: first, to illustrate the use case of working with the cache without using NiFi components (good for automated population or monitoring of the cache from the outside), and second, to show the very straightforward protocol used to get values from the cache. Putting data in is equally simple; perhaps I'll add a follow-on post for that.

The DistributedMapCacheServer opens a port at the configured value (see the dialog above); it expects a TCP connection and then various commands serialized in specific ways. The first task, once connected to the server, is to negotiate the communications protocol version to be used for all subsequent client-server operations. To do this, we need the following:

  • Client sends the string "NiFi" as bytes to the server
  • Client sends the protocol version as an integer (4-bytes)


If you are using an output stream to write these values, make sure you flush the stream after these steps so that they are actually sent to the server and the server can respond. The server will respond with one of three codes:

  • RESOURCE_OK (20): The server accepted the client's proposal of protocol name/version
  • DIFFERENT_RESOURCE_VERSION (21): The server accepted the client's proposal of protocol name but not the version
  • ABORT (255): The server aborted the connection


Once we get a RESOURCE_OK, we may continue with our communications. If instead we get DIFFERENT_RESOURCE_VERSION, the client needs to read in an integer containing the server's preferred protocol version. If the client can proceed using that version (or another version lower than the server's preference), it should re-negotiate by sending the new client-preferred version as an integer (note that you do not need to send "NiFi" again; the name has already been accepted).
If the client and server cannot agree on the protocol version, the client should disconnect from the server. If some error occurs on the server and it aborts the connection, the ABORT status code will be returned, and an error message can be obtained by the client (before disconnecting) by reading in a string of UTF-8 bytes.
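Before getting to the full script, here is a minimal sketch of just this negotiation branch, using the same stream variables (dis, dos) and socket (s) as the script below; note that reading the ABORT message with readUTF() is my assumption about how the error text is encoded:

// Handle the three response codes listed above (20, 21, 255); assumes the "NiFi"
// header and initial version have already been written and flushed
int status = dis.read()
while (status == 21) {                 // DIFFERENT_RESOURCE_VERSION
    int serverVersion = dis.readInt()  // the server's preferred protocol version
    dos.writeInt(serverVersion)        // re-propose (no need to resend "NiFi")
    dos.flush()
    status = dis.read()
}
if (status == 255) {                   // ABORT
    println "Server aborted the handshake: ${dis.readUTF()}"   // assumed UTF-encoded message
    s.close()
} else if (status != 20) {             // anything other than RESOURCE_OK is unexpected
    s.close()
}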

So let's see all this working in a simple example written in Groovy. Here is the script I used:
def protocolVersion = 1
def keys = ['entry', 'filename']

s = new Socket('localhost', 4557)

s.withStreams { input, output ->
    def dos = new DataOutputStream(output)
    def dis = new DataInputStream(input)

    // Negotiate handshake/version
    dos.write('NiFi'.bytes)
    dos.writeInt(protocolVersion)
    dos.flush()

    status = dis.read()
    while (status == 21) {
        protocolVersion = dis.readInt()
        dos.writeInt(protocolVersion)
        dos.flush()
        status = dis.read()
    }

    // Get entries
    keys.each {
        key = it.getBytes('UTF-8')
        dos.writeUTF('get')
        def baos = new ByteArrayOutputStream()
        baos.write(key)
        dos.writeInt(baos.size())
        baos.writeTo(dos)
        dos.flush()
        def length = dis.readInt()
        def bytes = new byte[length]
        dis.readFully(bytes)
        println "$it = ${new String(bytes)}"
    }

    // Close
    dos.writeUTF('close')
    dos.flush()
}

I have set the protocol version to 1, which at present is the only accepted version. But you can set it higher to see the protocol negotiation work.

Also I have a variable "keys" containing the list of keys to look up in the cache. There is no mechanism at present for retrieving all the keys in the cache; this is probably for simplicity and to avoid denial-of-service issues if there are a very large number of keys. For our example, the script will fetch the value for each key and print out the key/value pair.

Next you can see the creation of the socket, using the same port that was configured for the server (4557). Groovy adds a handy withStreams() method to the Socket class, which hands an InputStream and an OutputStream for the socket to your closure. Since we'll be dealing with bytes, strings, and integers, I thought a DataInputStream and DataOutputStream would be easiest (this is also how the DistributedMapCacheClient works).

The next two sections perform the protocol version negotiation as described above. Then for each key we write the string "get", followed by the length of the key (as an integer) and then the key bytes themselves. That is the entirety of the "get" operation :)

The server responds with the length of the key's value (in bytes). We read in the length as an integer, then read in a byte array containing the value. Since in my case I know the values are strings, I simply create a String from the bytes and print out the key/value pair. To my knowledge, the only kinds of serialized values used by the NiFi DistributedMapCacheClient are byte arrays, Strings, and CacheValue objects (the latter used exclusively by the DetectDuplicate processor).

Once we're done reading key/value pairs, we write and flush the string "close" to tell the server our transaction is complete. I did not expressly close the socket connection; that is done by withStreams(), which closes the streams when the closure is finished.

That's all there is to it! This might not be a very common use case, but it was fun to learn about some of the intra-NiFi protocols, and to show how to get information out of the system by different means :)

Cheers!


Friday, April 8, 2016

SQL in NiFi with ExecuteScript

There is a good amount of support for interacting with Relational Database Management systems (RDBMS) in Apache NiFi:

  • Database Connection Pool controller service: A shared resource for processors to get connections to an RDBMS
  • ExecuteSQL: A processor to execute SELECT queries against an RDBMS
  • PutSQL: A processor to execute statements (e.g., INSERT, UPDATE) against an RDBMS
  • QueryDatabaseTable: A processor to perform incremental fetching from an RDBMS table

I will have a blog post up soon describing the configuration and use of the QueryDatabaseTable processor, which was added in Apache NiFi 0.6.0.

To set up a Database Connection Pool controller service, refer to this User Guide section. I have configured mine for a local PostgreSQL instance:


NOTE: I named it 'PostgresConnectionPool'; that name will come into the script configuration later.

Back to the title of this post :)  The scripting processors don't know about (or have a dependency on) Database Connection Pool controller service instances, or even the API (e.g., the DBCPService interface). This would seem to preclude our code from accessing the service to get a database connection.

However, DBCPService has a single method, getConnection(), which returns a java.sql.Connection. That interface is part of Java proper, and it is all we really need from the service; we can talk JDBC from there. One of the great things about Groovy is dynamic method invocation, meaning I can call a method on an object if I know the method is there, even if I don't know the type of the object. We'll get to that shortly.

To work with a Controller Service from ExecuteScript, we need to get a reference to a ControllerServiceLookup from the process context. In Groovy, this looks like:
def lookup = context.controllerServiceLookup
Now that we have a lookup, we use it to locate the service we want. If we know exactly which controller service we want (and it won't change, get deleted/recreated, etc.), we can look it up directly by its identifier (the Id shown below):


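As a quick sketch, that direct lookup would look like the line below (the identifier string is just a placeholder; the real value comes from the controller service's configuration dialog):

// 'your-service-id-here' stands in for the Id shown in the dialog above
def dbcpService = lookup.getControllerService('your-service-id-here')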
However, for this example I wanted the user to be able to specify the name of the controller service (in this case PostgresConnectionPool), not the identifier. For that we need to get all the controller service identifiers and find the one whose name equals PostgresConnectionPool.

I used a dynamic property in the ExecuteScript config dialog to let the user set the name of the desired Database Connection Pool controller service:


ExecuteScript will create a variable for each dynamic property and bind a PropertyValue object to it. This is so you can access the value of the property as the correct type (String, integer, etc.). In our case it's a String, so we can use the PropertyValue.getValue() method. In Groovy it's as simple as:
def dbServiceName = databaseConnectionPoolName.value
Now that we have a ControllerServiceLookup and the name of the service we want to find, we can use the APIs to iterate over the services until we find one whose name is the one we're looking for:
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
Now we have the identifier of the service we want, and we can use lookup.getControllerService(dbcpServiceId) to get a reference to the ControllerService itself. Note we haven't referred to this service as a DBCPService, because the script (and the processor) do not have access to that class. However, as I said before, we know the method (getConnection()) we want to call, and we have access to its return type. So in Groovy you can just invoke it:
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
I used the safe-navigation operator (?.), but to go further you will want to ensure conn is not null, and report an error if it is.
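For example, a minimal guard (my own addition, not part of the full script below) might be:

if (!conn) {
    log.error("Could not get a connection from controller service '${dbServiceName}'")
    return   // nothing more we can do without a connection
}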

Now that we have a Connection object, we could use the JDBC API (java.sql.*) to issue queries, iterate over the ResultSet's rows, and get each column's name and value. However, Groovy has an excellent class called groovy.sql.Sql that does all this with Groovy idioms. For example, to issue the query 'select * from users' and iterate over the rows (with their row number), you have:
def sql = new Sql(conn)
sql.rows('select * from users').eachWithIndex { row, idx ->
    // Do stuff for each row here
}
In my case, I want to find the column names the first time through and output them as CSV headers. Then for every row I want to comma-separate the values and output them:
if(idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) }
out.write((row.values().join(',') + "\n").getBytes())
All that's left to do is to set the filename attribute to the one specified by the filename variable (see the Configure Processor dialog above), and transfer the new flow file.  The entire script looks like this:

import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.sql.Sql

// Find the Database Connection Pool controller service by name and get a JDBC Connection
def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()

try {
    flowFile = session.create()
    flowFile = session.write(flowFile, { out ->
        def sql = new Sql(conn)
        // First row: emit the column names as a CSV header, then the values for every row
        sql.rows('select * from users').eachWithIndex { row, idx ->
            if (idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) }
            out.write((row.values().join(',') + "\n").getBytes())
        }
    } as OutputStreamCallback)
    flowFile = session.putAttribute(flowFile, 'filename', filename.value)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Scripting error', e)
    session.transfer(flowFile, REL_FAILURE)
}
conn?.close()

This script probably needs a little work before you'd want to use it, to check whether the Controller Service was found, to quote any row value that contains a comma, etc. But I tried to keep it brief to illustrate the concepts, which are the fluent NiFi API and the cool idioms of Groovy :) For my example table, the script produces a valid CSV file:


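If you do need to handle values that contain commas (or quotes), one possible tweak, purely my own sketch and not part of the script or template above, is to escape each value before joining:

// Quote a value if it contains a comma or double quote, doubling any embedded quotes
def csvEscape = { v ->
    def s = v?.toString() ?: ''
    (s.contains(',') || s.contains('"')) ? '"' + s.replace('"', '""') + '"' : s
}
// ...then inside the row-processing loop:
out.write((row.values().collect(csvEscape).join(',') + "\n").getBytes())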
This processor is available as a template on Gist (here); as always, I welcome all questions, comments, and suggestions.

Cheers!