tag:blogger.com,1999:blog-42869383794183737.post6467498242394012546..comments
2024-03-09T01:04:10.400-08:00
Comments on Fun with Apache NiFi: Record Processing with InvokeScriptedProcessor (using Groovy)
MattyB
http://www.blogger.com/profile/05049049725324531534
noreply@blogger.com
Blogger
7 1 25

tag:blogger.com,1999:blog-42869383794183737.post-4074844156237895251
2021-07-19T02:57:36.476-07:00
Thank you for the information. <a href="https://www.igmguru.com/cloud-computing/devops-certification-training/" rel="nofollow">devops online training</a>
akshat bakliwal
https://www.blogger.com/profile/03728851189176355655
noreply@blogger.com

tag:blogger.com,1999:blog-42869383794183737.post-1736345138769044187
2020-01-31T08:41:52.766-08:00
Excellent read, positive site. Where did you come up with the information in this post? I have read a few of the articles on your website now, and I really like your style. Thanks a million and please keep up the effective work. <a href="https://www.pageview.com/" rel="nofollow">https://www.pageview.com/</a>
haali
https://www.blogger.com/profile/17694397367457169107
noreply@blogger.com

tag:blogger.com,1999:blog-42869383794183737.post-5271200222462986310
2020-01-31T07:19:15.584-08:00
I recommend only good and reliable information, so see it: <a href="https://www.pageview.com/" rel="nofollow">record web session</a>
haali
https://www.blogger.com/profile/17694397367457169107
noreply@blogger.com

tag:blogger.com,1999:blog-42869383794183737.post-6288314636878117870
2019-11-04T22:55:58.579-08:00
I was scrolling the internet like every day, and there I found this article, which is related to my interest. The way you covered the knowledge about the subject and the <a href="https://agrawalconstruction.com/category/5-bhk-bungalows/" rel="nofollow">Bungalows in chunabhatti bhopal</a> was worth reading; it undoubtedly cleared my vision and thoughts towards B <a href="https://agrawalconstruction.com%20%E2%80%BA%20sagar-high-street" rel="nofollow">Commercial Shops on ayodhya bypass road</a>. Your writing skills and the way you portrayed the examples are very impressive. The knowledge about <a href="https://agrawalconstruction.com%20%E2%80%BA%20sagar-green-hills" rel="nofollow">flats in chunabhatti bhopal</a> is well covered. Thank you for putting this highly informative article on the internet, which is clearing the vision about top builders in Bhopal who are making an impact in the real estate sector by building such amazing townships.
chetan
https://www.blogger.com/profile/17696027129245875964
noreply@blogger.com

tag:blogger.com,1999:blog-42869383794183737.post-3757237831738420992
2019-06-07T02:01:59.616-07:00
When I put the script in ISP it is not validated. I get:

InvokeScriptedProcessor[id=3122a12f-016b-1000-e1bb-7e5ea29bd01e] Unable to validate the script Processor: java.lang.RuntimeException: java.lang.NullPointerException: java.lang.RuntimeException: java.lang.NullPointerException

Should I set Module Directory? What could be the problem?
I am using NiFi 1.8.0 running in a Docker container.
Obcan
https://www.blogger.com/profile/16118668646271502313
noreply@blogger.com

tag:blogger.com,1999:blog-42869383794183737.post-2467442763678477742
2019-04-10T10:42:39.162-07:00
import org.apache.nifi.controller.ControllerService
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import groovy.sql.Sql
import java.text.SimpleDateFormat

def flowFile = session.get()

if(!flowFile) return

def conn

try {
    def lookup = context.controllerServiceLookup
    def dbServiceName = databaseConnectionPoolName.value

    def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
        cs -> lookup.getControllerServiceName(cs) == dbServiceName
    }
    conn = lookup.getControllerService(dbcpServiceId)?.getConnection()

    def jsonSlurper = new JsonSlurper()

    def recordId = flowFile.getAttribute('recordId')
    def errorStatus = flowFile.getAttribute('execution.status')
    def errorMessage = flowFile.getAttribute('execution.message')
    def errors = flowFile.getAttribute('errors')
    def recordCreateddateTime = new Date()
    if(recordId){

        def statusId = 1
        def processMessage = ''

        def sql = new Sql(conn)

        sql.rows('select RecordStatusId,StatusDescription from RecordStatus').each{ row ->
            if((errorStatus && errorStatus == 'cannotParseJson' )|| errors){
                if(row.StatusDescription.toUpperCase() == 'Errored'.toUpperCase()){
                    statusId= row.RecordStatusId
                    statusId= statusId.toInteger()
                    processMessage = errorMessage
                }
            }else if(row.StatusDescription.toUpperCase() == 'Created'.toUpperCase()){
                statusId= row.RecordStatusId
                statusId= statusId.toInteger()
                processMessage = row.StatusDescription
            }
        }

        def row = session.read(flowFile).getText("UTF-8")
        sql.executeInsert("UPDATE DataRecord SET JsonData = ${row}, UpdatedBy = 'NIFI', UpdatedDate = ${recordCreateddateTime} , ProcessMessage=${processMessage}, RecordStatusId = ${statusId} WHERE DataRecordId = ${recordId} ")
        flowFile = session.putAttribute(flowFile, 'success', 'updatedSuccessfully')
        session.transfer(flowFile,REL_SUCCESS)
    }
    conn?.close()
}
catch(e) {
    conn?.close()
    log.error('Scripting error', e)
    flowFile = session.putAttribute(flowFile, 'execution.message', e.getMessage())
    flowFile = session.putAttribute(flowFile, 'execution.status', 'error')
    flowFile = session.putAttribute(flowFile, 'execution.error', e.getStackTrace().toString())
    session.transfer(flowFile, REL_FAILURE)
}
Anonymous
https://www.blogger.com/profile/17221211364154394286
noreply@blogger.com

tag:blogger.com,1999:blog-42869383794183737.post-1433329465401276438
2019-04-10T10:41:59.591-07:00
Hi, I am getting a deadlock error from SQL Server. I am processing a large file by dividing it into batches and then into individual rows, each as a flow file. I insert the original data into a table called Record table and then later update it in another Groovy script. The code for both scripts is here.
Can you suggest the correct way to fix it?

import org.apache.nifi.controller.ControllerService
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import groovy.sql.Sql
import java.text.SimpleDateFormat

def flowFile = session.get()

if(!flowFile) return

def conn

try {
    def lookup = context.controllerServiceLookup
    def dbServiceName = databaseConnectionPoolName.value
    def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
        cs -> lookup.getControllerServiceName(cs) == dbServiceName
    }
    conn = lookup.getControllerService(dbcpServiceId)?.getConnection()

    def jsonSlurper = new JsonSlurper()

    def batchId = flowFile.getAttribute('batchId')
    def recordCreateddateTime = new Date()
    def rowNumber = flowFile.getAttribute('fragment.index')
    if(batchId){

        def statusId = 1
        def processMessage = ''

        def sql = new Sql(conn)

        sql.rows('select RecordStatusId,StatusDescription from RecordStatus').each{ row ->
            if(row.StatusDescription.toUpperCase() == 'Created'.toUpperCase()){
                statusId= row.RecordStatusId
                statusId= statusId.toInteger()
                processMessage = row.StatusDescription
            }
        }
        def generatedKeys = ''
        def row = session.read(flowFile).getText("UTF-8")
        generatedKeys = sql.executeInsert("INSERT INTO DataRecord(BatchId, RowNumber,ProcessMessage,RecordStatusId,CreatedBy,CreatedDate,OriginalData) values (${batchId}, ${rowNumber}, ${processMessage},${statusId},'NIFI',${recordCreateddateTime},${row})")
        flowFile = session.putAttribute(flowFile, 'recordId', generatedKeys[0][0].toString())
        session.transfer(flowFile,REL_SUCCESS)
    }
    conn?.close()
}
catch(e) {
    conn?.close()
    log.error('Scripting error', e)
    flowFile = session.putAttribute(flowFile, 'execution.message', e.getMessage())
    flowFile = session.putAttribute(flowFile, 'execution.status', 'error')
    flowFile = session.putAttribute(flowFile, 'execution.error', e.getStackTrace().toString())
    session.transfer(flowFile, REL_FAILURE)
}
Anonymous
https://www.blogger.com/profile/17221211364154394286
noreply@blogger.com
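
One common way to make scripts like the two above tolerate SQL Server deadlocks is to retry the statement that was picked as the deadlock victim. The sketch below is only an illustration of that idea, not the poster's code and not a NiFi API. `DeadlockRetry`, `maxAttempts`, and `baseDelayMs` are hypothetical names, and it assumes the JDBC driver reports the deadlock as a `java.sql.SQLException` with vendor error code 1205. The demo at the bottom simulates the deadlock, so it runs without a database.

```groovy
import java.sql.SQLException

class DeadlockRetry {
    // Assumption: SQL Server reports a deadlock victim as vendor error code 1205.
    static final int SQLSERVER_DEADLOCK = 1205

    /**
     * Runs `work` and retries it when it fails with a deadlock error.
     * `maxAttempts` and `baseDelayMs` are hypothetical tuning knobs.
     * Any other SQLException, or a deadlock on the last attempt, is rethrown.
     */
    static <T> T run(int maxAttempts, long baseDelayMs, Closure<T> work) {
        int attempt = 1
        while (true) {
            try {
                return work.call()
            } catch (SQLException e) {
                boolean deadlock = (e.errorCode == SQLSERVER_DEADLOCK)
                if (!deadlock || attempt >= maxAttempts) {
                    throw e
                }
                // Linear backoff so the competing transaction can finish first.
                Thread.sleep(baseDelayMs * attempt)
                attempt++
            }
        }
    }
}

// Demo without a database: fail twice with a simulated deadlock, then succeed.
int calls = 0
def result = DeadlockRetry.run(5, 10L) {
    calls++
    if (calls < 3) {
        throw new SQLException('simulated deadlock victim', '40001', 1205)
    }
    'updated'
}
println "result=${result}, calls=${calls}"
```

In the scripts above, the closure body would be the `sql.executeInsert(...)` call. Retrying only makes the flow tolerate the deadlock; it does not remove the cause, so it may still be worth checking how many concurrent tasks run the two scripts and how the rows in `DataRecord` are located by the UPDATE.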