While looking into NIFI-6151, I commented that record processing can be done with the scripting processors, but that the most appropriate approach is probably InvokeScriptedProcessor, because it lets you define richer properties (ones that specify Controller Services, for example), whereas ExecuteScript only offers user-defined properties.
Having said that, any record-based scripted processor needs a decent amount of setup code, and there is a best practice for handling the records: process the first record before creating the RecordSetWriter, in case your custom processor code needs to update the schema that the RecordSetWriter will use. The Groovy example below is adapted from AbstractRecordProcessor, the common base class for all record processors in the standard NAR. Note the two comment sections for handling the first record and the rest of the incoming records; these are where you'd put your custom code. It's probably best to add a private method to your scripted processor and call it once for the first record and then again for each record in the loop (that's what AbstractRecordProcessor does):
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.flowfile.attributes.CoreAttributes
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.serialization.*
import org.apache.nifi.serialization.record.*
import org.apache.nifi.schema.access.SchemaNotFoundException
import java.util.concurrent.atomic.AtomicInteger
class MyRecordProcessor extends AbstractProcessor {
// Properties
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for reading incoming data")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build()
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing out the records")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.build()
def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles are routed here if an error occurs during processing').build()
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
def properties = [] as ArrayList
properties.add(RECORD_READER)
properties.add(RECORD_WRITER)
properties
}
@Override
Set<Relationship> getRelationships() {
[REL_SUCCESS, REL_FAILURE] as Set<Relationship>
}
@Override
void onTrigger(ProcessContext context, ProcessSession session) {
def flowFile = session.get()
if (!flowFile) return
def readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory)
def writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory)
final Map<String, String> attributes = new HashMap<>()
final AtomicInteger recordCount = new AtomicInteger()
final FlowFile original = flowFile
final Map<String, String> originalAttributes = flowFile.attributes
try {
flowFile = session.write(flowFile, { inStream, outStream ->
def reader = readerFactory.createRecordReader(originalAttributes, inStream, getLogger())
try {
// Get the first record and process it before we create the Record Writer.
// We do this so that if the Processor updates the Record's schema, we can provide
// an updated schema to the Record Writer. If there are no records, then we can
// simply create the Writer with the Reader's schema and begin & end the RecordSet
def firstRecord = reader.nextRecord()
if (!firstRecord) {
def writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
def writer = writerFactory.createWriter(getLogger(), writeSchema, outStream)
try {
writer.beginRecordSet()
def writeResult = writer.finishRecordSet()
attributes['record.count'] = String.valueOf(writeResult.recordCount)
attributes[CoreAttributes.MIME_TYPE.key()] = writer.mimeType
attributes.putAll(writeResult.attributes)
recordCount.set(writeResult.recordCount)
} finally {
writer.close()
}
return
}
/////////////////////////////////////////
// TODO process first record
/////////////////////////////////////////
def writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.schema)
def writer = writerFactory.createWriter(getLogger(), writeSchema, outStream)
try {
writer.beginRecordSet()
writer.write(firstRecord)
def record
while (record = reader.nextRecord()) {
//////////////////////////////////////////
// TODO process next record
//////////////////////////////////////////
writer.write(record)
}
def writeResult = writer.finishRecordSet()
attributes.put('record.count', String.valueOf(writeResult.recordCount))
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.mimeType)
attributes.putAll(writeResult.attributes)
recordCount.set(writeResult.recordCount)
} finally {
writer.close()
}
} catch (final SchemaNotFoundException e) {
throw new ProcessException(e.localizedMessage, e)
} catch (final MalformedRecordException e) {
throw new ProcessException('Could not parse incoming data', e)
} finally {
reader.close()
}
} as StreamCallback)
} catch (final Exception e) {
getLogger().error('Failed to process {}; will route to failure', [flowFile, e] as Object[])
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.putAllAttributes(flowFile, attributes)
recordCount.get() ? session.transfer(flowFile, REL_SUCCESS) : session.remove(flowFile)
def count = recordCount.get()
session.adjustCounter('Records Processed', count, false)
getLogger().info('Successfully converted {} records for {}', [count, flowFile] as Object[])
}
}
processor = new MyRecordProcessor()
Once the reader has been created, the first record is processed separately from the rest. This is because the reader and/or the custom processor code may produce a schema for the writer that differs from the reader's schema. This happens, for example, during schema inference, a capability of RecordReaders since NiFi 1.9.0.
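The ordering described above can be seen in isolation with a minimal plain-Groovy sketch. This is not NiFi code: plain Maps stand in for records, a list of keys stands in for the schema, and processRecord and processAll are hypothetical names used only for illustration.

```groovy
// Hypothetical stand-in for the custom per-record logic. It adds a field,
// which changes the "schema" (here, simply the set of keys).
Map processRecord(Map record) {
    record + [nameLength: record.name.length()]
}

// Process the first record before deriving the "writer schema", so that the
// schema reflects any fields the custom logic added; then handle the rest.
Map processAll(Iterator<Map> reader) {
    if (!reader.hasNext()) {
        // No records: nothing to process, so emit an empty record set.
        return [schema: [], records: []]
    }
    def first = processRecord(reader.next())
    def schema = first.keySet() as List   // derived from the processed first record
    def out = [first]
    while (reader.hasNext()) {
        out << processRecord(reader.next())
    }
    [schema: schema, records: out]
}

def result = processAll([[name: 'a'], [name: 'bcd']].iterator())
println result.schema
println result.records
```

In the real processor the empty case uses the reader's schema rather than an empty one, and the same helper would be called once before creating the writer and once per iteration of the loop.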
Then the first record is written, and the process continues for the rest of the records. At the end, the standard record-based attributes (record.count and the MIME type) are populated, then the flow file is updated and transferred. The script above also includes error handling for when things go wrong.
As always, if you try this out please let me know how/if it works for you, and I welcome all comments, questions, and suggestions. Cheers!
Hi, I am getting a deadlock error from SQL Server. I am processing a large file by dividing it into batches and then into individual rows, each as a flow file. I insert the original data into a table called Record table and then later update it in another Groovy script. The code for both scripts is here. Can you suggest the correct way to fix it?
import org.apache.nifi.controller.ControllerService
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import groovy.sql.Sql
import java.text.SimpleDateFormat
def flowFile = session.get()
if(!flowFile) return
def conn
try {
def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def jsonSlurper = new JsonSlurper()
def batchId = flowFile.getAttribute('batchId')
def recordCreateddateTime = new Date()
def rowNumber = flowFile.getAttribute('fragment.index')
if(batchId){
def statusId = 1
def processMessage = ''
def sql = new Sql(conn)
sql.rows('select RecordStatusId,StatusDescription from RecordStatus').each{ row ->
if(row.StatusDescription.toUpperCase() == 'Created'.toUpperCase()){
statusId= row.RecordStatusId
statusId= statusId.toInteger()
processMessage = row.StatusDescription
}
}
def generatedKeys = ''
def row = session.read(flowFile).getText("UTF-8")
generatedKeys = sql.executeInsert("INSERT INTO DataRecord(BatchId, RowNumber,ProcessMessage,RecordStatusId,CreatedBy,CreatedDate,OriginalData) values (${batchId}, ${rowNumber}, ${processMessage},${statusId},'NIFI',${recordCreateddateTime},${row})")
flowFile = session.putAttribute(flowFile, 'recordId', generatedKeys[0][0].toString())
session.transfer(flowFile,REL_SUCCESS)
}
conn?.close()
}
catch(e) {
conn?.close()
log.error('Scripting error', e)
flowFile = session.putAttribute(flowFile, 'execution.message', e.getMessage())
flowFile = session.putAttribute(flowFile, 'execution.status', 'error')
flowFile = session.putAttribute(flowFile, 'execution.error', e.getStackTrace().toString())
session.transfer(flowFile, REL_FAILURE)
}
import org.apache.nifi.controller.ControllerService
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import groovy.sql.Sql
import java.text.SimpleDateFormat
def flowFile = session.get()
if(!flowFile) return
def conn
try {
def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def jsonSlurper = new JsonSlurper()
def recordId = flowFile.getAttribute('recordId')
def errorStatus = flowFile.getAttribute('execution.status')
def errorMessage = flowFile.getAttribute('execution.message')
def errors = flowFile.getAttribute('errors')
def recordCreateddateTime = new Date()
if(recordId){
def statusId = 1
def processMessage = ''
def sql = new Sql(conn)
sql.rows('select RecordStatusId,StatusDescription from RecordStatus').each{ row ->
if((errorStatus && errorStatus == 'cannotParseJson' )|| errors){
if(row.StatusDescription.toUpperCase() == 'Errored'.toUpperCase()){
statusId= row.RecordStatusId
statusId= statusId.toInteger()
processMessage = errorMessage
}
}else if(row.StatusDescription.toUpperCase() == 'Created'.toUpperCase()){
statusId= row.RecordStatusId
statusId= statusId.toInteger()
processMessage = row.StatusDescription
}
}
def row = session.read(flowFile).getText("UTF-8")
sql.executeInsert("UPDATE DataRecord SET JsonData = ${row}, UpdatedBy = 'NIFI', UpdatedDate = ${recordCreateddateTime} , ProcessMessage=${processMessage}, RecordStatusId = ${statusId} WHERE DataRecordId = ${recordId} ")
flowFile = session.putAttribute(flowFile, 'success', 'updatedSuccessfully')
session.transfer(flowFile,REL_SUCCESS)
}
conn?.close()
}
catch(e) {
conn?.close()
log.error('Scripting error', e)
flowFile = session.putAttribute(flowFile, 'execution.message', e.getMessage())
flowFile = session.putAttribute(flowFile, 'execution.status', 'error')
flowFile = session.putAttribute(flowFile, 'execution.error', e.getStackTrace().toString())
session.transfer(flowFile, REL_FAILURE)
}
When I put the script in ISP it is not validated. I get: InvokeScriptedProcessor[id=3122a12f-016b-1000-e1bb-7e5ea29bd01e] Unable to validate the script Processor: java.lang.RuntimeException: java.lang.NullPointerException: java.lang.RuntimeException: java.lang.NullPointerException
Should I set Module Directory? What could be the problem? I am using NiFi 1.8.0 running in a Docker container.