One of the most powerful aspects of ExecuteScript is the ability to leverage JARs that aren't already part of the scripting NAR. This means you can bring in capabilities that aren't built into the various scripting languages or NiFi itself. To illustrate, I have a template that uses the Hazelcast JARs to create a client and read in a stored "customers" map.
To seed the map, I have a Groovy script that stores two entries in it. It's available as a Gist (here) and points at my Docker container running Hazelcast. It puts the following entries into a Hazelcast map called "customers":
'mattyb149': ['name': 'Matt Burgess', 'email': 'mattyb149@gmail.com', 'blog': 'funnifi.blogspot.com']
'nifi': ['name': 'Apache NiFi', 'email': 'nifi@apache.org', 'blog': 'nifi.apache.org']
For this example, the Module Directory property of ExecuteScript points at the two Hazelcast 3.6 JARs:
/Users/mburgess/Downloads/hazelcast-3.6/lib/hazelcast-3.6.jar,/Users/mburgess/Downloads/hazelcast-3.6/lib/hazelcast-client-3.6.jar
The Module Directory property takes a comma-separated list of files and folders pointing to external dependencies. For example, with Jython, if you want to bring in extra *.py files, point at the folder that contains them. For Groovy and JavaScript, you can point at folders or individual files; if a folder is given, all JARs in that folder will be available.
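To make the Groovy/JavaScript rule concrete, here is a small Node.js sketch of how a Module Directory value could be expanded into individual JAR files. It is a hypothetical illustration of the behavior described above, not NiFi's actual implementation; the helper name `resolveModulePaths`, the whitespace trimming, the `.jar` suffix check, and the sorted order are all assumptions.

```javascript
// Hypothetical sketch: NOT NiFi's source code, only an illustration of the
// Module Directory rules described above for Groovy and JavaScript.
const fs = require("fs");
const path = require("path");

// Expand a comma-separated Module Directory value into a list of JAR paths.
function resolveModulePaths(moduleDirectory) {
  const resolved = [];
  for (const raw of moduleDirectory.split(",")) {
    const entry = raw.trim(); // assumption: surrounding whitespace is ignored
    if (entry === "") {
      continue;
    }
    // statSync throws if the entry does not exist.
    if (fs.statSync(entry).isDirectory()) {
      // A folder contributes every JAR directly inside it, sorted so the
      // result is deterministic (the real class path order may differ).
      const jars = fs
        .readdirSync(entry)
        .filter((name) => name.toLowerCase().endsWith(".jar"))
        .sort()
        .map((name) => path.join(entry, name));
      resolved.push(...jars);
    } else {
      // A file entry (such as hazelcast-3.6.jar) is used as-is.
      resolved.push(entry);
    }
  }
  return resolved;
}
```

Under these assumptions, pointing the property at the hazelcast-3.6/lib folder alone would pick up both Hazelcast JARs, while listing the two files explicitly (as above) limits the class path to exactly those JARs.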
The Groovy script to create a new flow file and use the Hazelcast client to read in the customers map is as follows:
import com.hazelcast.client.*
import com.hazelcast.client.config.*
import com.hazelcast.core.*

HazelcastInstance client
// Declared up front so the catch block can safely check it
def flowFile = null
try {
    ClientConfig clientConfig = new ClientConfig()
    clientConfig.getGroupConfig().setName("dev").setPassword("dev-pass")
    clientConfig.getNetworkConfig().addAddress("192.168.99.100", "192.168.99.100:32780")
    client = HazelcastClient.newHazelcastClient(clientConfig)

    flowFile = session.create()
    // One attribute per map entry, e.g. hazelcast.customers.nifi
    client.getMap("customers").each { k, v ->
        flowFile = session.putAttribute(flowFile, "hazelcast.customers.$k" as String, v as String)
    }
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error("Something went wrong with Hazelcast", e)
    // Only route to failure if the flow file was actually created
    if (flowFile) session.transfer(flowFile, REL_FAILURE)
} finally {
    client?.shutdown()
}
For the JavaScript version:
var clientConfig = new com.hazelcast.client.config.ClientConfig();
clientConfig.getGroupConfig().setName("dev").setPassword("dev-pass");
clientConfig.getNetworkConfig().addAddress("192.168.99.100", "192.168.99.100:32780");
var client = com.hazelcast.client.HazelcastClient.newHazelcastClient(clientConfig);
try {
    var flowFile = session.create();
    var map = client.getMap("customers");
    // Nashorn's "for each" iterates over the elements of a Java collection (here, the map's keys)
    for each (var e in map.keySet()) {
        flowFile = session.putAttribute(flowFile, "hazelcast.customers." + e, map.get(e));
    }
    session.transfer(flowFile, REL_SUCCESS);
} finally {
    // Release the client's connections and threads on every run
    client.shutdown();
}
The template is available as a Gist (here), and the output in the logs looks something like:
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Wed Feb 10 19:29:34 EST 2016'
Key: 'lineageStartDate'
Value: 'Wed Feb 10 19:29:34 EST 2016'
Key: 'fileSize'
Value: '0'
FlowFile Attribute Map Content
Key: 'filename'
Value: '1234292200327808'
Key: 'hazelcast.customers.mattyb149'
Value: '[name:Matt Burgess, email:mattyb149@gmail.com, blog:funnifi.blogspot.com]'
Key: 'hazelcast.customers.nifi'
Value: '[name:Apache NiFi, email:nifi@apache.org, blog:nifi.apache.org]'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'f51ce2c2-a303-4daa-934d-c2a0639c173c'
--------------------------------------------------
Hopefully this post has successfully illustrated how to create flow files and how to use the Module Directory property to bring in external dependencies. Please let me know if you have tried this and what your results were. As always, I welcome all comments, questions, and suggestions :)
Cheers!
Hi Matt,
I am using Apache NiFi 1.0.0 on my Windows 7 box, with the Groovy engine in the ExecuteScript processor. However, the Groovy engine is not able to resolve my Java dependencies, even though I have specified the path to the JAR file in the Module Directory property. I tried absolute and relative paths, with both forward and backward slashes. Looking through the code for the GroovyScriptEngineConfigurator class, the eval method is passed the modulePaths array, but it looks like it is not being used. I am not sure if I am reading this correctly. Is there a known issue related to this in version 1.0.0? Please point it out if I am missing something. The error I get is:
Caused by: org.codehaus.groovy.control.MultipleCompilationErrorsException: startup failed:
Script1068.groovy: 9: unable to resolve class com.vs.NiFiJava
@ line 9, column 1.
import com.vs.NiFiJava
^
Script1068.groovy: 12: unable to resolve class NifiJavaIntegrator
@ line 12, column 1.
new NifiJavaIntegrator().printHello()
^
2 errors
at org.codehaus.groovy.control.ErrorCollector.failIfErrors(ErrorCollector.java:310) ~[na:na]
at org.codehaus.groovy.control.CompilationUnit.applyToSourceUnits(CompilationUnit.java:946) ~[na:na]
at org.codehaus.groovy.control.CompilationUnit.doPhaseOperation(CompilationUnit.java:593) ~[na:na]
at org.codehaus.groovy.control.CompilationUnit.compile(CompilationUnit.java:542) ~[na:na]
at groovy.lang.GroovyClassLoader.doParseClass(GroovyClassLoader.java:298) ~[na:na]
at groovy.lang.GroovyClassLoader.parseClass(GroovyClassLoader.java:268) ~[na:na]
at groovy.lang.GroovyClassLoader.parseClass(GroovyClassLoader.java:254) ~[na:na]
at groovy.lang.GroovyClassLoader.parseClass(GroovyClassLoader.java:212) ~[na:na]
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.getScriptClass(GroovyScriptEngineImpl.java:374) ~[na:na]
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:151) ~[na:na]
... 14 common frames omitted
What paths did you enter into Module Directory? Are they full Windows paths (C:\path\to\file\or\folder)? Or absolute *nix paths (/path/to/file/or/folder)? Try both, possibly with back- and forward-slashes. I have confirmed that this is working on Windows in NiFi 1.0.0.
Thanks for the response, Matt. If it has worked for you, then I might have missed something. I will give it another try.
Did you ever get this to work? We're having a similar issue with NiFi 0.7 on Linux, using an absolute *nix-style path.
I use NiFi 1.0.0.
I am trying to extend GetSFTP in Groovy.
NiFi does not find the class. I found that the class is in
nifi-1.0.0//work/nar/extensions/nifi-standard-nar-1.0.0.nar-unpacked/META-INF/bundled-dependencies/nifi-standard-processors-1.0.0.jar
I set this path in the Module Directory attribute. It didn't help.
I downloaded a Hazelcast JAR as above, but NiFi did not find ClientConfig.
I tried JavaScript. It didn't work either.
Is it a bug?
Extending existing processors is not really the intent of ExecuteScript (or InvokeScriptedProcessor). Because that processor (GetSFTP) is in a different bundle (a NiFi ARchive, or NAR), any extension would need that NAR as a parent, which requires a Maven build setup, etc. Also, beware of referring to anything in NiFi's work/ directory: that is where NARs get unpacked, so its contents are not guaranteed to have a particular structure or to persist.
The module path is intended for non-NiFi third-party dependencies such as Hazelcast. For your Hazelcast issue, do you have your own local copies of the JARs above, and does your Module Directory property point to all of them (or to the directory containing them)?
Thank you for your response.
One correction to my earlier comment: I did not try to extend GetSFTP, but to instantiate and call it.
Also, I tried placing nifi-standard-processors-1.0.0.jar in a custom directory, similar to your Hazelcast examples. I really think there is an issue in NiFi 1.0.0, because it doesn't find classes even when I just try to import them.
I tried to create a custom NAR using Maven, including nifi-standard-processors-1.0.0.jar, but then NiFi simply doesn't start.
My overall goal was to create a processor that would internally call GetFTP but do additional work. So my first thought was to reuse an existing processor instead of adding an Apache library for FTP and writing the code myself.
Hi Matt,
I am new to NiFi, and I want to merge many files into one file.
What should I do?
Hi,
Which libraries do I need for this script?
import org.apache.commons.io.IOUtils
import java.nio.charset.*
def flowFile = session.get()
if(!flowFile) return
filename = customfilename.evaluateAttributeExpressions(flowFile).value
f = new File("/Users/gkeys/DEV/staging/${filename}")
flowFile = session.write(flowFile, {inputStream, outputStream ->
try {
f.append(IOUtils.toString(inputStream, StandardCharsets.UTF_8)+'\n')
}
catch(e) {
log.error("Error during processing custom logging: ${filename}", e)
}
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
You shouldn't need any additional libraries; Apache Commons IO is already included. Are you getting errors in the script?
Hi Matt,
Thank you for your answer.
My flow is GetFile -> UpdateAttribute -> ExecuteScript.
Here is my script:
import org.apache.commons.io.IOUtils
import java.nio.charset.*
def flowFile = session.get()
if(!flowFile) return
filename = customfilename.evaluateAttributeExpressions(flowFile).value
f = new File("C:/Users/koly.sall/Documents/Demo/Data/Target/${filename}")
flowFile = session.write(flowFile, {inputStream, outputStream ->
try {
f.append(IOUtils.toString(inputStream, StandardCharsets.UTF_8)+'\n')
}
catch(e) {
log.error("Error during processing custom logging: ${filename}", e)
}
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
But when I execute it, I get two empty files.
You are using session.write() which expects you to write to the contents of the outgoing flow file (otherwise it will be empty). If you are trying to append flow file contents to an external file (which it appears you are), then you don't need to write to the outgoing flow file, you just need to read from the incoming one and then transfer it as-is. Try the following instead:
import org.apache.commons.io.IOUtils
import java.nio.charset.*
def flowFile = session.get()
if(!flowFile) return
filename = customfilename.evaluateAttributeExpressions(flowFile).value
f = new File("C:/Users/koly.sall/Documents/Demo/Data/Target/${filename}")
session.read(flowFile, {inputStream ->
try {
f.append(IOUtils.toString(inputStream, StandardCharsets.UTF_8)+'\n')
}
catch(e) {
log.error("Error during processing custom logging: ${filename}", e)
}
} as InputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)
Thanks Matt,
It works, but I have two filenames (test1 and test2), and I want to rename them so they have the same name. Is that possible?
Hi Matt,
How are you?
I have executed this script on Red Hat, but I don't get an output file in the target directory:
import org.apache.commons.io.IOUtils
import java.nio.charset.*
def flowFile = session.get()
if(!flowFile) return
filename = customfilename.evaluateAttributeExpressions(flowFile).value
f = new File("${path}/${filename}")
session.read(flowFile, {inputStream ->
try {
f.append(IOUtils.toString(inputStream, StandardCharsets.UTF_8)+'\n')
}
catch(e) {
log.error("Error during processing custom logging: ${filename}", e)
}
} as InputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)
Best
I have included the path /usr/lib/python2.7/site-packages/textblob/ in Module Directory, but it still shows the error "ImportError: No module named textblob in at line number 6". What should I do?
Hi Matt, how are you?
I need some help. I'm using an ExecuteScript processor on Apache NiFi 1.8 with JavaScript, and when I use array functions, my script fails with the error "includes not a function".
Can't I use JavaScript array functions in ExecuteScript?
Thanks
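The "includes not a function" error above is consistent with ExecuteScript's JavaScript engine being Nashorn, assuming NiFi 1.8 is running on Java 8: Nashorn implements ECMAScript 5.1, while `Array.prototype.includes` was only added in ECMAScript 2016. A minimal workaround sketch is to use `indexOf(value) !== -1`, or a small ES5-compatible helper such as the hypothetical `arrayContains` below, which also matches `NaN` the way `includes()` does.

```javascript
// ES5.1-compatible replacement for Array.prototype.includes, for engines such
// as Nashorn (Java 8) that predate ECMAScript 2016. Helper name is hypothetical.
function arrayContains(array, value) {
  for (var i = 0; i < array.length; i++) {
    var item = array[i];
    // SameValueZero comparison, as includes() uses: === already treats 0 and -0
    // as equal, and the second test makes NaN match NaN (indexOf() does not).
    if (item === value || (item !== item && value !== value)) {
      return true;
    }
  }
  return false;
}

// Usage: instead of ["csv", "json"].includes(ext)
var ext = "json";
var supported = arrayContains(["csv", "json"], ext); // true
```

If the array never contains `NaN`, the one-liner `array.indexOf(value) !== -1` is equivalent and shorter.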