Friday, April 8, 2016

SQL in NiFi with ExecuteScript

There is a good amount of support for interacting with Relational Database Management Systems (RDBMS) in Apache NiFi:

  • Database Connection Pool controller service: A shared resource for processors to get connections to an RDBMS
  • ExecuteSQL: A processor to execute SELECT queries against an RDBMS
  • PutSQL: A processor to execute statements (e.g., INSERT, UPDATE) against an RDBMS
  • QueryDatabaseTable: A processor to perform incremental fetching from an RDBMS table

I will have a blog post up soon describing the configuration and use of the QueryDatabaseTable processor, which was added in Apache NiFi 0.6.0.

To set up a Database Connection Pool controller service, refer to this User Guide section. I have configured mine for a local PostgreSQL instance:
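(The configuration screenshot isn't reproduced here, but the settings look roughly like the following. The database name, driver path, and credentials are just placeholders for my local instance, and the exact property names can vary slightly between NiFi versions.)

Database Connection URL: jdbc:postgresql://localhost:5432/testdb
Database Driver Class Name: org.postgresql.Driver
Database Driver Jar Url: file:///path/to/postgresql-jdbc.jar
Database User: postgres
Password: ********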


NOTE: I named it 'PostgresConnectionPool'; that name will come up in the script configuration later.

Back to the title of this post :)  The scripting processors don't know about (or have a dependency on) Database Connection Pool controller service instances, or even the API (the DBCPService interface, for example). This would seem to preclude our code from accessing the service to get a database connection.

However, DBCPService has a single method, getConnection(), which returns a java.sql.Connection. That interface is part of Java proper, and it is all we really need from the service; we can talk JDBC from there.  One of the great things about Groovy is dynamic method invocation, meaning I can call a method on an object as long as the method is there, even if I don't know the object's declared type.  We'll get to that shortly.
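As a trivial illustration of what I mean (my own example, not part of the flow):

def sizeOf = { obj -> obj.size() }   // size() is resolved at runtime, whatever the object's declared type
sizeOf('hello')     // 5 (String)
sizeOf([1, 2, 3])   // 3 (List)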

To work with a Controller Service from ExecuteScript, we need to get a reference to a ControllerServiceLookup from the process context. In Groovy, this looks like:
def lookup = context.controllerServiceLookup
Now that we have a lookup, we use it to locate the service we want. If we know the controller service we want (and it won't change, get deleted/recreated, etc.), we can get the identifier (Id below):
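The screenshot isn't reproduced here, but in code this boils down to something like the following (the identifier below is made up; you'd copy the real one from the service's configuration dialog):

def dbcpService = lookup.getControllerService('2c8f4a2e-015a-1000-8d52-1a2b3c4d5e6f')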


 However, for this example I wanted the user to be able to specify the name of the controller service (in this case PostgresConnectionPool), not the identifier. For that we need to get all controller service identifiers, then find the one whose name equals PostgresConnectionPool.

I used a dynamic property in the ExecuteScript config dialog to let the user set the name of the desired Database Connection Pool controller service:
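The Configure Processor dialog isn't shown here, but the idea is to add user-defined (dynamic) properties to ExecuteScript along these lines (the filename value is just an example; the property names are the ones the script below expects):

databaseConnectionPoolName = PostgresConnectionPool
filename = users.csv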


ExecuteScript will create a variable for each dynamic property, and bind a PropertyValue object to it. This is so you can access the value of the property as the correct type (String, integer, etc.). In our case it's a String so we can use the PropertyValue.getValue() method. In Groovy it's as simple as:
def dbServiceName = databaseConnectionPoolName.value
Now that we have a ControllerServiceLookup and the name of the service we want to find, we can use the APIs to iterate over the services until we find one whose name is the one we're looking for:
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
Now we have the identifier of the service we want, and can use lookup.getControllerService(dbcpServiceId) to get a reference to the ControllerService itself.  Note that we haven't referred to this service as a DBCPService, because the script (and the processor) do not have access to that class. However, as I said before, we know the method (getConnection) we want to call, and we have access to the return type of that method. So in Groovy you can just invoke it:
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
I used the safe navigation operator (?.), but before going further you will want to ensure conn is not null and report an error if it is.
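For example, a minimal guard (my own sketch, not part of the script below) might look like:

if (!conn) {
    log.error("Could not get a connection from controller service '${dbServiceName}'")
    return
}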

Now that we have a Connection object, we could use the JDBC API (java.sql.*) to issue queries, walk through the ResultSet's rows, get each column's name and value, and so on. However, Groovy has an excellent class, groovy.sql.Sql, that does all of this with Groovy idioms.  For example, to issue the query 'select * from users' and iterate over the rows (with their row number), you have:
def sql = new Sql(conn)
sql.rows('select * from users').eachWithIndex { row, idx ->
    // Do stuff for each row here
}
In my case, I want to find the column names the first time, and output them as CSV headers. Then for all rows I want to comma-separate the values and output:
if(idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) }
out.write((row.values().join(',') + "\n").getBytes())
All that's left to do is to set the filename attribute to the one specified by the filename variable (see the Configure Processor dialog above), and transfer the new flow file.  The entire script looks like this:

import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.sql.Sql

def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
try {
    flowFile = session.create()
    flowFile = session.write(flowFile, { out ->
        def sql = new Sql(conn)
        sql.rows('select * from users').eachWithIndex { row, idx ->
            if(idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) }
            out.write((row.values().join(',') + "\n").getBytes())
        }
    } as OutputStreamCallback)
    flowFile = session.putAttribute(flowFile, 'filename', filename.value)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Scripting error', e)
    session.transfer(flowFile, REL_FAILURE)
}
conn?.close()

This script probably needs a little work before you'd want to use it: checking whether the Controller Service was found, quoting any row value that has a comma in it (see the sketch below), etc. But I tried to keep it brief to illustrate the concepts, namely the fluent NiFi API and the cool idioms of Groovy :) For my example table, the script produces a valid CSV file:


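Speaking of quoting, here is one quick way you might handle values that contain commas (an untested sketch of mine, not part of the script above):

def csvEscape = { v ->
    def s = (v == null) ? '' : v.toString()
    s.contains(',') ? '"' + s.replace('"', '""') + '"' : s
}
out.write((row.values().collect(csvEscape).join(',') + "\n").getBytes())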
This processor is available as a template on Gist (here); as always, I welcome all questions, comments, and suggestions.

Cheers!

42 comments:


  1. it’s ok to show some appreciation and say ‘great post’
    .NET developer

  2. Hello,
    I have a question. I have a requirement to run a dynamic query in Hive. Can I achieve that using the ExecuteSQL processor? I cannot use a SelectHiveQL processor because my query is dynamic. I tried it using ExecuteSQL by creating a DBCP with Hive connection details, but it is not working. Please help me.

    Replies
    1. This comment has been removed by the author.

    2. What do you mean by a dynamic query? Could you get the information into the NiFi flow and then use ReplaceText with NiFi Expression Language to specify the query?

  3. Hey Matt, I am actually trying to transfer a .WAF file from one DB to another DB. Do you have any idea which processor in NiFi makes this possible?

    Replies
    1. I'm not familiar with WAF files, can you explain more about them and which databases you are trying to transfer from/to?

    2. This comment has been removed by the author.

  4. Hi Matt:

    I am new to NiFi. I need to make a connection to PostgreSQL and load files into PostgreSQL using NiFi.

    Any help or examples greatly appreciated.

    Thanks.

  5. Hey, great post! I am trying to do the same type of thing with HBase but get
    Caused by: groovy.lang.MissingMethodException: No signature of method: com.sun.proxy.$Proxy83.getConnection() is applicable for argument types: () values: []
    on this code
    import org.apache.nifi.controller.ControllerService
    flowFile = session.get()

    if(!flowFile) return

    def lookup = context.controllerServiceLookup
    def HbaseServiceName = HBaseConnectionName.value

    def HBaseServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == HbaseServiceName
    }
    def conn = lookup.getControllerService(HBaseServiceId)?.getConnection()

    Any suggestions? I am trying to implement an increment method by reusing the HBase service.

  6. Is this supported in the NiFi 1.1.1 version? When I tried it, it throws an NPE ...

    Replies
    1. It should be, do you know where the NPE is occurring?

  7. Can you help me with how to create a Database Connection Pool controller service?

    Replies
    1. Which part are you having trouble with? Have you seen the documentation for the controller service? https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-dbcp-service-nar/1.2.0/org.apache.nifi.dbcp.DBCPConnectionPool/index.html

      Also, you may find more help from the Apache NiFi users list (just send a message to users-subscribe@nifi.apache.org).

    2. I followed all the steps mentioned in the link, but now I get the issue "unable to execute SQL select * from tablename due to org.apache.nifi.processor.exception.ProcessException: org.postgresql.util.PSQLException: ERROR: relation "hcp" does not exist Position: 15; routing to failure"

  8. Tried to repeat it as-is:

    ExecuteScript[id=015e1000-0b43-18ae-7b55-c488d71978a6] Scripting error: groovy.lang.GroovyRuntimeException: Ambiguous method overloading for method groovy.sql.Sql#.
    Cannot resolve which method to invoke for [null] due to overlapping prototypes between:
    [interface javax.sql.DataSource]
    [interface java.sql.Connection]

    Replies
    1. Your "conn" variable is getting a null value assigned from the following code:

      def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()

      I'm guessing the dbcpServiceId is not set or it is incorrect. You can remove the question mark from the above line to see if you get a NullPointerException when calling getConnection(). If so, then you did not find the Controller Service you were looking for

  9. This comment has been removed by the author.

  10. Hi Matt,
    Thanks for your post. I followed your instructions exactly, but for some reason the flow works for a few hours and then I get a connection problem: it can't find the connection pool.

    can you help...

    this is the thread
    https://community.hortonworks.com/questions/144255/controller-service-is-disabled.html?childToView=144274

  11. Matt, can the same thing be done with Python/Jython?

  12. I tried to convert this to Python but cannot figure out how to do this piece: lookup.getControllerServiceIdentifiers(ControllerService). ControllerService is an interface and I am not sure how to implement it in Python.

    Replies
    1. You should be able to use it exactly like that (using "ControllerService" as the class/interface name). Check the Accessing Controller Services section of part 3 of my ExecuteScript Cookbook on HCC: https://community.hortonworks.com/content/kbentry/77739/executescript-cookbook-part-3.html

    2. Thanks for the reply, Matt. I tried everything, read all your posts 10 times :) and just when I was about to give up, I tried again and it worked!!

      I was saving my script to a file not the body of the processor, so maybe there was some lag between saves.

      from org.apache.nifi.controller import ControllerService

      lookup = context.getControllerServiceLookup()
      dbServiceName = databaseConnectionPoolName.getValue()

      for cs in lookup.getControllerServiceIdentifiers(ControllerService):
          if lookup.getControllerServiceName(cs) == dbServiceName:
              dbcpServiceId = cs

      log.info(dbcpServiceId)

    3. Now, having converted your code to Python, I see how nice and neat your Groovy script is. groovy.sql.Sql hides a lot of the java.sql boilerplate. I saw in your other blogs that the performance of Groovy scripts would be better, so I'll probably learn some Groovy :)

    4. If you like the Groovy way, then in NiFi 1.5.0 you can use ExecuteGroovyScript rather than ExecuteScript; the former gives you a lot more options using Groovy idioms, since it doesn't have to adhere to the JSR-223 spec.

    5. Thanks Matt. Ideally, I would love to see NiFi with full support for native and compiled Python packages, but since that is not possible currently, it looks like Groovy is the way to go because it supports Java jars and has better performance. I do love the language, it reminds me of Python a lot ;)

    6. The barrier to adding native Python is that NiFi runs on the JVM, so we'd need a package that ships the interpreter and lets you pip install modules there, and then we'd still end up shelling out to the OS to call the interpreter anyway. So there are ExecuteStreamCommand and ExecuteProcess, which can do that with your existing Python interpreter; you just need to write scripts that read flow file content from STDIN, and your output becomes the output flow file content. It doesn't have the same NiFi API integration for working with attributes, flow files, etc.

    7. Got it, pain in the neck... I think PySpark allows using compiled Python packages, but I understand it is a very different architecture. Well, it is a good excuse to pick up a new language.

    8. Matt, thanks again for your help and posts. I had a lot of fun using NiFi and Groovy. I mentioned you and this post on my blog. Hope it is okay. I think we both live in Orlando, small world ;)

      http://boristyukin.com/how-to-connect-apache-nifi-to-apache-impala/

    9. Awesome, maybe I'll see you around! Have you heard of the Orlando Developers group? We have Meetups (https://www.meetup.com/OrlandoDevs/) and a Slack team and such, feel free to email me (my address is in any reply of mine on the NiFi mailing lists) and I can invite you to the Slack team if you like :)

    10. Can you share your Python script? I want to execute a stored procedure on MSSQL using Python, but I don't know how to connect to the DB using a Jython script (usually I use pyodbc).

  13. Hi @Matt Burgess, I am trying this as you mention in the Groovy code but am getting an error.

    import org.apache.nifi.controller.ControllerService
    import groovy.sql.OutParameter
    import groovy.sql.Sql
    import oracle.jdbc.OracleTypes

    import java.sql.ResultSet


    def lookup = context.controllerServiceLookup
    def dbServiceName = ConncationPool.value
    def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
        cs -> lookup.getControllerServiceName(cs) == dbServiceName
    }
    def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
    sql = Sql.newInstance(conn);
    OutParameter CURSOR_PARAMETER = new OutParameter() {
        public int getType() {
            return OracleTypes.CURSOR;
        }
    };
    def data = []
    String sqlString = """{call sp_hp_course_status_chk(?, ?, ?, ?, ?, ?)}""";
    ////Get the session values from Nifi flow
    flowFile = session.get()
    if(!flowFile) return
    in_track_id = flowFile.getAttribute('body_track_id');
    in_username = flowFile.getAttribute('body_username');
    in_course_no = flowFile.getAttribute('body_course_number');

    def parametersList = [in_track_id, in_username, in_course_no, CURSOR_PARAMETER, Sql.NUMERIC, Sql.VARCHAR];
    // rs contains the result set of cursor my_cur
    sql.call(sqlString, parametersList) { out_details, out_status_code, out_status_desc ->

        out_details.eachRow {
            data << it.toRowResult()
            //print (data)
            flowFile = session.putAttribute(flowFile, 'out_details', data);
        }
        //print (out_status_code)
        //print (out_status_desc)
        flowFile = session.putAttribute(flowFile, 'out_status_code', out_status_code);
        flowFile = session.putAttribute(flowFile, 'out_status_desc', out_status_desc);

    };


    In the processor properties I set up ConncationPool as DBCPConnectionPool-Saba.
    The error is:
    Caused by: javax.script.ScriptException: org.apache.nifi.processor.exception.ProcessException: org.apache.commons.dbcp.SQLNestedException: Cannot get a connection, pool error Timeout waiting for idle object

  14. This comment has been removed by a blog administrator.

  15. Hi, I want to pass values to the DBCP connection pool dynamically. Is that possible?

  16. Hi Matt,

    I need to execute a stored procedure (like EXEC sp_name) against a SQL database in SQL Server. Can you let me know how I can do that?

  17. Thanks for sharing this post. Your post is really very helpful its students. python online course

  18. I am new to NiFi... I have to do this task below:
    Create 3 ExecuteScript processors (use Clojure, Groovy, JavaScript, Lua, Python, or Ruby) for Apache NiFi (https://nifi.apache.org/)
    These processors should use a standard controller service for the MySQL database connection
    You have to create 2 tables with identical columns (minimum 4, and one has to be a date (YYMMDD or YYYY-MM-DD, however you like))
    You have to fill Table 1 with content (min 40 rows)
    Processor number 1 has to read information from Table 1 (minimum 4 columns) from the database and create a flow file object per row (min 40) with the data as content
    Use the merge processor to merge the content of the minimum 40 flow files (which contain rows of Table 1) into one flow file
    Processor number 2 has to read the merged flow file, sort the content based on the date, and create a new sorted flow file
    Processor number 3 has to read the content of the sorted flow file and write the content, using the same controller service, back into Table 2

    Can you suggest anything, Matt?

  19. Matt, I am getting this exception... I configured the database connection pool processor. Caused by: javax.script.ScriptException: javax.script.ScriptException: groovy.lang.MissingPropertyException: No such property: databaseConnectionPoolName for class

  20. Hi, I am getting a deadlock error from SQL Server. I am processing a large file by dividing it into batches and then into individual rows as flow files. I insert the original data into a Record table and then later on update it in another Groovy script. Both scripts' code is here. Can you suggest the correct way to fix it?


    import org.apache.nifi.controller.ControllerService
    import org.apache.commons.io.IOUtils
    import java.nio.charset.StandardCharsets
    import groovy.json.JsonSlurper
    import groovy.sql.Sql
    import java.text.SimpleDateFormat

    def flowFile = session.get()

    if(!flowFile) return

    def conn

    try {
        def lookup = context.controllerServiceLookup
        def dbServiceName = databaseConnectionPoolName.value
        def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
            cs -> lookup.getControllerServiceName(cs) == dbServiceName
        }
        conn = lookup.getControllerService(dbcpServiceId)?.getConnection()

        def jsonSlurper = new JsonSlurper()

        def batchId = flowFile.getAttribute('batchId')
        def recordCreateddateTime = new Date()
        def rowNumber = flowFile.getAttribute('fragment.index')
        if(batchId){

            def statusId = 1
            def processMessage = ''

            def sql = new Sql(conn)

            sql.rows('select RecordStatusId,StatusDescription from RecordStatus').each{ row ->
                if(row.StatusDescription.toUpperCase() == 'Created'.toUpperCase()){
                    statusId = row.RecordStatusId
                    statusId = statusId.toInteger()
                    processMessage = row.StatusDescription
                }
            }
            def generatedKeys = ''
            def row = session.read(flowFile).getText("UTF-8")
            generatedKeys = sql.executeInsert("INSERT INTO DataRecord(BatchId, RowNumber,ProcessMessage,RecordStatusId,CreatedBy,CreatedDate,OriginalData) values (${batchId}, ${rowNumber}, ${processMessage},${statusId},'NIFI',${recordCreateddateTime},${row})")
            flowFile = session.putAttribute(flowFile, 'recordId', generatedKeys[0][0].toString())
            session.transfer(flowFile,REL_SUCCESS)
        }
        conn?.close()
    }
    catch(e) {
        conn?.close()
        log.error('Scripting error', e)
        flowFile = session.putAttribute(flowFile, 'execution.message', e.getMessage())
        flowFile = session.putAttribute(flowFile, 'execution.status', 'error')
        flowFile = session.putAttribute(flowFile, 'execution.error', e.getStackTrace().toString())
        session.transfer(flowFile, REL_FAILURE)
    }

  21. import org.apache.nifi.controller.ControllerService
    import org.apache.commons.io.IOUtils
    import java.nio.charset.StandardCharsets
    import groovy.json.JsonSlurper
    import groovy.sql.Sql
    import java.text.SimpleDateFormat

    def flowFile = session.get()

    if(!flowFile) return

    def conn

    try {
        def lookup = context.controllerServiceLookup
        def dbServiceName = databaseConnectionPoolName.value

        def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
            cs -> lookup.getControllerServiceName(cs) == dbServiceName
        }
        conn = lookup.getControllerService(dbcpServiceId)?.getConnection()

        def jsonSlurper = new JsonSlurper()

        def recordId = flowFile.getAttribute('recordId')
        def errorStatus = flowFile.getAttribute('execution.status')
        def errorMessage = flowFile.getAttribute('execution.message')
        def errors = flowFile.getAttribute('errors')
        def recordCreateddateTime = new Date()
        if(recordId){

            def statusId = 1
            def processMessage = ''

            def sql = new Sql(conn)

            sql.rows('select RecordStatusId,StatusDescription from RecordStatus').each{ row ->
                if((errorStatus && errorStatus == 'cannotParseJson') || errors){
                    if(row.StatusDescription.toUpperCase() == 'Errored'.toUpperCase()){
                        statusId = row.RecordStatusId
                        statusId = statusId.toInteger()
                        processMessage = errorMessage
                    }
                } else if(row.StatusDescription.toUpperCase() == 'Created'.toUpperCase()){
                    statusId = row.RecordStatusId
                    statusId = statusId.toInteger()
                    processMessage = row.StatusDescription
                }
            }

            def row = session.read(flowFile).getText("UTF-8")
            sql.executeInsert("UPDATE DataRecord SET JsonData = ${row}, UpdatedBy = 'NIFI', UpdatedDate = ${recordCreateddateTime} , ProcessMessage=${processMessage}, RecordStatusId = ${statusId} WHERE DataRecordId = ${recordId} ")
            flowFile = session.putAttribute(flowFile, 'success', 'updatedSuccessfully')
            session.transfer(flowFile,REL_SUCCESS)
        }
        conn?.close()
    }
    catch(e) {
        conn?.close()
        log.error('Scripting error', e)
        flowFile = session.putAttribute(flowFile, 'execution.message', e.getMessage())
        flowFile = session.putAttribute(flowFile, 'execution.status', 'error')
        flowFile = session.putAttribute(flowFile, 'execution.error', e.getStackTrace().toString())
        session.transfer(flowFile, REL_FAILURE)
    }

  22. I am new to NiFi, but I am experienced in Python. Is it possible to access a DBCPConnectionPool through a Python script?

  23. Thank you Matt! Excellent article! This will eliminate a lot of processing and resources by pulling the data from the database directly into CSV. I appreciate all your help through these nice articles.
