Wednesday, April 20, 2016

Using Lua with ExecuteScript in NiFi

My other blog posts have covered most of the languages supported by the ExecuteScript processor in Apache NiFi, except JRuby and LuaJ.  This is because I myself am not very familiar with either language, at least enough to do something like the JSON-to-JSON transformation example.

However they deserve some love too :) So for this post I'll do a very basic example using LuaJ to do some simple operations on a flow file.

Since ExecuteScript already binds the ProcessSession object to the variable "session", it is pretty straightforward to perform operations such as creating a flow file:
flowFile = session:create()

and adding an attribute to a flow file:
flowFile = session:putAttribute(flowFile, "lua.attrib", "Hello from Lua!")

However, it is slightly more complicated to create the equivalent of an anonymous class, such as a concrete implementation of the flow file IO interfaces like InputStreamCallback, OutputStreamCallback, and StreamCallback, which are used in methods like session.read() and session.write().  Luckily these interfaces each have a single method process(), and LuaJ allows us to create a "proxy object" from a Java interface, assigning a function to a variable named process:
local writecb =
  luajava.createProxy("org.apache.nifi.processor.io.OutputStreamCallback", {
    process = function(outputStream)
      outputStream:write("This is flow file content from Lua")
    end
})

This is an example of an OutputStreamCallback implementation, but the same approach works for InputStreamCallback and StreamCallback.

So my final (albeit simple) script to create a flow file, write some content, add an attribute, then transfer to success looks like this:
local writecb
  luajava.createProxy("org.apache.nifi.processor.io.OutputStreamCallback", {
    process = function(outputStream)
      outputStream:write("This is flow file content from Lua")
    end
})
flowFile = session:create()
flowFile = session:putAttribute(flowFile, "lua.attrib", "Hello from Lua!")
flowFile = session:write(flowFile, writecb)
session:transfer(flowFile, REL_SUCCESS)

If you are using LuaJ in ExecuteScript, I'd like to hear about how and what you are doing with it :)

Cheers!

1 comment: