Wednesday, February 10, 2016

ExecuteScript processor - Hello World!

In Apache NiFi 0.5.0, a few new processors were added, two of which allow the user to write scripts to do custom processing.  This post talks about one of them: ExecuteScript.


The ExecuteScript processor is intended as a scriptable "onTrigger" method, basically meaning that when the processor is scheduled to run, your script will be executed. As of 0.5.0, the available script engines are ECMAScript (JavaScript), Jython, JRuby, Groovy, and Lua. For this blog, almost all examples will be in Groovy, but templates exist for the other languages as well.

To allow for the most flexibility, only a handful of objects are passed into the script as variables:

session: This is a reference to the ProcessSession assigned to the processor. The session allows you to perform operations on flow files such as create(), putAttribute(), and transfer(). We'll get to an example below.

context: This is a reference to the ProcessContext for the processor. It can be used to retrieve processor properties, relationships, and the StateManager (see the NiFi docs for the uses of StateManager, also new in 0.5.0).

log: This is a reference to the ProcessorLog for the processor. Use it to log messages to NiFi, such as log.info('Hello world!')

REL_SUCCESS: This is a reference to the "success" relationship defined for the processor. It could also be inherited by referencing the static member of the parent class (ExecuteScript), but some engines such as Lua do not allow for referencing static members, so this is a convenience variable. It also saves having to use the fully-qualified name for the relationship.

REL_FAILURE: This is a reference to the "failure" relationship defined for the processor. The same notes as for REL_SUCCESS apply: it is a convenience variable for engines such as Lua that cannot reference static members, and it saves having to use the fully-qualified name for the relationship.

Dynamic Properties:  Any dynamic properties defined in ExecuteScript are passed to the script engine as variables, each set to the string value of the corresponding property. This means you must be aware of the variable naming rules for the chosen script engine. For example, Groovy does not allow periods (.) in variable names, so don't use something like "my.property" as a dynamic property name.
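As a quick sketch of how a few of these variables fit together (the dynamic property name here is hypothetical; `context`, `log`, and dynamic-property variables are supplied by ExecuteScript):

```groovy
// Read a processor property value via the ProcessContext
def scriptPath = context.getProperty('Script File')?.getValue()

// A dynamic property named 'myProperty' would be bound to a Groovy variable
// of the same name, holding the property's string value
log.info("Script File: ${scriptPath}, myProperty: ${myProperty}")
```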

Usage:

The script is not required to work with the session, context, flow files, or anything else. In fact, a one-line Groovy script that simply logs that the processor ran is:

log.info("Hello from Groovy!")

However such scripts are probably not that interesting :) Most scripts will want to interact with the session and flow files in some way, either by adding attributes, replacing content, or even creating new flow files.

You may have noticed that any incoming flow file is not passed into the script. This is because ExecuteScript can be used without any input, usually to generate flow files to pass into the remainder of the flow. To allow both cases, the ProcessSession is supplied and the script is responsible for handling any flow files. This can result in some boilerplate code, but the trade-off for flexibility and power is well worth it.
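For the no-input case, a minimal Groovy sketch (assuming the standard `session` and `REL_SUCCESS` variables) that creates a new flow file, writes some content, and transfers it might look like this:

```groovy
import org.apache.nifi.processor.io.OutputStreamCallback
import java.nio.charset.StandardCharsets

// Create a new flow file and write some content into it
def flowFile = session.create()
flowFile = session.write(flowFile, { outputStream ->
    outputStream.write('Hello from ExecuteScript!'.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)
```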

If your script only wants to handle incoming flow files, then you can simply return if the session has no flow file available for processing. In Groovy:

def flowFile = session.get()
if (!flowFile) return
// Remainder of script

If you are acting on a flow file, there are two major things to remember:

1) Keep track of the latest version of the flow file reference. This means if you act on a flow file, such as adding an attribute, you should replace the old reference with the one returned by the session method. For example:

flowFile = session.putAttribute(flowFile, 'my-property', 'my-value')

2) The script must transfer the flow file(s) that are retrieved and/or created. Unless an error condition occurred, transfer like so:

session.transfer(flowFile, REL_SUCCESS)

If an error has occurred (e.g., your script has caught an exception), you can route the flow file to failure:

session.transfer(flowFile, REL_FAILURE)
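A common pattern is to wrap the work in a try/catch and route to the appropriate relationship; here is a sketch (the attribute name is made up):

```groovy
def flowFile = session.get()
if (!flowFile) return
try {
    // Do some work on the flow file (hypothetical attribute)
    flowFile = session.putAttribute(flowFile, 'processed', 'true')
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)
}
```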

Putting this all together, here is an example script that updates the "filename" attribute (a core flow file attribute that exists on every flow file):

def flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'filename', 'myfile')
session.transfer(flowFile, REL_SUCCESS)

I have created a standalone NiFi template (ExecuteScriptHelloWorldGroovy) that will generate a JSON file, then call the above script to update the filename attribute, then log that attribute:


That's all for this introduction to ExecuteScript. Check the NiFi docs for more information about configuring the ExecuteScript processor, and stay tuned for more blog posts about ExecuteScript and other cool NiFi things :)

Cheers!


20 comments:

  1. Hello Matt,
    Could you please share all these samples in github?

    1. Because the code needs to be in a scripting processor, I didn't see the value in putting the snippets in GitHub by themselves. However all the scripts are available as NiFi templates on my GitHub Gist: https://gist.github.com/mattyb149

    2. Hi Matt,

      Could you please provide code for parsing a text file using the ExecuteScript processor? I need to extract only a few fields instead of all of them in each and every record.

    3. What format is your text file in? CSV? In any case there is an example on selecting certain fields from a bar-delimited ( | ) text file in my other post: http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html

  2. Hi Matt - great stuff here....I have started to play with NiFi quite a bit....one thing I can't figure out yet is how to get some complex field-level validations going...such as finding an embedded delimiter or even control chars like a newline (\n).....any thoughts / advice?

    Thanks....

    Bob

    1. Sure! What did you have in mind? Are you trying to parse CSV or another delimited format looking for embedded characters that might also be delimiters? If you can represent what you want as a regular expression, you probably don't need a scripting processor and could use the SplitText processor instead.

  3. Matt - thanks for responding - I have been using SplitText and some regex (but that's not for the faint of heart).....for delimited data, I think I now have a pretty good handle on it...but it's still tough with large data sets as I need to account for the splits in HDFS (and whether a newline or delimiter is in a record that spans splits is a good challenge).

    I am now trying to parse and normalize some XML....then some COBOL-based EBCDIC files......will keep you posted...

    Thanks again....

  4. I have a case, not sure if you can help me with it: I have 20 SFTP sources, and I should get 5 files from each and merge each 5 into 1 file. I ran this process using ListSFTP, FetchSFTP, MergeContent, UpdateAttribute (to change the filename attribute) and PutFile (to store the output file), but all of this connects to one server.
    The questions:
    1- How can I make it work each time for a new server?
    2- How can we make ListSFTP pull old files as well? (ListSFTP will only pull files that were modified after its last run, but sometimes we get old files a bit late. Is there a way to change ListSFTP so it doesn't reject these files, maybe by checking file names instead of the modified date?)

  5. Hi Matt,
    I am using Lua with ExecuteScript, but I must get the flowFile from upstream using session:get(), then process the flowFile, and in the end write the flowFile downstream. Can you help me with a simple example? Thank you!!

  6. What is the right way to assign a Groovy variable value to a flow file attribute?

    Here is a trivial code snippet
    def sql = Sql.newInstance("jdbc:oracle:thin:@192.168.1.211:1522:mydb", "myuser", "mypassword", "oracle.jdbc.pool.OracleDataSource");
    def updateStr = "update tblflowfiles set stage='GROOVY'"
    def numberRowsUpdated = sql.executeUpdate(updateStr)
    sql.close();
    // getting errors due to these two statements below, but when passing a direct single-quoted string it works.
    // the errors hint at No Signature of Method.
    flowFile = session.putAttribute(flowFile, 'DBSQLRESULT',"${numberRowsUpdated.toString()}") ;
    flowFile = session.putAttribute(flowFile, 'DBSQLRESULTSTMNT',"${updateStr.toString()}") ;

    1. flowFile = session.putAttribute(flowFile, 'DBSQLRESULT',"${numberRowsUpdated}".toString()) ;

  7. Matt
    I have written a Groovy script which uploads large files into Oracle's CLOB column. The stuff works only if the Groovy script is pasted into the body of the ExecuteScript processor. If I try to use the same code as a standard external script, the processor fails. I am not including any directories when I try this as an external script. Do I need to introduce something more in the Groovy script to make it run as a standard script?

    1. It shouldn't; the classloader gets set up regardless of whether a script body or file is provided, and if a file is provided it just reads the whole thing in as a String as if it were the Script Body parameter. What error(s) do you get when trying to run it as an external file? Do you have a cluster of NiFi instances? If so, is that script available to all of them?

  8. Hi Matt,
    Could you please guide me through an example of parsing a JSON file using the ExecuteScript processor, then using KafkaProducer to send data into a topic? It would be good if you could use a Python script.
    Thanks.

  9. Hi Matt, I want to connect to a MySQL database and fetch data from tables. Please suggest how I can write the code in Java, as I know Java.

  10. Hi Matt, could you please provide one example of fetching data from MySQL, with a simple query like select * from employee?

  13. Hi Matt,
    I want to generate a flow file using Python. Can you please help me with this? For example, I want to read a text file using Python code and put its contents in another file.
