Wednesday, February 10, 2016

ExecuteScript - Using Modules

This is the third post in a series about the ExecuteScript processor (new as of Apache NiFi 0.5.0). The first two covered sessions, attributes, and replacing content in flow files. This post is about using the "Module Directory" property to bring in additional dependencies not bundled with the ExecuteScript NiFi ARchive (NAR). Specifically, for this example we will create a Hazelcast client in both Groovy and JavaScript that reads entries from a Map stored in Hazelcast. To illustrate the ability to create flow files, the scripts won't expect any incoming flow files; instead, each one creates a new flow file and sets an attribute corresponding to each entry in the map.


One of the most powerful aspects of ExecuteScript is the ability to leverage JARs that aren't already part of the scripting NAR. This means you can bring in capabilities that aren't built into the various scripting languages and/or NiFi itself. To illustrate, I have a template that uses the Hazelcast JARs to create a client and read in a stored "customers" Map.

To seed the map, I have a Groovy script that stores two entries in it. It's available as a Gist (here), and points at my Docker container running Hazelcast. It puts the following entries into a Hazelcast map called "customers":

'mattyb149': ['name': 'Matt Burgess', 'email': 'mattyb149@gmail.com', 'blog': 'funnifi.blogspot.com']
'nifi': ['name': 'Apache NiFi', 'email': 'nifi@apache.org', 'blog': 'nifi.apache.org']

To get the Hazelcast JARs, I downloaded and unzipped Hazelcast 3.6 (latest download here) into my Downloads folder. Then I needed to tell the ExecuteScript processors where to find the appropriate JARs, so I put the following into the Module Directory property of each ExecuteScript processor:

/Users/mburgess/Downloads/hazelcast-3.6/lib/hazelcast-3.6.jar,/Users/mburgess/Downloads/hazelcast-3.6/lib/hazelcast-client-3.6.jar

The Module Directory property takes a comma-separated list of files and folders pointing to external dependencies. For example, for Jython, if you want to bring in extra *.py files, point at the folder that contains them. For Groovy and JavaScript, you can point at a folder or at individual files; if a folder is designated, all JARs in that folder will be available.
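
To make the comma-separated format concrete, here is a minimal stand-alone sketch (plain JavaScript, runnable outside NiFi) that splits such a value into its entries. The helper name parseModuleDirectory is hypothetical, and guessing "jar" versus "folder" from the file extension is an assumption made for illustration only; this is not how NiFi itself resolves the property.

```javascript
// Hypothetical helper (not part of NiFi): splits a Module Directory value
// into trimmed, non-empty entries and guesses, from the extension alone,
// whether each entry names a single JAR or a folder.
function parseModuleDirectory(value) {
  return value
    .split(",")
    .map(function (entry) { return entry.trim(); })
    .filter(function (entry) { return entry.length > 0; })
    .map(function (entry) {
      var isJar = /\.jar$/i.test(entry);
      return { path: entry, kind: isJar ? "jar" : "folder" };
    });
}

// The value used in this post: two individual JAR files.
var modules = parseModuleDirectory(
  "/Users/mburgess/Downloads/hazelcast-3.6/lib/hazelcast-3.6.jar," +
  "/Users/mburgess/Downloads/hazelcast-3.6/lib/hazelcast-client-3.6.jar"
);

modules.forEach(function (m) {
  console.log(m.kind + ": " + m.path);
});
```

Pointing at the lib folder instead (/Users/mburgess/Downloads/hazelcast-3.6/lib) would give a single "folder" entry, which per the description above makes every JAR in that folder available.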

The Groovy script to create a new flow file and use the Hazelcast client to read in the customer map is as follows:

import com.hazelcast.client.*
import com.hazelcast.client.config.*
import com.hazelcast.core.*

HazelcastInstance client
// Create the flow file up front so the catch block always has one to route
def flowFile = session.create()
try {
   ClientConfig clientConfig = new ClientConfig()
   clientConfig.getGroupConfig().setName("dev").setPassword("dev-pass")
   clientConfig.getNetworkConfig().addAddress("192.168.99.100", "192.168.99.100:32780")

   client = HazelcastClient.newHazelcastClient(clientConfig)
   // Add one attribute per map entry, named hazelcast.customers.<key>
   client.getMap("customers").each {k,v ->
      flowFile = session.putAttribute(flowFile, "hazelcast.customers.$k" as String, v as String)
   }
   session.transfer(flowFile, REL_SUCCESS)
}
catch(e) {
   log.error("Something went wrong with Hazelcast", e)
   session.transfer(flowFile, REL_FAILURE)
} finally {
   // Always release the client connection
   client?.shutdown()
}

For the JavaScript version:

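var flowFile = session.create()
var client = null
try {
  var clientConfig = new com.hazelcast.client.config.ClientConfig();
  clientConfig.getGroupConfig().setName("dev").setPassword("dev-pass");
  clientConfig.getNetworkConfig().addAddress("192.168.99.100", "192.168.99.100:32780");

  client = com.hazelcast.client.HazelcastClient.newHazelcastClient(clientConfig)
  var map = client.getMap("customers")
  // Add one attribute per map entry, named hazelcast.customers.<key>
  for each (var e in map.keySet()) {
    flowFile = session.putAttribute(flowFile, "hazelcast.customers."+e, map.get(e))
  }
  session.transfer(flowFile, REL_SUCCESS)
} catch (err) {
  log.error("Something went wrong with Hazelcast: " + err)
  session.transfer(flowFile, REL_FAILURE)
} finally {
  // Always release the client connection, as the Groovy version does
  if (client !== null) client.shutdown()
}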

The template is available as a Gist (here), and the output in the logs looks something like:
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Wed Feb 10 19:29:34 EST 2016'
Key: 'lineageStartDate'
Value: 'Wed Feb 10 19:29:34 EST 2016'
Key: 'fileSize'
Value: '0'
FlowFile Attribute Map Content
Key: 'filename'
Value: '1234292200327808'
Key: 'hazelcast.customers.mattyb149'
Value: '[name:Matt Burgess, email:mattyb149@gmail.com, blog:funnifi.blogspot.com]'
Key: 'hazelcast.customers.nifi'
Value: '[name:Apache NiFi, email:nifi@apache.org, blog:nifi.apache.org]'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'f51ce2c2-a303-4daa-934d-c2a0639c173c'
--------------------------------------------------
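
The attribute names in the log output follow directly from the loop in the scripts: one attribute per map entry, named with the prefix plus the entry's key. As a rough illustration only, the sketch below reproduces that naming scheme in plain JavaScript with an ordinary object standing in for the Hazelcast map. The helpers formatEntry and toAttributes are hypothetical, and the "[key:value, ...]" rendering is written to mimic the values shown in the log rather than taken from any NiFi or Hazelcast API.

```javascript
// Plain object standing in for the Hazelcast "customers" map
// (entries copied from the seed data in this post).
var customers = {
  mattyb149: { name: "Matt Burgess", email: "mattyb149@gmail.com", blog: "funnifi.blogspot.com" },
  nifi: { name: "Apache NiFi", email: "nifi@apache.org", blog: "nifi.apache.org" }
};

// Hypothetical helper: renders a value object as "[key:value, ...]",
// mimicking the attribute values seen in the log output.
function formatEntry(entry) {
  var parts = Object.keys(entry).map(function (k) {
    return k + ":" + entry[k];
  });
  return "[" + parts.join(", ") + "]";
}

// Hypothetical helper: builds one attribute per map entry,
// named <prefix>.<key>, as the loop in the scripts does.
function toAttributes(prefix, map) {
  var attributes = {};
  Object.keys(map).forEach(function (key) {
    attributes[prefix + "." + key] = formatEntry(map[key]);
  });
  return attributes;
}

var attributes = toAttributes("hazelcast.customers", customers);
Object.keys(attributes).forEach(function (name) {
  console.log(name + " = " + attributes[name]);
});
```

In the real scripts, session.putAttribute plays the role of the assignment inside toAttributes, and each call returns an updated flow file reference that has to be kept.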

Hopefully this post has successfully illustrated how to create flow files and how to use the Module Directory property to bring in external dependencies.  Please let me know if you have tried this and what your results were.  As always, I welcome all comments, questions, and suggestions :)

Cheers!

17 comments:

  1. Hi Matt,

    I am using Apache NiFi 1.0.0 on my Windows 7 box, with the Groovy engine in the ExecuteScript processor. The Groovy engine is not able to resolve my Java dependencies even though I have specified the path to the JAR file in the 'Module Directory' property. I tried absolute and relative paths, with forward and backward slashes. Looking through the code for the 'GroovyScriptEngineConfigurator' class, the eval method passes in the modulePaths array, but it looks like it is not being used. I am not sure if I am reading this correctly. Is there a known issue in the 1.0.0 version related to this? Can you please point out anything I am missing? The error I get is:
    Caused by: org.codehaus.groovy.control.MultipleCompilationErrorsException: startup failed:
    Script1068.groovy: 9: unable to resolve class com.vs.NiFiJava
    @ line 9, column 1.
    import com.vs.NiFiJava
    ^

    Script1068.groovy: 12: unable to resolve class NifiJavaIntegrator
    @ line 12, column 1.
    new NifiJavaIntegrator().printHello()
    ^

    2 errors

    at org.codehaus.groovy.control.ErrorCollector.failIfErrors(ErrorCollector.java:310) ~[na:na]
    at org.codehaus.groovy.control.CompilationUnit.applyToSourceUnits(CompilationUnit.java:946) ~[na:na]
    at org.codehaus.groovy.control.CompilationUnit.doPhaseOperation(CompilationUnit.java:593) ~[na:na]
    at org.codehaus.groovy.control.CompilationUnit.compile(CompilationUnit.java:542) ~[na:na]
    at groovy.lang.GroovyClassLoader.doParseClass(GroovyClassLoader.java:298) ~[na:na]
    at groovy.lang.GroovyClassLoader.parseClass(GroovyClassLoader.java:268) ~[na:na]
    at groovy.lang.GroovyClassLoader.parseClass(GroovyClassLoader.java:254) ~[na:na]
    at groovy.lang.GroovyClassLoader.parseClass(GroovyClassLoader.java:212) ~[na:na]
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.getScriptClass(GroovyScriptEngineImpl.java:374) ~[na:na]
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:151) ~[na:na]
    ... 14 common frames omitted

    Replies
    1. What paths did you enter into Module Directory? Are they full Windows paths (C:\path\to\file\or\folder)? Or absolute *nix paths (/path/to/file/or/folder)? Try both, possibly with back- and forward-slashes. I have confirmed that this is working on Windows in NiFi 1.0.0.

    2. Thanks for the response Matt. If it has worked for you, then I might have missed something. I will give it another try.

    3. Did you ever get this to work? We're having a similar issue with 0.7 in Linux. Giving absolute *nix style path.

  2. I use nifi 1.0.0
    I am trying to extend GetSFTP in Groovy
    Nifi does not find the class. I found that the class is in
    nifi-1.0.0//work/nar/extensions/nifi-standard-nar-1.0.0.nar-unpacked/META-INF/bundled-dependencies/nifi-standard-processors-1.0.0.jar

    I set this path in the Module Directory attribute. Didn't help.
    I downloaded a hazelcast jar as above but nifi did not find ClientConfig
    I tried JavaScript. Didn't work
    Is it a bug?

    Replies
    1. Extending from existing processors is not really the intent of ExecuteScript (or InvokeScriptedProcessor). Because that processor (GetSFTP) is in a different bundle (aka NiFi ARchive aka NAR), any extension would need that NAR as a parent, and that takes a Maven build setup, etc. Also beware of referring to anything in the work/ directory of NiFi, that is where NARs get unpacked so they are not guaranteed to have a certain structure or be persistent.

      The module path is intended to be for non-NiFi third-party dependencies such as Hazelcast. For your Hazelcast issue, do you have your own local copies of the JARs above, and does your Module Directory property point to all of them (or the directory containing them)?

    2. Thank you for your response.
      One correction of my own text: I did not try to extend GetSFTP but to instantiate and call it.
      Also, I tried placing nifi-standard-processors-1.0.0.jar in a custom directory similar to your Hazelcast example. I really think there is an issue in NiFi 1.0.0 because it doesn't find classes even when I just try to import them.
      I tried to create a custom nar using maven and including nifi-standard-processors-1.0.0.jar but then nifi simply doesn't start.
      My overall goal was to create a Processor that would internally call GetFTP but do additional stuff. So my first thought was to reuse an existing Processor instead of adding an apache library for ftp and writing code.

  3. Hi Matt,
    I am new to NiFi and I want to merge many files into one file.
    How can I do that, please?

  4. Hi,
    Please, what are the names of the libraries I need for this script:
    import org.apache.commons.io.IOUtils
    import java.nio.charset.*
    def flowFile = session.get()
    if(!flowFile) return
    filename = customfilename.evaluateAttributeExpressions(flowFile).value
    f = new File("/Users/gkeys/DEV/staging/${filename}")
    flowFile = session.write(flowFile, {inputStream, outputStream ->
    try {
    f.append(IOUtils.toString(inputStream, StandardCharsets.UTF_8)+'\n')
    }
    catch(e) {
    log.error("Error during processing custom logging: ${filename}", e)
    }
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)

    Replies
    1. You shouldn't need any additional libraries, Apache Commons IO is already included. Are you getting errors in the script?

  5. Hi Matt,
    Thank you for your answer.
    I have GetFile -> UpdateAttribute -> ExecuteScript,
    and here is my script:
    import org.apache.commons.io.IOUtils
    import java.nio.charset.*
    def flowFile = session.get()
    if(!flowFile) return
    filename = customfilename.evaluateAttributeExpressions(flowFile).value
    f = new File("C:/Users/koly.sall/Documents/Demo/Data/Target/${filename}")
    flowFile = session.write(flowFile, {inputStream, outputStream ->
    try {
    f.append(IOUtils.toString(inputStream, StandardCharsets.UTF_8)+'\n')
    }
    catch(e) {
    log.error("Error during processing custom logging: ${filename}", e)
    }
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)

    But when I execute it, I get two empty files.

    Replies
    1. You are using session.write() which expects you to write to the contents of the outgoing flow file (otherwise it will be empty). If you are trying to append flow file contents to an external file (which it appears you are), then you don't need to write to the outgoing flow file, you just need to read from the incoming one and then transfer it as-is. Try the following instead:

      import org.apache.commons.io.IOUtils
      import java.nio.charset.*
      def flowFile = session.get()
      if(!flowFile) return
      filename = customfilename.evaluateAttributeExpressions(flowFile).value
      f = new File("C:/Users/koly.sall/Documents/Demo/Data/Target/${filename}")
      session.read(flowFile, {inputStream ->
      try {
      f.append(IOUtils.toString(inputStream, StandardCharsets.UTF_8)+'\n')
      }
      catch(e) {
      log.error("Error during processing custom logging: ${filename}", e)
      }
      } as InputStreamCallback)
      session.transfer(flowFile, REL_SUCCESS)

  6. Thank Mark,
    It's okay now, but I have two filenames (test1 & test2) and I want to rename them so they have the same name. Is that possible?

  7. Hi Matt,
    How are you?
    I executed this script on Red Hat, but I don't get an output file in the target directory:
    import org.apache.commons.io.IOUtils
    import java.nio.charset.*
    def flowFile = session.get()
    if(!flowFile) return
    filename = customfilename.evaluateAttributeExpressions(flowFile).value
    f = new File("${path}/${filename}")
    session.read(flowFile, {inputStream ->
    try {
    f.append(IOUtils.toString(inputStream, StandardCharsets.UTF_8)+'\n')
    }
    catch(e) {
    log.error("Error during processing custom logging: ${filename}", e)
    }
    } as InputStreamCallback)
    session.transfer(flowFile, REL_SUCCESS)

    Best

  8. I have included the path /usr/lib/python2.7/site-packages/textblob/ in Module Directory, but it still shows the error "ImportError: No module named textblob" at line number 6. What should I do?

  9. Hi Matt, how are you ?

    I need help. I'm using an ExecuteScript processor on Apache NiFi 1.8, but when I use array functions in JavaScript, my script fails with an error saying "includes not a function".

    Can't I use JavaScript array functions in ExecuteScript?

    thanks
