I'll use the schema as it was presented to me on the mailing list:
```json
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": {"type": "object"}
  }
}
```

This shows that the incoming flow file should contain a JSON object, that the object must have certain fields (the "required" values), and the types of the values it may/must contain (the "properties" entries). For this script I'll hard-code the schema, but I'll talk a bit at the end about how this can be done dynamically for a better user experience.
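As a concrete illustration (this sample document is my own, not from the mailing list), a flow file with content like the following would satisfy the schema, since it supplies all four required fields with the expected types:

```json
{
  "name": "cpu",
  "timestamp": 1466208000,
  "tags": {"host": "server01"},
  "fields": {"usage": 22.5}
}
```

A flow file missing, say, the "timestamp" field, or supplying it as a string, would fail validation.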
Since the schema itself is JSON, we can use org.json.JSONObject and friends to read it in, then use org.everit.json.schema.loader.SchemaLoader to load a Schema from it. We can read in the flow file with session.read(), passing in a closure cast to InputStreamCallback (see my previous post for details), and call schema.validate() on the contents. If the JSON is not valid, validate() will throw a ValidationException; if that happens, I set a "valid" variable to false, then route the flow file to success or failure depending on whether it validated against the schema. The original script is as follows:
```groovy
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

flowFile = session.get()
if(!flowFile) return

jsonSchema = """
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": {"type": "object"}
  }
}
"""

boolean valid = true
session.read(flowFile, { inputStream ->
    jsonInput = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
    JSONObject rawSchema = new JSONObject(new JSONTokener(new ByteArrayInputStream(jsonSchema.bytes)))
    Schema schema = SchemaLoader.load(rawSchema)
    try {
        schema.validate(new JSONObject(jsonInput))
    } catch(ve) {
        log.error("Doesn't adhere to schema", ve)
        valid = false
    }
} as InputStreamCallback)

session.transfer(flowFile, valid ? REL_SUCCESS : REL_FAILURE)
```
This is a pretty basic script; there are a few things we could do to improve its capabilities:
- Move the schema load out of the session.read() method, since it doesn't require the input
- Allow the user to specify the schema via a dynamic property
- Do better exception handling and error message reporting
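A rough sketch of what the first improvements might look like is below. This is my own variation, not the original script: it assumes you've added a dynamic property named jsonSchema to the ExecuteScript processor (dynamic properties are bound into the script as PropertyValue variables), and it uses ValidationException.getAllMessages() to report every violation rather than just the top-level message:

```groovy
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.everit.json.schema.ValidationException
import org.json.JSONObject
import org.json.JSONTokener

flowFile = session.get()
if(!flowFile) return

// Build the Schema once, outside the read callback, from the
// "jsonSchema" dynamic property instead of a hard-coded string
def schemaText = jsonSchema.evaluateAttributeExpressions(flowFile).value
Schema schema = SchemaLoader.load(new JSONObject(new JSONTokener(schemaText)))

boolean valid = true
session.read(flowFile, { inputStream ->
    def jsonInput = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
    try {
        schema.validate(new JSONObject(jsonInput))
    } catch(ValidationException ve) {
        // Report each individual violation, not just the summary
        ve.allMessages.each { msg -> log.error(msg) }
        valid = false
    }
} as InputStreamCallback)

session.transfer(flowFile, valid ? REL_SUCCESS : REL_FAILURE)
```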
A worthwhile improvement (one that would subsume all of these) is to turn the script into a full Processor implementation and use it in an InvokeScriptedProcessor. That way you could define a custom set of relationships and properties, making the component easier for the user to configure and use.
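A rough skeleton of that approach might look like the following. The class, property, and relationship names here are my own inventions; the one hard requirement is that InvokeScriptedProcessor expects the script to assign a Processor instance to a variable named "processor":

```groovy
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.Validator
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship

class ValidateJsonSchema extends AbstractProcessor {

    // Custom property letting the user supply the schema in the UI
    static final PropertyDescriptor JSON_SCHEMA = new PropertyDescriptor.Builder()
        .name('JSON Schema')
        .required(true)
        .addValidator(Validator.VALID)
        .build()

    // Custom, meaningfully-named relationships instead of success/failure
    static final Relationship REL_VALID = new Relationship.Builder()
        .name('valid').description('Flow files that match the schema').build()
    static final Relationship REL_INVALID = new Relationship.Builder()
        .name('invalid').description('Flow files that do not match the schema').build()

    @Override
    List<PropertyDescriptor> getSupportedPropertyDescriptors() { [JSON_SCHEMA] }

    @Override
    Set<Relationship> getRelationships() { [REL_VALID, REL_INVALID] as Set }

    @Override
    void onTrigger(ProcessContext context, ProcessSession session) {
        def flowFile = session.get()
        if (!flowFile) return
        // ... same schema-validation logic as the script above,
        // reading the schema from context.getProperty(JSON_SCHEMA)
        // and routing to REL_VALID or REL_INVALID
    }
}

// InvokeScriptedProcessor looks for a variable named "processor"
processor = new ValidateJsonSchema()
```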
Of course, the best solution is probably to implement it in Java and contribute it to Apache NiFi under the Jira case NIFI-1893 :)
Cheers!