Wednesday, May 25, 2016

Validating JSON in NiFi with ExecuteScript

My last post alluded to a Groovy script for ExecuteScript that would use JSON Schema Validator to validate incoming flow files (in JSON format) against a JSON Schema. The purpose of that post was to show how to use Groovy Grape to get the JSON Schema Validator dependencies loaded dynamically by the script (versus downloading the JARs and adding them to the Module Directory property).  For this post I want to show the actual JSON validation script (as I presented it on the Apache NiFi "users" mailing list).

I'll use the schema as it was presented to me on the mailing list:
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": { "type": "object"}
  }
}
This shows that the incoming flow file should contain a JSON object, that it needs to have certain fields (the "required" values), and the types of the values it may/must contain (the "properties" entries).  For this script I'll hard-code this schema, but I'll talk a bit at the end about how this can be done dynamically for a better user experience.
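To make the schema concrete, here is a sample flow file that would pass validation (the field names and values are just made-up metric data, not from the mailing list thread). A document missing any of the "required" keys, or with a non-integer "timestamp", would fail:

```json
{
  "name": "cpu",
  "timestamp": 1464220800,
  "tags": { "host": "server01" },
  "fields": { "usage_idle": 97.2 }
}
```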

Since the schema itself is JSON, we use org.json.JSONObject and friends to read it in, then use org.everit.json.schema.loader.SchemaLoader to load it into a Schema object. We read in the flow file with session.read(), passing in a closure cast to InputStreamCallback (see my previous post for details), and call schema.validate() on the content. If the JSON is not valid, validate() throws a ValidationException; the script catches it, logs the error, and sets a "valid" flag to false, then routes the flow file to REL_SUCCESS or REL_FAILURE depending on whether it validated against the schema.  The original script is as follows:
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

flowFile = session.get()
if(!flowFile) return

jsonSchema = """
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": { "type": "object"}
  }
}
"""

boolean valid = true
session.read(flowFile, { inputStream ->
    jsonInput = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
    JSONObject rawSchema = new JSONObject(new JSONTokener(new ByteArrayInputStream(jsonSchema.bytes)))
    Schema schema = SchemaLoader.load(rawSchema)
    try {
        schema.validate(new JSONObject(jsonInput))
    } catch(ve) {
        log.error("Doesn't adhere to schema", ve)
        valid = false
    }
} as InputStreamCallback)

session.transfer(flowFile, valid ? REL_SUCCESS : REL_FAILURE)

This is a pretty basic script; there are several things we could do to improve the capability:
  • Move the schema load out of the session.read() method, since it doesn't require the input
  • Allow the user to specify the schema via a dynamic property
  • Do better exception handling and error message reporting
A worthwhile improvement (that would include all of these) is to turn the script into a proper Processor and put it in an InvokeScriptedProcessor. That way you could have a custom set of relationships and properties, making it easy for the user to configure and use.
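As a smaller step, here is a sketch of the first two improvements applied to the ExecuteScript version: the schema comes from a user-defined dynamic property (I'm assuming one named "jsonSchema" here) rather than being hard-coded, and it is loaded once outside session.read(). ExecuteScript binds each dynamic property as a variable of the same name, holding a PropertyValue:

```groovy
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

flowFile = session.get()
if(!flowFile) return

// "jsonSchema" is an assumed dynamic property on the processor; evaluate any
// Expression Language against the flow file, then grab the raw schema text
def schemaText = jsonSchema.evaluateAttributeExpressions(flowFile).value

// Load the schema outside session.read(), since it doesn't require the input
Schema schema = SchemaLoader.load(new JSONObject(new JSONTokener(schemaText)))

boolean valid = true
session.read(flowFile, { inputStream ->
    def jsonInput = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
    try {
        schema.validate(new JSONObject(jsonInput))
    } catch(ve) {
        log.error("Flow file doesn't adhere to the supplied schema", ve)
        valid = false
    }
} as InputStreamCallback)

session.transfer(flowFile, valid ? REL_SUCCESS : REL_FAILURE)
```

This is just a sketch, not a drop-in replacement; better exception handling and error reporting (the third bullet) are left as an exercise.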

Of course, the best solution is probably to implement it in Java and contribute it to Apache NiFi under the Jira case NIFI-1893 :)

Cheers!
