Thursday, April 27, 2023

Using JSLTTransformJSON (an alternative to JoltTransformJSON)

As of Apache NiFi 1.19.0, there is a processor called JSLTTransformJSON. It is similar in function to the JoltTransformJSON processor, but it lets you use JSLT as the Domain-Specific Language (DSL) for transforming one JSON document into another. The processor was added because some users found JOLT too complex to learn, since it uses a "tree-walking" pattern to change the JSON structure rather than a more programmatic DSL (from the docs, JSLT is based on jq, XPath, and XQuery). For an example of using JSLT, check out the demo playground, which has a ready-made example.

For this post I'll show a couple of examples, including the following one, which reproduces the Inception example from the JOLT playground as a comparison between the JOLT and JSLT expressions:

Input:

{
  "rating": {
    "primary": {
      "value": 3
    },
    "quality": {
      "value": 3
    }
  }
}

JOLT Transform:

[
  {
    "operation": "shift",
    "spec": {
      "rating": {
        "primary": {
          "value": "Rating"
        },
        "*": {
          "value": "SecondaryRatings.&1.Value",
          "$": "SecondaryRatings.&1.Id"
        }
      }
    }
  },
  {
    "operation": "default",
    "spec": {
      "Range": 5,
      "SecondaryRatings": {
        "*": {
          "Range": 5
        }
      }
    }
  }
]

Equivalent JSLT Transform:

{
  "Rating" : .rating.primary.value,
  "SecondaryRatings" :
    [for (.rating) {
      .key : {
        "Id" : .key,
        "Value" : .value.value,
        "Range" : 5
      }
    }
    if (.key != "primary")],
  "Range" : 5
}

These both kind of "walk the structure", but in my opinion the JSLT is more succinct and more readable for at least these reasons:

  • There is no need for a "default" spec as there is in JOLT
  • There are fewer levels of JSON objects in the specification
  • You don't have to walk the structure one level at a time, because you can refer to lower levels using dot-notation (see the snippet after this list)
  • You don't have to "count the levels" to use the & notation to walk back up the tree

Learning JSLT is not necessarily easier than learning JOLT, but it may come more naturally if you are already familiar with the tools JSLT is based on. At the end of the day, both are designed to modify the structure of input JSON; they just approach the DSL in different ways. And NiFi offers the choice between them :)

As always I welcome all comments, suggestions, and questions. Cheers!



Transform JSON String Field Into Record Object in NiFi

One question that has come up often in the NiFi community is: "I have a field in my record(s) that is a string that actually contains JSON. Can I change that field to have the JSON object as the value?" It turns out the answer is yes! It is possible via ScriptedTransformRecord, and there are a couple of cool techniques I will show in this post that let you work with any format, not just JSON.

To start, I have a flow with a GenerateFlowFile (for test input) and a ScriptedTransformRecord whose success relationship is sent to a funnel (just for illustration):


For GenerateFlowFile, I set the content to some test input with a field "myjson" which is a string containing JSON:

[
  {
    "id": 1,
    "myjson": "{\"a\":\"hello\",\"b\": 100}"
  },
  {
    "id": 2,
    "myjson": "{\"a\":\"world\",\"b\": 200}"
  }
]


You can see the "myjson" field is a string with a JSON object having two fields "a" and "b". With ScriptedTransformRecord I will need to look for that record field by name and parse it into a JSON object, which is really easy with Groovy. This is the entire script, with further explanation below:

import org.apache.nifi.serialization.record.*
import org.apache.nifi.serialization.record.util.*
import java.nio.charset.*
import groovy.json.*

def recordMap = record.toMap()
def derivedValues = new HashMap(recordMap)
def slurper = new JsonSlurper()
recordMap.each { k, v ->
    if ('myjson'.equals(k)) {
        derivedValues.put(k, slurper.parseText(v))
    }
}
def inferredSchema = DataTypeUtils.inferSchema(derivedValues, null, StandardCharsets.UTF_8)
return new MapRecord(inferredSchema, derivedValues)

The first thing the script does (after importing the necessary packages) is get the existing record fields as a Java Map. Then a copy of the map is made so we can change the value of the "myjson" field. The record map is iterated over, giving each key/value pair, where the key is the record field name and the value is the record field value. I only want to change the "myjson" field, so I replace its String value with the parsed Java object. This is possible because the maps are of type Map<String, Object>.

However, I don't know the new schema of the overall record, because "myjson" is no longer a String but a sub-record. I used DataTypeUtils and its inferSchema() method (with a null record name, as it's the root record) to get the schema of the new output record, then returned a new record (instead of the original) using that schema and the updated map. Using a JsonRecordSetWriter in ScriptedTransformRecord gives the following output for the above input FlowFile:

[ {
  "id" : 1,
  "myjson" : {
    "a" : "hello",
    "b" : 100
  }
}, {
  "id" : 2,
  "myjson" : {
    "a" : "world",
    "b" : 200
  }
} ]

The field-parsing technique (JsonSlurper in this case) can be extended to other formats as well: basically any string that can be converted into a Map. Groovy has an XmlSlurper, for example, so the field content could be XML instead of JSON.
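
For instance, if the "myjson" field instead held a simple flat XML snippet such as "<data><a>hello</a><b>100</b></data>" (a purely hypothetical example), a minimal sketch of the parsing step might look like the following, with the caveat that element text comes back as Strings:

import groovy.util.XmlSlurper  // groovy.xml.XmlSlurper in Groovy 3 and later

// Replace the JsonSlurper parse in the script above with an XML parse that
// converts the top-level child elements into a Map
def xml = new XmlSlurper().parseText(v)
def parsedMap = xml.children().collectEntries { node -> [(node.name()): node.text()] }
derivedValues.put(k, parsedMap)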

The schema-inference technique can be used in many cases, not just for replacing a single field. It only takes the root-level Map, a name of null, and a character set (UTF-8 in this post), and produces a new schema for the outgoing record. You can even produce multiple output records for a single input record, but you'll want them all to have the same schema so that every record in the outgoing FlowFile shares it.
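
As a rough sketch of that multi-record case (assuming your NiFi version's ScriptedTransformRecord accepts a collection of records as the script's return value), the end of the script above could become:

// Hypothetical: emit two output records for one input record (here simply two
// copies, purely to show the shape of the return value); both share one schema
def firstMap = new HashMap(derivedValues)
def secondMap = new HashMap(derivedValues)
def sharedSchema = DataTypeUtils.inferSchema(firstMap, null, StandardCharsets.UTF_8)
return [new MapRecord(sharedSchema, firstMap), new MapRecord(sharedSchema, secondMap)]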

As always, I welcome all comments, questions, and suggestions. Cheers!




Wednesday, January 11, 2023

cURLing the secure Apache NiFi REST API

While testing a change to a NiFi REST API endpoint, I found it would be easy to test with a simple cURL command. However, the default security of NiFi uses single-user authentication, meaning you need to provide a username and password to get a token, then add that token as a header to subsequent REST API calls. To make that easier, I whipped up a quick bash script to do exactly this:

#!/usr/bin/env bash
if [ $# -lt 3 ]
  then
    echo "Usage: ncurl <username> <password> <URL>"
    exit 1
fi
token=$(curl -k https://localhost:8443/nifi-api/access/token -d "username=$1&password=$2")
curl -H "Authorization: Bearer $token" -k $3

As you can see from the usage message, "ncurl" expects the username, password, and desired URL as arguments to the script. Here is an example:

ncurl admin mypassword https://localhost:8443/nifi-api/flow/metrics/prometheus

It is possible to reuse the token (for single-user authentication the token is good for 8 hours, for example), so you could use the "token" line to fetch the token into a variable once, then use the "curl" line to call multiple URLs. The same approach works for other authentication schemes such as LDAP and Kerberos.
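
For example, here is a rough sketch of that reuse in a shell session (the /flow/status endpoint is just another illustrative GET, and the credentials are the same placeholders as above):

# Grab the token once...
token=$(curl -sk https://localhost:8443/nifi-api/access/token -d "username=admin&password=mypassword")
# ...then reuse it for multiple calls
curl -sk -H "Authorization: Bearer $token" "https://localhost:8443/nifi-api/flow/status"
curl -sk -H "Authorization: Bearer $token" "https://localhost:8443/nifi-api/flow/metrics/prometheus"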

Thanks to Dave Handermann for talking through these concepts with me, and as always I welcome all questions, comments, suggestions, and issues. Cheers!

Tuesday, December 6, 2022

Using UpdateDatabaseTable for Schema Migration and Database Replication

 In NiFi version 1.19.0, the UpdateDatabaseTable processor was added to help with database schema migration and to make replication of database tables easier. This processor checks the schema of the incoming records against the target database. As in the following example, if the target table does not exist, the processor can either fail the FlowFile or create the table for you:


Having the processor create the table (if it doesn't already exist) allows you to take data from any source/flow and land it in a database even if the table is not already there. This removes the previous need to manually set up the target tables with the intended schema beforehand. I will illustrate this with a simple flow:



The flow uses GenerateFlowFile to issue some simple sample data, but in practice it will be common to use QueryDatabaseTableRecord to incrementally fetch rows from a source database table. The flow then sends the FlowFiles to UpdateDatabaseTable, but any number of transformations of the data are possible in NiFi before the records are ready for the desired target table. This is the configuration of UpdateDatabaseTable for this example:


It should be noted here (and will come up again later) that the outgoing FlowFiles from the UpdateDatabaseTable processor will have an 'output.table' attribute set to the target table name. This can be (and in this example is) used downstream to refer to the table that was created or updated.
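
For example, a downstream PutDatabaseRecord (as used later in this post) can pick up that table name by setting its Table Name property to an Expression Language reference, a sketch of which is simply:

Table Name: ${output.table}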

When the flow is started, the target table 'udt_test' does not yet exist in the database:



Taking a look at an example FlowFile, you can see the schema of the data (i.e. the JSON structure) and thus the intended target table definition:



After starting this flow and seeing at least one FlowFile progress through UpdateDatabaseTable successfully, we can now verify the desired target table does indeed exist in the target database:



And after running with the following configuration of PutDatabaseRecord:



We can see that not only has the table been created, but the record has been inserted successfully, with PutDatabaseRecord referring to the table via the aforementioned 'output.table' attribute:



The example above is meant to demonstrate that you can take data (in the form of NiFi records in a FlowFile) from any source, transform it however you like, and put it into a database table without the table having to exist beforehand. But it can do more than that! If you have an existing table and an extra field in your input data that does not have an associated column in the target table, UpdateDatabaseTable will add the missing column and start populating rows using all the available columns in the input data. Using the above example, let's say we added an extra field named 'newField' with a string value:



Then we run that FlowFile through UpdateDatabaseTable and see that the table schema has been updated and the value for the new row has been populated, with null as the value of the new column for the previously inserted rows:


Hopefully this post has illustrated the potential of the UpdateDatabaseTable processor to make schema migration/drift and database replication techniques much easier in Apache NiFi. As always, I welcome all comments, questions, and suggestions. Cheers!

Friday, September 18, 2020

Infer NiFi Schema from File in Groovy

 The "Infer Schema" option in CSV, JSON, and XML readers in NiFi (as of 1.9.0) has made it much easier to get your record-based data into NiFi and operate on it without having to provide the schema explicitly. However sometimes the schema may need to be tweaked (to make a field required) or datatypes changed (numbers to strings, e.g.), but writing a schema from scratch can be a pain.

To help with that, I whipped up a quick Groovy script that checks a file's extension for json, csv, or xml, calls the same schema inference code that NiFi does, and then writes the Avro schema to standard out. This tool can be used to infer the schema of an arbitrary file and then use the output as an explicit schema, presumably after some minor changes are made.

The Groovy script (I named it infer_schema.groovy) is as follows:

#!/usr/bin/groovy
@Grab('org.apache.nifi:nifi-record:1.12.0')
@Grab('org.apache.nifi:nifi-record-serialization-services:1.12.0')
@Grab('org.apache.nifi:nifi-avro-record-utils:1.12.0')
import org.apache.nifi.schema.inference.*
import org.codehaus.jackson.*
import org.apache.nifi.json.*
import org.apache.nifi.avro.*
import org.apache.nifi.csv.*
import org.apache.nifi.xml.inference.*
import org.apache.nifi.context.*
import org.apache.nifi.components.*
if(args?.length < 1) {
  println 'Usage: infer_schema <file>'
  System.exit(1)
}
def fileToInfer = new File(args[0])
def ext = fileToInfer.name[(fileToInfer.name.lastIndexOf('.') + 1)..-1]
RecordSourceFactory recordSourceFactory
SchemaInferenceEngine inferenceEngine
switch (ext) {
    case 'json':
        recordSourceFactory = { var, inStream -> new JsonRecordSource(inStream) }
        inferenceEngine = new JsonSchemaInference(new TimeValueInference(null, null, null))
        break
    case 'csv':
        def context = new PropertyContext() {
            PropertyValue getProperty(PropertyDescriptor descriptor) {
                return (['getValue': { -> 'UTF-8' }] as PropertyValue)
            }
            Map<String, String> getAllProperties() { [:] }
        }
        recordSourceFactory = { var, inStream -> new CSVRecordSource(inStream, context, var) }
        inferenceEngine = new CSVSchemaInference(new TimeValueInference(null, null, null))
        break
    case 'xml':
        recordSourceFactory = { var, inStream -> new XmlRecordSource(inStream, false) }
        inferenceEngine = new XmlSchemaInference(new TimeValueInference(null, null, null))
        break
}
fileToInfer.withInputStream {inStream ->
    def recordSource = recordSourceFactory.create([:], inStream)
    println AvroTypeUtil.buildAvroSchema(inferenceEngine.inferSchema(recordSource)).toString(true)
}
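
Assuming Groovy is on your path (the @Grab annotations will download the NiFi dependencies on the first run), invoking the script looks something like this, where the file path is just an example:

groovy infer_schema.groovy /path/to/mydata.csv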


If you have date/time/timestamp values, you'll likely want to provide formats to the various TimeValueInference constructors that match the expected format of your time-based values. The script can certainly use some work to make it more robust, but mostly this is an example of using NiFi libraries in Groovy to accomplish tasks outside of a NiFi instance.
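
For example, here is a sketch of supplying those formats (the patterns are illustrative, and the constructor argument order is assumed to be date, time, then timestamp):

// Replace the (null, null, null) constructor calls above with something like:
def timeInference = new TimeValueInference('yyyy-MM-dd', 'HH:mm:ss', 'yyyy-MM-dd HH:mm:ss.SSS')
inferenceEngine = new JsonSchemaInference(timeInference)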

As always I welcome all comments, questions, and suggestions. Cheers!

Tuesday, April 2, 2019

Record Processing with InvokeScriptedProcessor (using Groovy)


While looking into NIFI-6151, I commented that record processing can be done with the scripted processors, but the most appropriate approach is probably to use InvokeScriptedProcessor, since you can add more complex properties (ones that specify Controller Services, for example), versus only the user-defined properties available to ExecuteScript.

Having said that, there is a decent amount of setup code you need for any record-based scripted processor, and there is a best practice around how to handle the records: process the first record before creating a RecordSetWriter, in case your custom processor code needs to update the schema that the RecordSetWriter will use. The Groovy example below is adapted from AbstractRecordProcessor, the common base class for all record processors in the standard NAR. Note the two comment sections for handling the first and remaining incoming records; these are where you'd put your custom code. It's probably best to add a private method to your scripted processor and call it once for the first record and then again in the loop (that's what AbstractRecordProcessor does):


import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.flowfile.attributes.CoreAttributes
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.serialization.*
import org.apache.nifi.serialization.record.*
import org.apache.nifi.schema.access.SchemaNotFoundException
import java.util.concurrent.atomic.AtomicInteger

class MyRecordProcessor extends AbstractProcessor {

    // Properties
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("Specifies the Controller Service to use for reading incoming data")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build()
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("Specifies the Controller Service to use for writing out the records")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build()

    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles are routed here if an error occurs during processing').build()

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        def properties = [] as ArrayList
        properties.add(RECORD_READER)
        properties.add(RECORD_WRITER)
        properties
    }

   @Override
    Set<Relationship> getRelationships() {
       [REL_SUCCESS, REL_FAILURE] as Set<Relationship>
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSession session) {
        def flowFile = session.get()
        if (!flowFile) return

        def readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory)
        def writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory)
        
        final Map<String, String> attributes = new HashMap<>()
        final AtomicInteger recordCount = new AtomicInteger()
        final FlowFile original = flowFile
        final Map<String, String> originalAttributes = flowFile.attributes
        try {
            flowFile = session.write(flowFile,  { inStream, outStream ->
                    def reader = readerFactory.createRecordReader(originalAttributes, inStream, getLogger())
                     try {

                        // Get the first record and process it before we create the Record Writer. 
                        // We do this so that if the Processor updates the Record's schema, we can provide 
                        // an updated schema to the Record Writer. If there are no records, then we can
                        // simply create the Writer with the Reader's schema and begin & end the RecordSet
                        def firstRecord = reader.nextRecord()
                        if (!firstRecord) {
                            def writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
                            def writer = writerFactory.createWriter(getLogger(), writeSchema, outStream)
                            try {
                                writer.beginRecordSet()
                                def writeResult = writer.finishRecordSet()
                                attributes['record.count'] = String.valueOf(writeResult.recordCount)
                                attributes[CoreAttributes.MIME_TYPE.key()] = writer.mimeType
                                attributes.putAll(writeResult.attributes)
                                recordCount.set(writeResult.recordCount)
                            } finally {
                                writer.close()
                            }
                            return
                        }

                        /////////////////////////////////////////
                        // TODO process first record
                        /////////////////////////////////////////

                        def writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.schema)
                        def writer = writerFactory.createWriter(getLogger(), writeSchema, outStream)
                        try {
                            writer.beginRecordSet()
                            writer.write(firstRecord)
                            def record
                            while (record = reader.nextRecord()) {
                                //////////////////////////////////////////
                                // TODO process next record
                                //////////////////////////////////////////
                                writer.write(record)
                            }

                            def writeResult = writer.finishRecordSet()
                            attributes.put('record.count', String.valueOf(writeResult.recordCount))
                            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.mimeType)
                            attributes.putAll(writeResult.attributes)
                            recordCount.set(writeResult.recordCount)
                        } finally {
                            writer.close()
                        }
                    } catch (final SchemaNotFoundException e) {
                        throw new ProcessException(e.localizedMessage, e)
                    } catch (final MalformedRecordException e) {
                        throw new ProcessException('Could not parse incoming data', e)
                    } finally {
                        reader.close()
                    }
                } as StreamCallback)
            
        } catch (final Exception e) {
            getLogger().error('Failed to process {}; will route to failure', [flowFile, e] as Object[])
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        flowFile = session.putAllAttributes(flowFile, attributes)
        recordCount.get() ?  session.transfer(flowFile, REL_SUCCESS) : session.remove(flowFile)
        def count = recordCount.get()
        session.adjustCounter('Records Processed', count, false)
        getLogger().info('Successfully converted {} records for {}', [count, flowFile] as Object[])
    }
}

processor = new MyRecordProcessor()

Inside the session.write() StreamCallback, we first check to see if there are any records at all, and if there aren't we just populate the standard record-based attributes (such as record.count and the MIME type of the writer) and write out a zero-record flow file.

After that, it's time to process the first record separately from the rest. This is because the reader and/or the custom processor code may change the writer's schema from the reader's schema. This happens, for example, during schema inference, a capability of RecordReaders since NiFi 1.9.0.

Then the first record is written, and the process continues for the rest of the records. At the end, the standard record-based attributes are populated, then the flow file is updated and transferred. The script above also includes error-handling for when things go wrong.
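
As a concrete (if trivial) example of the private-method pattern mentioned earlier, both TODO sections could delegate to a method like the following sketch inside MyRecordProcessor, where the "name" field is purely hypothetical; the calls would then become writer.write(process(firstRecord)) and writer.write(process(record)):

    private Record process(Record record) {
        // Example transformation: uppercase a hypothetical "name" field if present
        def name = record.getAsString('name')
        if (name != null) {
            record.setValue('name', name.toUpperCase())
        }
        return record
    }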

As always, if you try this out please let me know how/if it works for you, and I welcome all comments, questions, and suggestions. Cheers!

Wednesday, August 22, 2018

Database Sequence Lookup with ScriptedLookupService

While addressing this question, I realized it would make a good example of how to use ScriptedLookupService to provide a sequence number from a database to a field in a record. This can be useful when PutDatabaseRecord is used downstream and you want some field to contain a sequence number from a database.

In my example I'm using a CSV file with a few dummy values in it, adding an "id" field containing a sequence number, and writing out the records as JSON objects:



The LookupRecord configuration is as follows; note that the processor requires the value of any user-defined property to be a RecordPath that evaluates to a non-null value. I chose "/Age" as a dummy RecordPath here, since I know the record contains a non-null Age field:
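
In other words, the relevant LookupRecord settings look roughly like the following sketch, where the user-defined property name ("my_sequence" here, a hypothetical database sequence name) is passed to the lookup service as the coordinate key, and Result RecordPath is where the looked-up value lands:

Result RecordPath: /id
my_sequence: /Age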



The Groovy script for ScriptedLookupService is as follows. I am connecting to a PostgreSQL database, so I generated the corresponding SQL using nextval(); you can change this to the SQL appropriate for your database type:

import org.apache.nifi.components.*
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.lookup.LookupService
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.dbcp.DBCPService
import java.sql.*

class SequenceLookupService implements LookupService<String> {

final String ID = UUID.randomUUID().toString()

public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name("Database Connection Pooling Service")
            .description("The Controller Service that is used to obtain connection to database")
            .required(true)
            .identifiesControllerService(DBCPService)
            .build()

Connection conn = null
Statement stmt = null
ComponentLog log = null

    @Override
    Optional<String> lookup(Map<String, String> coordinates) {
        final String key = coordinates.keySet().first()
        ResultSet rs = stmt?.executeQuery("select nextval('${key}')".toString())
        // Return the value as a String (the declared lookup value type); empty Optional if no row
        return rs.next() ? Optional.ofNullable(rs.getString(1)) : Optional.empty()
    }
    
    Set<String> getRequiredKeys() {
        return java.util.Collections.emptySet();
    }
    
    @Override
    Class<?> getValueType() {
        return String
    }

    @Override
    void initialize(ControllerServiceInitializationContext context) throws InitializationException {
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) {
       null
    }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
       name.equals(DBCP_SERVICE.name) ? DBCP_SERVICE : null
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        [DBCP_SERVICE] as List;
    }

    @Override
    String getIdentifier() {
       ID
    }

    def onEnabled(context) {
        def db = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService)
        conn = db.connection
        stmt = conn.createStatement()
    }

    def onDisabled() {
      conn?.close()
    }

    def setLogger(logger) {
        log = logger
    }
}

lookupService = new SequenceLookupService()

One caveat I should mention: currently, any properties defined by the script do not get added to the ScriptedLookupService dialog until the service is enabled. I have written an improvement Jira (NIFI-5547) to fix that.

The HCC question also refers to the nextInt() function in NiFi Expression Language, which is MUCH faster than retrieving a sequence from a database. So is the UpdateAttribute approach, which lets NiFi handle the "sequence" rather than an external database. But if you do need to use an external database sequence, this script should allow you to do that.
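
For reference, that nextInt()-based alternative is just an Expression Language call; for example, an UpdateAttribute user-defined property (the attribute name "id" is arbitrary) could be set like this sketch:

id: ${nextInt()}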

Note that in my lookup function I only grabbed the first entry in the coordinates map; this can easily be extended to do multiple lookups of multiple sequences. Note, however, that user-defined properties are unique by name, so you can't fill in multiple fields from the same sequence.

The above template (with script) is available as a Gist here. As always, please let me know if you try this whether it works for you, and I welcome all comments, questions, and suggestions. Cheers!