Comments on "SQL in NiFi with ExecuteScript" (blog: Fun with Apache NiFi, by MattyB, http://www.blogger.com/profile/05049049725324531534)
Feed updated 2024-03-09 01:04:10 (UTC-08:00). 42 comments in total; this page lists the 25 most recent, newest first.

Comment of 2020-05-30 14:29:41 (UTC-07:00)
Thank you Matt! Excellent article! This will eliminate a lot of the processing and resources needed to pull data from the database directly into CSV. I appreciate all your help through these articles.
Author: Anonymous (https://www.blogger.com/profile/11098866951675196571)

Comment of 2019-12-24 03:36:34 (UTC-08:00)
I am new to NiFi, but I am experienced in Python. Is it possible to access DBCPConnectionPool through a Python script?
Author: Anonymous

Comment of 2019-08-19 00:41:03 (UTC-07:00)
Can you share your Python script? I want to execute a stored procedure on MSSQL using Python, but I don't know how to connect to the DB from a Jython script (usually I use pyodbc).
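On the two questions above: inside a Jython ExecuteScript body, the connection borrowed from a DBCPConnectionPool is an ordinary java.sql.Connection, so pyodbc is not needed and a stored procedure can be run through the JDBC `{call ...}` escape syntax. The following is a minimal sketch under that assumption, not code from this thread: `build_call` and `call_procedure` are hypothetical helper names, only IN parameters are handled, and `conn` is assumed to have been borrowed from the pool via `getConnection()` on the looked-up service.

```python
# Sketch only (Jython 2.7 / Python 3 compatible syntax).
# Assumes `conn` is a java.sql.Connection borrowed from a DBCPConnectionPool;
# procedure names and arguments passed in are placeholders.


def build_call(proc_name, n_args):
    """Return JDBC call escape syntax, e.g. '{call p(?, ?)}' or '{call p}'."""
    if n_args == 0:
        return "{call %s}" % proc_name
    placeholders = ", ".join(["?"] * n_args)
    return "{call %s(%s)}" % (proc_name, placeholders)


def call_procedure(conn, proc_name, args):
    """Run a stored procedure with positional IN arguments.

    Returns the boolean from CallableStatement.execute() (True when the
    first result is a ResultSet) and always closes the statement.
    """
    stmt = conn.prepareCall(build_call(proc_name, len(args)))
    try:
        for i, value in enumerate(args):
            stmt.setObject(i + 1, value)  # JDBC parameter indexes start at 1
        return stmt.execute()
    finally:
        stmt.close()
```

In a script body this might be called as `call_procedure(conn, 'dbo.usp_example', [flowFile.getAttribute('id')])`, where the procedure name and attribute are placeholders; `conn` itself should still be closed in a `finally` block so it returns to the pool. Output parameters would additionally need `registerOutParameter` on the statement, which this sketch leaves out.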
Author: GarryMan (https://www.blogger.com/profile/12417546779920276037)

Comment of 2019-04-10 10:40:08 (UTC-07:00)
import org.apache.nifi.controller.ControllerService
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import groovy.sql.Sql
import java.text.SimpleDateFormat

def flowFile = session.get()

if(!flowFile) return

def conn

try {
    def lookup = context.controllerServiceLookup
    def dbServiceName = databaseConnectionPoolName.value

    def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
        cs -> lookup.getControllerServiceName(cs) == dbServiceName
    }
    conn = lookup.getControllerService(dbcpServiceId)?.getConnection()

    def jsonSlurper = new JsonSlurper()

    def recordId = flowFile.getAttribute('recordId')
    def errorStatus = flowFile.getAttribute('execution.status')
    def errorMessage = flowFile.getAttribute('execution.message')
    def errors = flowFile.getAttribute('errors')
    def recordCreateddateTime = new Date()
    if(recordId){

        def statusId = 1
        def processMessage = ''

        def sql = new Sql(conn)

        sql.rows('select RecordStatusId,StatusDescription from RecordStatus').each{ row ->
            if((errorStatus && errorStatus == 'cannotParseJson' )|| errors){
                if(row.StatusDescription.toUpperCase() == 'Errored'.toUpperCase()){
                    statusId= row.RecordStatusId
                    statusId= statusId.toInteger()
                    processMessage = errorMessage
                }
            }else if(row.StatusDescription.toUpperCase() == 'Created'.toUpperCase()){
                statusId= row.RecordStatusId
                statusId= statusId.toInteger()
                processMessage = row.StatusDescription
            }
        }

        def row = session.read(flowFile).getText("UTF-8")
        sql.executeInsert("UPDATE DataRecord SET JsonData = ${row}, UpdatedBy = 'NIFI', UpdatedDate = ${recordCreateddateTime} , ProcessMessage=${processMessage}, RecordStatusId = ${statusId} WHERE DataRecordId = ${recordId} ")
        flowFile = session.putAttribute(flowFile, 'success', 'updatedSuccessfully')
        session.transfer(flowFile,REL_SUCCESS)
    }
    conn?.close()
}
catch(e) {
    conn?.close()
    log.error('Scripting error', e)
    flowFile = session.putAttribute(flowFile, 'execution.message', e.getMessage())
    flowFile = session.putAttribute(flowFile, 'execution.status', 'error')
    flowFile = session.putAttribute(flowFile, 'execution.error', e.getStackTrace().toString())
    session.transfer(flowFile, REL_FAILURE)
}
Author: Anonymous (https://www.blogger.com/profile/17221211364154394286)

Comment of 2019-04-10 10:40:02 (UTC-07:00)
Hi, I am getting a deadlock error from SQL Server. I am processing a large file by dividing it into batches and then into individual rows, one row per flow file. I insert the original data into a table called Record table and then later update it in another Groovy script. The code for both scripts is here.
Can you suggest the correct way to fix it?

import org.apache.nifi.controller.ControllerService
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import groovy.sql.Sql
import java.text.SimpleDateFormat

def flowFile = session.get()

if(!flowFile) return

def conn

try {
    def lookup = context.controllerServiceLookup
    def dbServiceName = databaseConnectionPoolName.value
    def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
        cs -> lookup.getControllerServiceName(cs) == dbServiceName
    }
    conn = lookup.getControllerService(dbcpServiceId)?.getConnection()

    def jsonSlurper = new JsonSlurper()

    def batchId = flowFile.getAttribute('batchId')
    def recordCreateddateTime = new Date()
    def rowNumber = flowFile.getAttribute('fragment.index')
    if(batchId){

        def statusId = 1
        def processMessage = ''

        def sql = new Sql(conn)

        sql.rows('select RecordStatusId,StatusDescription from RecordStatus').each{ row ->
            if(row.StatusDescription.toUpperCase() == 'Created'.toUpperCase()){
                statusId= row.RecordStatusId
                statusId= statusId.toInteger()
                processMessage = row.StatusDescription
            }
        }
        def generatedKeys = ''
        def row = session.read(flowFile).getText("UTF-8")
        generatedKeys = sql.executeInsert("INSERT INTO DataRecord(BatchId, RowNumber,ProcessMessage,RecordStatusId,CreatedBy,CreatedDate,OriginalData) values (${batchId}, ${rowNumber}, ${processMessage},${statusId},'NIFI',${recordCreateddateTime},${row})")
        flowFile = session.putAttribute(flowFile, 'recordId', generatedKeys[0][0].toString())
        session.transfer(flowFile,REL_SUCCESS)
    }
    conn?.close()
}
catch(e) {
    conn?.close()
    log.error('Scripting error', e)
    flowFile = session.putAttribute(flowFile, 'execution.message', e.getMessage())
    flowFile = session.putAttribute(flowFile, 'execution.status', 'error')
    flowFile = session.putAttribute(flowFile, 'execution.error', e.getStackTrace().toString())
    session.transfer(flowFile, REL_FAILURE)
}
Author: Anonymous (https://www.blogger.com/profile/17221211364154394286)

Comment of 2018-10-26 04:59:54 (UTC-07:00)
Matt, I am getting this exception. I configured the database connection pool processor.
Caused by: javax.script.ScriptException: javax.script.ScriptException: groovy.lang.MissingPropertyException: No such property: databaseConnectionPoolName for class
Author: Anonymous (https://www.blogger.com/profile/06167710189364226301)

Comment of 2018-10-26 03:03:25 (UTC-07:00)
I am new to NiFi, and I have to do the task below:
Create 3 ExecuteScript processors (using Clojure, Groovy, JavaScript, Lua, Python or Ruby) for Apache NiFi (https://nifi.apache.org/).
These processors should use a standard controller service for the MySQL database connection.
You have to create 2 tables with identical columns (minimum 4, one of which has to be a date in YYMMDD or YYYY-MM-DD format, however you like).
You have to fill Table 1 with content (minimum 40 rows).
Processor number 1 has to read information from Table 1 (minimum 4 columns) in the database and create one flow file object per row (minimum 40) with the data as content.
Use the merge processor to merge the content of at least 40 flow files (which contain rows of Table 1) into one flow file.
Processor number 2 has to read the merged flow file, sort the content by date, and create a new sorted flow file.
Processor number 3 has to read the content of the sorted flow file and write it back into Table 2 using the same controller service.

Can you suggest an approach, Matt?
Author: Anonymous (https://www.blogger.com/profile/06167710189364226301)

Comment of 2018-10-11 02:52:33 (UTC-07:00)
Thanks for sharing this post. Your post is really very helpful for students. python online course (https://onlineitguru.com/python-online-training.html)
Author: latesttechnologyblogs (https://www.blogger.com/profile/00537914353624338231)

Comment of 2018-07-13 12:49:25 (UTC-07:00)
Hi Matt,

I need to execute a stored procedure (like EXEC SP_name) on a database in SQL Server. Can you let me know how I can do that?
Author: Anonymous (https://www.blogger.com/profile/00337941241174943663)

Comment of 2018-06-15 03:19:34 (UTC-07:00)
Hi, I want to pass a value to the DBCP connection pool dynamically. Is it possible?
Author: Anonymous (https://www.blogger.com/profile/16266332217433934679)

Comment of 2018-05-07 23:59:18 (UTC-07:00)
This comment has been removed by a blog administrator.
Author: Anonymous (https://www.blogger.com/profile/14983103606458397268)

Comment of 2018-03-06 01:33:18 (UTC-08:00)
Hi @Matt Burgess, I am trying what you described in Groovy code, but I am getting an error.

import org.apache.nifi.controller.ControllerService
import groovy.sql.OutParameter
import groovy.sql.Sql
import oracle.jdbc.OracleTypes

import java.sql.ResultSet

def lookup = context.controllerServiceLookup
def dbServiceName = ConncationPool.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
sql = Sql.newInstance(conn);
OutParameter CURSOR_PARAMETER = new OutParameter() {
    public int getType() {
        return OracleTypes.CURSOR;
    }
};
def data = []
String sqlString ="""{call sp_hp_course_status_chk(?, ?, ?, ?, ?, ?)}""";
////Get the session values from Nifi flow
flowFile = session.get()
if(!flowFile) return
in_track_id = flowFile.getAttribute('body_track_id');
in_username = flowFile.getAttribute('body_username');
in_course_no = flowFile.getAttribute('body_course_number');

def parametersList = [in_track_id, in_username, in_course_no, CURSOR_PARAMETER, Sql.NUMERIC ,Sql.VARCHAR];
// rs contains the result set of cursor my_cur
sql.call(sqlString, parametersList) {out_details, out_status_code,out_status_desc ->

    out_details.eachRow {
        data << it.toRowResult()
        //print (data)
        flowFile = session.putAttribute(flowFile, 'out_details', data);
    }
    //print (out_status_code)
    //print (out_status_desc)
    flowFile = session.putAttribute(flowFile, 'out_status_code', out_status_code);
    flowFile = session.putAttribute(flowFile, 'out_status_desc', out_status_desc);

};

In the processor properties I set ConncationPool to DBCPConnectionPool-Saba. The error is:
Caused by: javax.script.ScriptException: org.apache.nifi.processor.exception.ProcessException: org.apache.commons.dbcp.SQLNestedException: Cannot get a connection, pool error Timeout waiting for idle object
Author: Shekh (https://www.blogger.com/profile/01433738667875024042)

Comment of 2018-03-01 11:57:41 (UTC-08:00)
Awesome, maybe I'll see you around! Have you heard of the Orlando Developers group?
We have Meetups (https://www.meetup.com/OrlandoDevs/) and a Slack team and such. Feel free to email me (my address is in any of my replies on the NiFi mailing lists) and I can invite you to the Slack team if you like :)
Author: MattyB (https://www.blogger.com/profile/05049049725324531534)

Comment of 2018-02-23 19:09:33 (UTC-08:00)
Matt, thanks again for your help and posts. I had a lot of fun using NiFi and Groovy. I mentioned you and this post on my blog; I hope that is okay. I think we both live in Orlando, small world ;)

http://boristyukin.com/how-to-connect-apache-nifi-to-apache-impala/
Author: Boris Tyukin (https://www.blogger.com/profile/15286991293900821710)

Comment of 2018-02-01 11:31:35 (UTC-08:00)
Got it, pain in the neck. I think PySpark allows using compiled Python packages, but I understand it is a very different architecture. Well, it is a good excuse to pick up a new language.
Author: Boris Tyukin (https://www.blogger.com/profile/15286991293900821710)

Comment of 2018-02-01 09:45:21 (UTC-08:00)
The barrier to adding native Python is that NiFi runs on the JVM, so we'd need a package that comes with the interpreter and lets you pip install modules there, and then we'd still end up shelling out to the OS to call the interpreter anyway. So there are ExecuteStreamCommand and ExecuteProcess, which can do that with your existing Python interpreter; you just need to write scripts that take the flow file contents as STDIN, and your output becomes the output flow file content.
It doesn't have the same NiFi API integration for working with attributes, flow files, etc.
Author: MattyB (https://www.blogger.com/profile/05049049725324531534)

Comment of 2018-02-01 09:03:24 (UTC-08:00)
Thanks, Matt. Ideally, I would love to see NiFi with full support for native Python and compiled packages, but since that is not currently possible, it looks like Groovy is the way to go because it supports Java JARs and has better performance. I do love the language; it reminds me of Python a lot ;)
Author: Boris Tyukin (https://www.blogger.com/profile/15286991293900821710)

Comment of 2018-02-01 07:42:16 (UTC-08:00)
If you like the Groovy way, then in NiFi 1.5.0 you can use ExecuteGroovyScript rather than ExecuteScript. The former gives you a lot more options using the Groovy idiom, since it doesn't have to adhere to the JSR-223 spec.
Author: MattyB (https://www.blogger.com/profile/05049049725324531534)

Comment of 2018-02-01 06:29:41 (UTC-08:00)
Now that I have converted your code to Python, I see how nice and neat your Groovy script is. groovy.sql.Sql hides a lot of the java.sql boilerplate. I saw in your other posts that Groovy scripts perform better, so I will probably learn some Groovy :)
Author: Boris Tyukin (https://www.blogger.com/profile/15286991293900821710)

Comment of 2018-01-31 14:25:49 (UTC-08:00)
Thanks for the reply, Matt.
I tried everything and read all your posts 10 times :) and then, just as I was about to give up, I tried again and it worked!!

I was saving my script to a file rather than the body of the processor, so maybe there was some lag between saves.

from org.apache.nifi.controller import ControllerService

lookup = context.getControllerServiceLookup()
dbServiceName = databaseConnectionPoolName.getValue()

for cs in lookup.getControllerServiceIdentifiers(ControllerService):
    if lookup.getControllerServiceName(cs) == dbServiceName:
        dbcpServiceId = cs

log.info(dbcpServiceId)
Author: Boris Tyukin (https://www.blogger.com/profile/15286991293900821710)

Comment of 2018-01-31 14:02:20 (UTC-08:00)
You should be able to use it exactly like that (using "ControllerService" as the class/interface name). Check the Accessing Controller Services section of part 3 of my ExecuteScript Cookbook on HCC: https://community.hortonworks.com/content/kbentry/77739/executescript-cookbook-part-3.html
Author: MattyB (https://www.blogger.com/profile/05049049725324531534)

Comment of 2018-01-31 12:12:06 (UTC-08:00)
I tried to convert this to Python but cannot figure out how to do this piece: lookup.getControllerServiceIdentifiers(ControllerService). ControllerService is an interface and I am not sure how to implement it in Python.
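On this question: `getControllerServiceIdentifiers` only needs the interface's class object, so nothing has to implement `ControllerService` in Jython; importing it with `from org.apache.nifi.controller import ControllerService` is enough, as the working loop in the 2018-01-31 14:25 comment shows. Below is a minimal sketch, assuming a Jython ExecuteScript body, that wraps that loop so it stops at the first match and raises a clear error when no service has the configured name (the loop as posted would leave `dbcpServiceId` undefined in that case). The helper names are hypothetical.

```python
# Sketch, assuming a Jython ExecuteScript body. There you would pass:
#   lookup        = context.getControllerServiceLookup()
#   service_class = ControllerService  (from org.apache.nifi.controller)
#   name          = databaseConnectionPoolName.getValue()  (a dynamic property)
# They are parameters here so the logic can be exercised outside NiFi.


def find_service_id(lookup, service_class, name):
    """Return the id of the first controller service with this name, or None."""
    for service_id in lookup.getControllerServiceIdentifiers(service_class):
        if lookup.getControllerServiceName(service_id) == name:
            return service_id  # stop at the first match
    return None


def get_pooled_connection(lookup, service_class, name):
    """Resolve the pool by name and borrow a JDBC connection from it.

    The caller must close the returned connection so it goes back to the pool.
    """
    service_id = find_service_id(lookup, service_class, name)
    if service_id is None:
        raise ValueError("no controller service named %r" % (name,))
    return lookup.getControllerService(service_id).getConnection()
```

In ExecuteScript this would be called as `get_pooled_connection(context.getControllerServiceLookup(), ControllerService, databaseConnectionPoolName.getValue())`, which assumes the processor has a dynamic property named `databaseConnectionPoolName` whose value is the pool's name, as in the scripts above.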
Author: Boris Tyukin (https://www.blogger.com/profile/15286991293900821710)

Comment of 2018-01-11 12:00:06 (UTC-08:00)
Matt, can the same thing be done with Python/Jython?
Author: Boris Tyukin (https://www.blogger.com/profile/15286991293900821710)

Comment of 2017-10-26 07:00:52 (UTC-07:00)
Hi Matt,
Thanks for your post. I followed your instructions exactly, but for some reason the flow works for a few hours and then I get a connection problem: it can't find the connectionPool.

Can you help?

This is the thread:
https://community.hortonworks.com/questions/144255/controller-service-is-disabled.html?childToView=144274
Author: Anonymous (https://www.blogger.com/profile/13899516942667641595)

Comment of 2017-10-26 06:59:29 (UTC-07:00)
This comment has been removed by the author.
Author: Anonymous (https://www.blogger.com/profile/13899516942667641595)
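Several scripts in this thread borrow a pooled connection in ways that can keep it checked out. The 2018-03-06 script, for example, calls `getConnection()` before `session.get()`, returns early when no flow file is queued, and never calls `conn.close()`, so each such run holds on to a connection; that is one plausible cause of DBCP's "Timeout waiting for idle object" error. Below is a minimal Jython-style sketch of a per-flow-file pattern that avoids this: fetch the flow file first, borrow a connection only when there is work, and close it in `finally`. `get_connection` and `work` are hypothetical callables standing in for the pool lookup and the SQL; `session`, `REL_SUCCESS`, `REL_FAILURE` and `log` are the objects ExecuteScript binds. Whether a bare `except Exception` catches Java exceptions depends on the Jython version, so the caught types are a parameter here.

```python
# Sketch only: one flow file per invocation, connection always returned.
# `get_connection` and `work` are hypothetical stand-ins for the DBCP lookup
# and the script's SQL; `errors` lets a Jython caller add java.lang.Exception.


def process_one(session, get_connection, work, rel_success, rel_failure,
                log=None, errors=(Exception,)):
    """Handle at most one flow file and return the relationship it went to."""
    flow_file = session.get()
    if flow_file is None:
        return None  # nothing queued: leave before borrowing a connection
    conn = None
    try:
        conn = get_connection()
        result = work(conn, flow_file)
        if result is not None:
            flow_file = result  # work may return an updated flow file reference
        session.transfer(flow_file, rel_success)
        return rel_success
    except errors as e:
        if log is not None:
            log.error("Scripting error: %s" % (e,))
        flow_file = session.putAttribute(flow_file, "execution.message", str(e))
        session.transfer(flow_file, rel_failure)
        return rel_failure
    finally:
        if conn is not None:
            conn.close()  # hand the connection back to the pool on every path
```

In an ExecuteScript body the call might look like `process_one(session, pool.getConnection, my_work, REL_SUCCESS, REL_FAILURE, log, (Exception, java.lang.Exception))`, with `pool` and `my_work` defined by your script and `import java.lang` at the top. Keeping the borrow inside the same `try` as the work means a failed lookup also routes the flow file to failure instead of leaving it unhandled.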