The ExecuteScript processor is intended as a scriptable "onTrigger" method: whenever the processor is scheduled to run, your script is executed. As of 0.5.0, the available script engines are ECMAScript (JavaScript), Jython, JRuby, Groovy, and Lua. For this blog, almost all examples will be in Groovy, but templates exist for other languages as well.
To allow for the most flexibility, only a handful of objects are passed into the script as variables:
session: This is a reference to the ProcessSession assigned to the processor. The session allows you to perform operations on flow files such as create(), putAttribute(), and transfer(). We'll get to an example below.
context: This is a reference to the ProcessContext for the processor. It can be used to retrieve processor properties, relationships, and the StateManager (see the NiFi docs for the uses of StateManager, also new in 0.5.0).
log: This is a reference to the ProcessorLog for the processor. Use it to log messages to NiFi, such as log.info('Hello world!').
REL_SUCCESS: This is a reference to the "success" relationship defined for the processor. It could also be accessed by referencing the static member of the parent class (ExecuteScript), but some engines such as Lua do not allow referencing static members, so this is a convenience variable. It also saves having to use the fully-qualified name for the relationship.
REL_FAILURE: This is a reference to the "failure" relationship defined for the processor. As with REL_SUCCESS, it is a convenience variable that stands in for the static member of the parent class (ExecuteScript) and saves having to use the fully-qualified name for the relationship.
Dynamic Properties: Any dynamic properties defined in ExecuteScript are passed to the script engine as variables set to the string value of the property. This means you must be aware of the variable naming rules for the chosen script engine. For example, Groovy does not allow periods (.) in variable names, so don't use something like "my.property" as a dynamic property name.
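To make the naming caveat for dynamic properties concrete, here is a minimal sketch of a check you could run on a property name before adding it to the processor. The helper name isSafeGroovyVariableName is made up for illustration, and the pattern is a deliberately conservative assumption of mine, not the rule that NiFi or Groovy actually applies:

```groovy
// Hypothetical helper (not part of NiFi): returns true when a dynamic property
// name is safe to use as a plain Groovy variable name. The pattern is a
// conservative assumption: ASCII letters, digits and underscores, not starting
// with a digit. It does not screen out Groovy reserved words such as 'def'.
boolean isSafeGroovyVariableName(String name) {
    name ==~ '[A-Za-z_][A-Za-z0-9_]*'
}

assert isSafeGroovyVariableName('myProperty')
assert !isSafeGroovyVariableName('my.property')   // the period rules this one out
```

Other script engines have their own identifier rules, so a check like this would need a different pattern for each of them.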
Usage:
The script is not required to work with the session, context, flow files, or anything else. In fact, a one-line Groovy script to simply log that you're being run is:
log.info("Hello from Groovy!")
However, such scripts are probably not that interesting :) Most scripts will want to interact with the session and flow files in some way, either by adding attributes, replacing content, or even creating new flow files.
You may have noticed that any incoming flow file is not passed into the script. This is because ExecuteScript can be used without any input, usually to generate flow files to pass into the remainder of the flow. To allow both cases, the ProcessSession is supplied and the script is responsible for handling any flow files. This can result in some boilerplate code, but the trade-off for flexibility and power is well worth it.
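As a rough sketch of the no-input case, a script can create its own flow file instead of calling session.get(). The method name generateFlowFile and the 'generated-by' attribute are hypothetical; only create(), putAttribute(), and transfer() are the session operations mentioned earlier. I have wrapped the logic in a method so it can be tried with stand-in objects outside NiFi; in the Script Body you would finish with the call shown in the trailing comment:

```groovy
// Creates a new flow file, tags it with an attribute, and transfers it.
// 'generated-by' is a hypothetical attribute name used only for illustration.
def generateFlowFile(session, relationship) {
    def flowFile = session.create()
    // Keep the reference returned by the session after every change
    flowFile = session.putAttribute(flowFile, 'generated-by', 'ExecuteScript')
    session.transfer(flowFile, relationship)
}

// Inside ExecuteScript, call it with the variables bound by the processor:
// generateFlowFile(session, REL_SUCCESS)
```

This version only sets an attribute; writing content into the new flow file needs additional session calls that are not covered in this post.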
If your script only wants to handle incoming flow files, then you can simply return if the session has no flow file available for processing. In Groovy:
def flowFile = session.get()
if (!flowFile) return
// Remainder of script
If you are acting on a flow file, there are two major things to remember:
1) Keep track of the latest version of the flow file reference. This means if you act on a flow file, such as adding an attribute, you should replace the old reference with the one returned by the session method. For example:
flowFile = session.putAttribute(flowFile, 'my-property', 'my-value')
2) The script must transfer the flow file(s) that are retrieved and/or created. Unless an error condition occurred, transfer like so:
session.transfer(flowFile, REL_SUCCESS)
If an error has occurred (i.e. your script has caught an exception), you can route the flow file to failure:
session.transfer(flowFile, REL_FAILURE)
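The two rules above, plus the failure routing, can be sketched in one place. This is only an illustration under my own assumptions: the method name tagFlowFile is hypothetical, and the logic is wrapped in a method so it can be exercised with stand-in objects outside NiFi. In the Script Body you would end with the call shown in the trailing comment:

```groovy
// Updates two attributes and routes the flow file to success, or to failure
// if anything in the try block throws.
def tagFlowFile(session, log, success, failure) {
    def flowFile = session.get()
    if (!flowFile) return          // nothing to process on this run

    try {
        // Rule 1: always keep the latest reference returned by the session
        flowFile = session.putAttribute(flowFile, 'filename', 'myfile')
        flowFile = session.putAttribute(flowFile, 'my-property', 'my-value')
        // Rule 2: every flow file that was retrieved must be transferred
        session.transfer(flowFile, success)
    } catch (Exception e) {
        log.error('Script failed, routing flow file to failure', e)
        session.transfer(flowFile, failure)
    }
}

// Inside ExecuteScript, call it with the variables bound by the processor:
// tagFlowFile(session, log, REL_SUCCESS, REL_FAILURE)
```

Catching the exception inside the script is what lets you pick the failure relationship yourself.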
Putting this all together, here is an example script that updates the "filename" attribute (a core flow file attribute that exists on every flow file):
def flowFile = session.get()
if (!flowFile) return
flowFile = session.putAttribute(flowFile, 'filename', 'myfile')
session.transfer(flowFile, REL_SUCCESS)
I have created a standalone NiFi template (ExecuteScriptHelloWorldGroovy) that will generate a JSON file, then call the above script to update the filename attribute, then log that attribute.
That's all for this introduction to ExecuteScript. Check the NiFi docs for more information about configuring the ExecuteScript processor, and stay tuned for more blog posts about ExecuteScript and other cool NiFi things :)
Cheers!
Hello Matt,
Could you please share all these samples in GitHub?
Because the code needs to be in a scripting processor, I didn't see the value in putting the snippets in GitHub by themselves. However all the scripts are available as NiFi templates on my GitHub Gist: https://gist.github.com/mattyb149
Hi Matt,
Could you please provide me with code for parsing a text file using the ExecuteScript processor? I need to extract only a few fields from each record instead of all of them.
What format is your text file in? CSV? In any case there is an example on selecting certain fields from a bar-delimited ( | ) text file in my other post: http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html
Hi Matt - great stuff here....I have started to play in NiFi quite a bit....one thing I can't figure out yet - is how to get some complex field level validations going...such as finding an embedded delimiter or even control chars like a new line...(\n).....any thoughts / advice?
Thanks....
Bob
Sure! What did you have in mind? Are you trying to parse CSV or another delimited format looking for embedded characters that might also be delimiters? If you can represent what you want as a regular expression, you probably don't need a scripting processor and could use the SplitText processor instead.
Matt - thanks for responding - I have been using SplitText - and some RegExp (but that's not for the faint of heart).....for delimited, I think I now have a pretty good handle on it...but it's still tough with large data sets as I need to account for the splits in HDFS (and whether a newline or delim is in a record that spans splits - is a good challenge).
I am not trying to parse and normalize some XML....then some Cobol based EBCDIC files......will keep you posted...
Thanks again....
I have a case I'm not sure you can help me with: I have 20 SFTP sources, and I should get 5 files from each and merge each 5 into 1 file. I ran this process using ListSFTP, FetchSFTP, MergeContent, UpdateAttribute (to change the filename attribute) and PutFile (to store the output file), but all of this is connecting to one server.
The question:
1- How can I make it work each time for a new server?
2- How can we make ListSFTP pull old files as well? (ListSFTP will only pull files that were modified after its last run, but sometimes we get old files a bit late. Is there a way to change ListSFTP so it doesn't reject these files, maybe by checking file names instead of the modified date?)
Hi Matt,
I am using Lua with ExecuteScript. I must get the flow file from upstream using session:get(), then process the flow file, and finally write the flow file downstream. Can you help me and give me a simple example? Thank you!!
What is the right way to assign a Groovy variable value to a flow file attribute?
Here is a trivial code snippet:
def sql = Sql.newInstance("jdbc:oracle:thin:@192.168.1.211:1522:mydb", "myuser", "mypassword", "oracle.jdbc.pool.OracleDataSource");
def updateStr = "update tblflowfiles set stage='GROOVY'"
def numberRowsUpdated = sql.executeUpdate(updateStr)
sql.close();
--getting errors due to these two statements below, but when passing direct single quoted string it works.
--the errors hint at No Signature of Method.
flowFile = session.putAttribute(flowFile, 'DBSQLRESULT',"${numberRowsUpdated.toString()}") ;
flowFile = session.putAttribute(flowFile, 'DBSQLRESULTSTMNT',"${updateStr.toString()}") ;
flowFile = session.putAttribute(flowFile, 'DBSQLRESULT',"${numberRowsUpdated}".toString()) ;
Thanks
Matt
I have written a Groovy script which uploads large files into Oracle's CLOB column. It works only if the Groovy script is pasted into the body of the ExecuteScript processor. If I try to use the same code as an external script file, the processor fails. I am not including any directories when I try this as an external script. Do I need to introduce something more in the Groovy script to make it run as an external script?
It shouldn't. The classloader gets set up regardless of whether it is a script body or file that is provided, and if a file is provided it just reads the whole thing in as a String as if it were the Script Body parameter. What error(s) do you get when trying to run as an external file? Do you have a cluster of NiFi instances? If so, is that script available to all of them?
Hi Matt,
Could you please guide me with an example of parsing a JSON file using the ExecuteScript processor and then using KafkaProducer to send the data to a topic? It would be good if you could use a Python script.
Thanks.
Hi Matt, I want to connect to a MySQL database and fetch data from tables. Please suggest how I can write the code in Java, as I know Java.
Hi Matt, could you please provide one example of fetching data from MySQL with a simple query like select * from employee?
Hi Matt,
I want to generate a flow file using Python. Can you please help me with this? For example, I want to read a text file using Python code and put its contents in another file.