Wednesday, March 2, 2016

ExecuteScript - JSON-to-JSON Revisited (with JavaScript)

A question came in on the Apache NiFi users group about using ExecuteScript with a language other than Groovy (specifically, JavaScript). I put a fairly trivial example inside a previous post, but it doesn't cover one of the most important features: overwriting flow file content.

One major difference between Groovy and JavaScript here is that you will want to get references to Java classes using Java.type() in order to create new objects, invoke static methods, etc. I use the following in the script below:

var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

Also, the JavaScript engine has something very much like Groovy's closure coercion. If you are creating a new anonymous object from a class or interface that has a single abstract method, you can pass a JavaScript function (with the appropriate arguments) to the constructor. So to create an implementation of StreamCallback (used in session.write() to overwrite the contents of a flow file), you have:

new StreamCallback(function(inputStream, outputStream) { /* Do stuff */ })

With that in mind, I recreated the JSON-to-JSON example from my previous post, this time using JavaScript as the language. The script is as follows:

var flowFile = session.get();
if (flowFile != null) {

  // Look up the Java classes the script needs
  var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
  var IOUtils = Java.type("org.apache.commons.io.IOUtils");
  var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

  // Overwrite the flow file content with the translated JSON
  flowFile = session.write(flowFile,
    new StreamCallback(function(inputStream, outputStream) {
        // Read the entire content as a UTF-8 string and parse it
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        var obj = JSON.parse(text);
        var newObj = {
          "Range": 5,
          "Rating": obj.rating.primary.value,
          "SecondaryRatings": {}
        };
        // Every rating other than "primary" becomes a secondary rating
        for (var key in obj.rating) {
          var attrName = key;
          var attrValue = obj.rating[key];
          if (attrName != "primary") {
            newObj.SecondaryRatings[attrName] = {"id": attrName, "Range": 5, "Value": attrValue.value};
          }
        }
        // Write the tab-indented result back out as UTF-8 bytes
        outputStream.write(JSON.stringify(newObj, null, '\t').getBytes(StandardCharsets.UTF_8));
    }));
  // Rename the file, e.g. name.json becomes name_translated.json
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0] + '_translated.json');
  session.transfer(flowFile, REL_SUCCESS);
}
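
To see what the transformation produces without running NiFi, the JSON logic from the callback can be pulled out into a plain function. This is only a sketch: the function name translateRatings and the sample input are made up for illustration (the real input is whatever the flow file contains), and nothing here touches the NiFi session or any Java class, so it should run in any JavaScript engine.

```javascript
// Hypothetical helper: the same mapping the StreamCallback performs,
// but taking and returning JSON strings so it can be tried outside NiFi.
function translateRatings(text) {
  var obj = JSON.parse(text);
  var newObj = {
    "Range": 5,
    "Rating": obj.rating.primary.value,
    "SecondaryRatings": {}
  };
  // Every rating other than "primary" becomes a secondary rating
  for (var key in obj.rating) {
    if (key != "primary") {
      newObj.SecondaryRatings[key] = {"id": key, "Range": 5, "Value": obj.rating[key].value};
    }
  }
  return JSON.stringify(newObj, null, '\t');
}

// Made-up sample input, shaped the way the script expects
var sampleInput = JSON.stringify({
  "rating": {
    "primary": {"value": 3},
    "quality": {"value": 4}
  }
});

var translated = translateRatings(sampleInput);
```

With this sample, translated is a tab-indented JSON document whose Rating is 3 and whose SecondaryRatings contains a single "quality" entry with Value 4.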

The template is available as a Gist (here). Please let me know how/if this works for you. Cheers!

UPDATE: This script works with Nashorn (Java 8's JavaScript engine), but doesn't work if you're using Java 7 (Rhino). Here's an equivalent script for Java 7 / Rhino:

importClass(org.apache.commons.io.IOUtils)
importClass(java.nio.charset.StandardCharsets)
importClass(org.apache.nifi.processor.io.StreamCallback)
var flowFile = session.get();
if (flowFile != null) {

  flowFile = session.write(flowFile,
    new StreamCallback() {process : function(inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        var obj = JSON.parse(text)
        var newObj = {
          "Range": 5,
          "Rating": obj.rating.primary.value,
          "SecondaryRatings": {}
        }
        for(var key in obj.rating){
          var attrName = key;
          var attrValue = obj.rating[key];
          if(attrName != "primary") {
            newObj.SecondaryRatings[attrName] = {"id": attrName, "Range": 5, "Value": attrValue.value};
          }
        }
        outputStream.write(new java.lang.String(JSON.stringify(newObj, null, '\t')).getBytes(StandardCharsets.UTF_8))
    }})
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)
}


17 comments:

  1. Hello Matt,
    Thanks for all your posts, which gave me a better understanding of NiFi, mainly concerning the ExecuteScript processor. Currently, I am trying to design some flows involving JSON, Python, and InfluxDB. Can you please help me by providing a skeleton in Python for reading and writing a flow file's contents from the session? I am new to Python and Groovy.
    Thanks in advance.
    BR
    Amar.

  2. Hi Matt,
    Thanks for all of the examples. Can you share what your development environment looks like for developing these scripts? In particular, how does one go about debugging them?

    Thanks,
    glenn

    Replies
    1. One approach is to write a "unit test" in the nifi-scripting-processors submodule, pointing at your script. That way you can use the testing framework to send in flow files and assert things about the output (flow files, relationships, etc.).

      I am working on a lightweight "ExecuteScript tester" which will be a JAR that will set up an imaginary flow with an ExecuteScript that contains the script you want to test. When that's done I'll add a post to this blog :) Cheers!

  3. Hi Matt Burgess

    Is it possible to get data from several different tables in NiFi? If so, how?

    Please explain.

    Replies
    1. What do you mean by "different table"? Do you mean a database table? That is possible using processors such as ExecuteSQL, QueryDatabaseTable, and GenerateTableFetch.

  4. I'm not sure what you mean. Can you explain about getting data from a table? If you mean from a SQL table, check out my other post: http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html

  5. How would you do a conditional transfer, e.g. session.transfer(flowFile, REL_FAILURE), from inside the new StreamCallback(function(inputStream, outputStream) {...}) callback? Or what is the best way to do this with a script in this processor?

  6. Hi Matt, how can we execute sqlplus procedures dynamically in NiFi?

    ex: sqlplus akhil/akhil@test @/home/akhil/apps/test_data/test.sql ${emp_no}

    Replies
    1. You can use ExecuteProcess or ExecuteStreamCommand to shell out to that command

    2. I tried ExecuteStreamCommand, but there was a problem with the parameters.
      I set these properties:
      Command -> sqlplus
      Command Arguments -> akhil/akhil@test @/home/akhil/apps/test_data/test.sql ${emp_no}
      Also, ExecuteProcess does not accept an upstream connection.

    3. What is your argument delimiter set to? Also you may get more help if you subscribe/send a message to users at nifi dot apache dot org

  7. Hi Matt,
    Thanks for the very helpful blog post. I am trying to read CSV and JSON data, do some validation on the data, and save it to Mongoose using the NiFi API. I am very new to NiFi. Can we write an application layer in Javascript that talks to the REST API that NiFi provides?

  8. Hi Matt,

    Thanks for this post. It works like a charm for me.
    But I have one problem: when my file is larger than 50000 bytes, IOUtils.toString() skips the data after 50000 bytes.
    Are there any limitations on IOUtils?
    If I want to process a big file, what do I have to do?

    Thanks and Regards,
    Sathish.

    Replies
    1. I use IOUtils.toString() as a convenience method for my examples, so I can work with the entire input as a string. For larger files, you should use the "inputStream" parameter from my example above (which is a java.io.InputStream). You can wrap that in a BufferedInputStream if you like, but the idea is to keep reading from the inputStream a little at a time until all the processing is complete. If your input is line-based, you can try IOUtils.lineIterator(inputStream, charset), then you can process one line at a time without reading the whole input into memory.

  9. Hi all,
    Can somebody give pseudocode for connecting to a MySQL database and fetching information from a table? I would be happiest if the code is in JavaScript, but another language is also fine. (For the same task I used ExecuteSQL.)

  10. And it is working fine, but I need to do the same task in ExecuteScript.
