Thursday, February 11, 2016

ExecuteScript - JSON-to-JSON conversion

Given the cool UI and data provenance features of NiFi, performing complex JSON-to-JSON conversion should be really simple, and once NIFI-361 is implemented it will be.  Fortunately, in the meantime, some of this can be achieved with the scripting processors (available as of Apache NiFi 0.5.0). As an example, I will present an ExecuteScript processor that performs the same conversion as the demo at http://jolt-demo.appspot.com/, with an extra metric added to really show the JSON-to-JSON transformation.

This is not at all a jab against JOLT; on the contrary, NIFI-361 will likely use JOLT, as it is a powerful transformation DSL. This is more about the spirit of the ExecuteScript processor, which is a great way to rapidly enable new features and/or data transformations.

I chose Groovy as the scripting language for ExecuteScript for two reasons: one, I am most familiar with it :) and two, you can do clean JSON-to-JSON conversions with JsonSlurper and JsonBuilder.
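
As a quick illustration of that pairing, here is a minimal sketch that runs in a plain Groovy console, outside NiFi. The input document and the output field names ('Name', 'TagCount') are made up for this example; only JsonSlurper and JsonBuilder come from Groovy itself.

```groovy
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

// Parse a JSON string into plain Groovy maps and lists
// (the input document is invented for illustration)
def parsed = new JsonSlurper().parseText('{"name": "nifi", "tags": ["flow", "json"]}')

// Compute the values first, then hand them to the builder
def upperName = parsed.name.toUpperCase()
def tagCount = parsed.tags.size()

// Each call inside the closure becomes a key/value pair in the output
def builder = new JsonBuilder()
builder.call {
    'Name' upperName
    'TagCount' tagCount
}

def result = builder.toString()
println result
```

JsonSlurper gives you ordinary maps and lists to navigate, and JsonBuilder turns method-style calls back into JSON, so the conversion rules read almost like the target document.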

To emulate the jolt-demo, I start with a flow file containing the following JSON:

{
   "rating": {
      "primary": {
         "value": 3
      },
      "quality": {
         "value": 3
      },
      "metric": {
         "value": 6
      }
   }
}

Using the jolt-demo transform, the following should be output:

{
    "Range": 5,
    "Rating": "3",
    "SecondaryRatings": {
        "metric": {
            "Id": "metric",
            "Range": 5,
            "Value": 6
        },
        "quality": {
            "Id": "quality",
            "Range": 5,
            "Value": 3
        }
    }
}

Using the same rules (translated into JsonBuilder format), the script is:

import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*

def flowFile = session.get();
if (flowFile == null) {
    return;
}
def slurper = new groovy.json.JsonSlurper()

flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def obj = slurper.parseText(text)
        def builder = new groovy.json.JsonBuilder()
        builder.call {
            'Range' 5
            'Rating' "${obj.rating.primary.value}"
            'SecondaryRatings' {
                obj.rating.findAll {it.key != "primary"}.each {k,v ->
                    "$k" {
                         'Id' "$k"
                         'Range' 5
                         'Value' v.value
                    }
                }
            }
        }
        outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').tokenize('.')[0]+'_translated.json')
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
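
If you want to try the conversion rules without a running NiFi instance, one option is to pull them out into a plain function. The following is only a sketch: translate is a hypothetical helper name, and the NiFi-specific pieces (session, StreamCallback, the flow file attributes) are deliberately left out, so it exercises the JsonBuilder logic and nothing else.

```groovy
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

// Hypothetical helper (not part of NiFi): applies the same JsonBuilder
// rules as the script above, but to a String instead of a flow file
String translate(String text) {
    def obj = new JsonSlurper().parseText(text)
    def builder = new JsonBuilder()
    builder.call {
        'Range' 5
        'Rating' "${obj.rating.primary.value}"
        'SecondaryRatings' {
            // Every rating except "primary" becomes a secondary rating
            obj.rating.findAll { it.key != 'primary' }.each { k, v ->
                "$k" {
                    'Id' "$k"
                    'Range' 5
                    'Value' v.value
                }
            }
        }
    }
    return builder.toPrettyString()
}

// Same input as the flow file in this post, on one line
def input = '{"rating": {"primary": {"value": 3}, "quality": {"value": 3}, "metric": {"value": 6}}}'
def output = new JsonSlurper().parseText(translate(input))

// Compare parsed structures rather than raw text, so key order does not matter
assert output.Rating == '3'
assert output.SecondaryRatings.metric.Value == 6
```

Comparing the parsed result instead of the pretty-printed string keeps the check independent of key ordering and whitespace.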


The template is available as a Gist (here). Please let me know if you have any questions, comments, or suggestions for making the scripting processors even better!

Cheers!

5 comments:

  1. Hello Matt,
    Thanks a lot for this code/processor, but I ran into a problem testing it in Eclipse with my own JSON:
    --> StreamCallback Groovy:unable to resolve class StreamCallback
    I downloaded Groovy 2.4.3 but found no trace of StreamCallback.
    I am not an expert in Groovy, so sorry if this is a trivial mistake :-)
    phil

  2. The benefit of the scripting processors is that they run inside the NiFi environment, so all the classes like StreamCallback are available. Testing from an IDE like Eclipse is not the same; you'd need to include some NiFi dependencies in your project. I will write a blog post about how to test your scripts outside a running NiFi instance. Good question!

  3. Hello Matt,
    Very helpful forum you have here. Thanks.
    I have one NiFi environment that is HDF 2.0 and one that is quite a few versions behind. I'm trying to get a similar capability to running the following through the ExecuteScript processor:

    [
      {
        "operation": "shift",
        "spec": {
          "*": {
            "@": "&"
          },
          "DOC_ID": "header.&(0,0)",
          "NOTE_DATE": "header.&(0,0)",
          "CLINICAL_NUMBER": "header.&(0,0)",
          "DOC_LINK_ID": "header.&(0,0)",
          "EVENT_TYPE_CODE": "header.&(0,0)",
          "REV_DATE": "header.&(0,0)",
          "TRANSCRIPTION_DATE": "header.&(0,0)"
        }
      },
      {
        "operation": "default",
        "spec": {
          "@class": "edu.mayo.bigdata.model.v2.HL7CnoteBean"
        }
      }
    ]

    I'm attempting this via the following script:

    import org.apache.commons.io.IOUtils
    import java.nio.charset.*

    def flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    def slurper = new groovy.json.JsonSlurper()

    flowFile = session.write(flowFile,
        { inputStream, outputStream ->
            def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            def obj = slurper.parseText(text)
            def builder = new groovy.json.JsonBuilder(obj)
            builder.call {
                'operation' 'shift'
                spec {
                    '*' {
                        '@' '&'
                    }
                    'DOC_ID' 'header.&(0,0)'
                    'NOTE_DATE' 'header.&(0,0)'
                    'CLINICAL_NUMBER' 'header.&(0,0)'
                    'DOC_LINK_ID' 'header.&(0,0)'
                    'EVENT_TYPE_CODE' 'header.&(0,0)'
                    'REV_DATE' 'header.&(0,0)'
                    'TRANSCRIPTION_DATE' 'header.&(0,0)'
                }
            }
            {
                'operation' 'default'
                spec {
                    '@class' 'edu.mayo.bigdata.model.v2.HL7CnoteBean'
                }
            }
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').tokenize('.')[0]+'_translated.json')
    session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

    However, I don't appear to be handling the shift operations correctly. Could you provide some suggestions on how this could be handled better?

    Replies
    1. It appears that you're trying to create (and perhaps apply?) a Jolt transform, whereas this post emulates such a transform using Groovy instead of Jolt. If you want to apply a Jolt transform directly, I recommend the JoltTransformJSON processor. If that is not available in your version of NiFi, ExecuteScript can be used to "fake" the Jolt transform by performing similar operations.
