Tuesday, April 2, 2019

Record Processing with InvokeScriptedProcessor (using Groovy)


While looking into NIFI-6151, I commented that record processing can be done with the scripting processors, but the most appropriate approach is probably to use InvokeScriptedProcessor, since it lets you define more complex properties (ones that specify Controller Services, for example), versus the user-defined properties available to ExecuteScript.

Having said that, there is a decent amount of setup code you'd need for any record-based scripted processor, and there is a best practice around how to handle the records: process the first record before creating a RecordSetWriter, in case your custom processor code needs to update the schema that the RecordSetWriter will use. The Groovy example below is adapted from AbstractRecordProcessor, the common base class for all record processors in the standard NAR. Note the two comment sections for handling the first record and the remaining records; these are where you'd put your custom code. It's probably best to add a private method to your scripted processor and call it once for the first record and then again inside the loop (that's what AbstractRecordProcessor does); there's a sketch of such a method after the script:


import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.flowfile.attributes.CoreAttributes
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.serialization.*
import org.apache.nifi.serialization.record.*
import org.apache.nifi.schema.access.SchemaNotFoundException
import java.util.concurrent.atomic.AtomicInteger

class MyRecordProcessor extends AbstractProcessor {

    // Properties
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("Specifies the Controller Service to use for reading incoming data")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build()
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("Specifies the Controller Service to use for writing out the records")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build()

    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles are routed here if an error occurs during processing').build()

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        def properties = [] as ArrayList
        properties.add(RECORD_READER)
        properties.add(RECORD_WRITER)
        properties
    }

    @Override
    Set<Relationship> getRelationships() {
        [REL_SUCCESS, REL_FAILURE] as Set<Relationship>
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSession session) {
        def flowFile = session.get()
        if (!flowFile) return

        def readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory)
        def writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory)
        
        final Map<String, String> attributes = new HashMap<>()
        final AtomicInteger recordCount = new AtomicInteger()
        final FlowFile original = flowFile
        final Map<String, String> originalAttributes = flowFile.attributes
        try {
            flowFile = session.write(flowFile,  { inStream, outStream ->
                    def reader = readerFactory.createRecordReader(originalAttributes, inStream, getLogger())
                     try {

                        // Get the first record and process it before we create the Record Writer. 
                        // We do this so that if the Processor updates the Record's schema, we can provide 
                        // an updated schema to the Record Writer. If there are no records, then we can
                        // simply create the Writer with the Reader's schema and begin & end the RecordSet
                        def firstRecord = reader.nextRecord()
                        if (!firstRecord) {
                            def writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
                            def writer = writerFactory.createWriter(getLogger(), writeSchema, outStream)
                            try {
                                writer.beginRecordSet()
                                def writeResult = writer.finishRecordSet()
                                attributes['record.count'] = String.valueOf(writeResult.recordCount)
                                attributes[CoreAttributes.MIME_TYPE.key()] = writer.mimeType
                                attributes.putAll(writeResult.attributes)
                                recordCount.set(writeResult.recordCount)
                            } finally {
                                writer.close()
                            }
                            return
                        }

                        /////////////////////////////////////////
                        // TODO process first record
                        /////////////////////////////////////////

                        def writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.schema)
                        def writer = writerFactory.createWriter(getLogger(), writeSchema, outStream)
                        try {
                            writer.beginRecordSet()
                            writer.write(firstRecord)
                            def record
                            while ((record = reader.nextRecord()) != null) {
                                //////////////////////////////////////////
                                // TODO process next record
                                //////////////////////////////////////////
                                writer.write(record)
                            }

                            def writeResult = writer.finishRecordSet()
                            attributes.put('record.count', String.valueOf(writeResult.recordCount))
                            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.mimeType)
                            attributes.putAll(writeResult.attributes)
                            recordCount.set(writeResult.recordCount)
                        } finally {
                            writer.close()
                        }
                    } catch (final SchemaNotFoundException e) {
                        throw new ProcessException(e.localizedMessage, e)
                    } catch (final MalformedRecordException e) {
                        throw new ProcessException('Could not parse incoming data', e)
                    } finally {
                        reader.close()
                    }
                } as StreamCallback)
            
        } catch (final Exception e) {
            getLogger().error('Failed to process {}; will route to failure', [flowFile, e] as Object[])
            session.transfer(flowFile, REL_FAILURE)
            return
        }
        flowFile = session.putAllAttributes(flowFile, attributes)
        session.transfer(flowFile, REL_SUCCESS)
        def count = recordCount.get()
        session.adjustCounter('Records Processed', count, false)
        getLogger().info('Successfully converted {} records for {}', [count, flowFile] as Object[])
    }
}

processor = new MyRecordProcessor()
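
To fill in the two TODO sections, the private-method approach mentioned above works well: put your per-record logic in one method, call it once for the first record and then once per record inside the loop (e.g. firstRecord = processRecord(firstRecord) and record = processRecord(record) just before the corresponding writer.write() calls). Here's a purely hypothetical sketch of such a method; the 'name' field and the uppercase transform are placeholders for your own logic, not something the skeleton above requires:

    // Hypothetical per-record logic: uppercase the 'name' field if present.
    // Replace this with whatever transformation your use case needs.
    private Record processRecord(Record record) {
        def name = record.getAsString('name')
        if (name != null) {
            record.setValue('name', name.toUpperCase())
        }
        record
    }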

Inside the session.write() StreamCallback, we first check to see if there are any records at all; if there aren't, we just populate the standard record-based attributes (such as record.count and the MIME type of the writer) and write out a zero-record flow file.

After that, it's time to process the first record separately from the rest. This is because the reader and/or the custom processor code may change the writer's schema from the reader's schema. This happens, for example, with schema inference, a capability of RecordReaders since NiFi 1.9.0.
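
If your custom code changes the shape of the records, the same private-method pattern applies; because the write schema is obtained from the first record after it has been processed, any fields you add will show up in the writer's schema automatically. Below is a minimal, hypothetical sketch that returns a new record with an extra 'processedBy' String field (the field name and value are illustrative only). RecordField, RecordFieldType, MapRecord and SimpleRecordSchema are all covered by the wildcard imports in the script above:

    private Record addProcessedByField(Record record) {
        // Build a schema containing the original fields plus an extra String field
        def fields = new ArrayList(record.schema.fields)
        fields.add(new RecordField('processedBy', RecordFieldType.STRING.dataType))
        def newSchema = new SimpleRecordSchema(fields)

        // Copy the original values, add the new one, and return a new record
        def values = new HashMap(record.toMap())
        values['processedBy'] = 'MyRecordProcessor'
        new MapRecord(newSchema, values)
    }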

Then the first record is written, and the process continues for the rest of the records. At the end, the standard record-based attributes are populated, then the flow file is updated and transferred. The script above also includes error handling for when things go wrong.

As always, if you try this out please let me know how/if it works for you, and I welcome all comments, questions, and suggestions. Cheers!

Comments:

  1. Hi, I am getting a deadlock error from SQL Server. I am processing a large file by dividing it into batches and then into individual rows, each as a flow file. I insert the original data into a table called Record and then later update it in another Groovy script. Both scripts are below. Can you suggest the correct way to fix it?


    import org.apache.nifi.controller.ControllerService
    import org.apache.commons.io.IOUtils
    import java.nio.charset.StandardCharsets
    import groovy.json.JsonSlurper
    import groovy.sql.Sql
    import java.text.SimpleDateFormat

    def flowFile = session.get()
    if (!flowFile) return

    def conn
    try {
        def lookup = context.controllerServiceLookup
        def dbServiceName = databaseConnectionPoolName.value
        def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
            lookup.getControllerServiceName(cs) == dbServiceName
        }
        conn = lookup.getControllerService(dbcpServiceId)?.getConnection()

        def jsonSlurper = new JsonSlurper()

        def batchId = flowFile.getAttribute('batchId')
        def recordCreateddateTime = new Date()
        def rowNumber = flowFile.getAttribute('fragment.index')
        if (batchId) {
            def statusId = 1
            def processMessage = ''

            def sql = new Sql(conn)

            sql.rows('select RecordStatusId,StatusDescription from RecordStatus').each { row ->
                if (row.StatusDescription.toUpperCase() == 'Created'.toUpperCase()) {
                    statusId = row.RecordStatusId
                    statusId = statusId.toInteger()
                    processMessage = row.StatusDescription
                }
            }
            def generatedKeys = ''
            def row = session.read(flowFile).getText("UTF-8")
            generatedKeys = sql.executeInsert("INSERT INTO DataRecord(BatchId, RowNumber, ProcessMessage, RecordStatusId, CreatedBy, CreatedDate, OriginalData) values (${batchId}, ${rowNumber}, ${processMessage}, ${statusId}, 'NIFI', ${recordCreateddateTime}, ${row})")
            flowFile = session.putAttribute(flowFile, 'recordId', generatedKeys[0][0].toString())
            session.transfer(flowFile, REL_SUCCESS)
        }
        conn?.close()
    }
    catch (e) {
        conn?.close()
        log.error('Scripting error', e)
        flowFile = session.putAttribute(flowFile, 'execution.message', e.getMessage())
        flowFile = session.putAttribute(flowFile, 'execution.status', 'error')
        flowFile = session.putAttribute(flowFile, 'execution.error', e.getStackTrace().toString())
        session.transfer(flowFile, REL_FAILURE)
    }

  2. import org.apache.nifi.controller.ControllerService
    import org.apache.commons.io.IOUtils
    import java.nio.charset.StandardCharsets
    import groovy.json.JsonSlurper
    import groovy.sql.Sql
    import java.text.SimpleDateFormat

    def flowFile = session.get()
    if (!flowFile) return

    def conn
    try {
        def lookup = context.controllerServiceLookup
        def dbServiceName = databaseConnectionPoolName.value
        def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
            lookup.getControllerServiceName(cs) == dbServiceName
        }
        conn = lookup.getControllerService(dbcpServiceId)?.getConnection()

        def jsonSlurper = new JsonSlurper()

        def recordId = flowFile.getAttribute('recordId')
        def errorStatus = flowFile.getAttribute('execution.status')
        def errorMessage = flowFile.getAttribute('execution.message')
        def errors = flowFile.getAttribute('errors')
        def recordCreateddateTime = new Date()
        if (recordId) {
            def statusId = 1
            def processMessage = ''

            def sql = new Sql(conn)

            sql.rows('select RecordStatusId,StatusDescription from RecordStatus').each { row ->
                if ((errorStatus && errorStatus == 'cannotParseJson') || errors) {
                    if (row.StatusDescription.toUpperCase() == 'Errored'.toUpperCase()) {
                        statusId = row.RecordStatusId
                        statusId = statusId.toInteger()
                        processMessage = errorMessage
                    }
                } else if (row.StatusDescription.toUpperCase() == 'Created'.toUpperCase()) {
                    statusId = row.RecordStatusId
                    statusId = statusId.toInteger()
                    processMessage = row.StatusDescription
                }
            }

            def row = session.read(flowFile).getText("UTF-8")
            sql.executeInsert("UPDATE DataRecord SET JsonData = ${row}, UpdatedBy = 'NIFI', UpdatedDate = ${recordCreateddateTime}, ProcessMessage = ${processMessage}, RecordStatusId = ${statusId} WHERE DataRecordId = ${recordId}")
            flowFile = session.putAttribute(flowFile, 'success', 'updatedSuccessfully')
            session.transfer(flowFile, REL_SUCCESS)
        }
        conn?.close()
    }
    catch (e) {
        conn?.close()
        log.error('Scripting error', e)
        flowFile = session.putAttribute(flowFile, 'execution.message', e.getMessage())
        flowFile = session.putAttribute(flowFile, 'execution.status', 'error')
        flowFile = session.putAttribute(flowFile, 'execution.error', e.getStackTrace().toString())
        session.transfer(flowFile, REL_FAILURE)
    }

  3. When I put the script in ISP it does not validate. I get: InvokeScriptedProcessor[id=3122a12f-016b-1000-e1bb-7e5ea29bd01e] Unable to validate the script Processor: java.lang.RuntimeException: java.lang.NullPointerException: java.lang.RuntimeException: java.lang.NullPointerException

    Should I set Module Directory? What could be the problem? I am using NiFi 1.8.0 running in a Docker container.
