Apache NiFi has a good amount of support for interacting with Relational Database Management Systems (RDBMS):
- Database Connection Pool controller service: A shared resource for processors to get connections to an RDBMS
- ExecuteSQL: A processor to execute SELECT queries against an RDBMS
- PutSQL: A processor to execute statements (INSERT, UPDATE, etc.) against an RDBMS
- QueryDatabaseTable: A processor to perform incremental fetching from an RDBMS table
I will have a blog post soon describing the configuration and use of the QueryDatabaseTable processor, which was added in Apache NiFi 0.6.0.
To set up a Database Connection Pool controller service, refer to this User Guide section. I have configured mine for a local PostgreSQL instance:
NOTE: I named it 'PostgresConnectionPool'; that name will come up again in the script configuration later.
Back to the title of this post :) The scripting processors don't know about (or have a dependency on) the Database Connection Pool controller service instances, or even their API (the DBCPService interface, for example). Normally that would prevent our script from accessing the service to get a database connection.
However, DBCPService has a single method, getConnection(), which returns a java.sql.Connection. That class is part of the standard Java library, and it is all we really need from the service; we can talk JDBC from there. One of the great things about Groovy is dynamic method invocation: I can call a method on an object as long as I know the method exists, even if I don't know the object's class. We'll get to that shortly.
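To see the dynamic invocation idea in isolation, here is a minimal sketch that runs with plain Groovy (no NiFi needed). `FakeConnectionPool` is a hypothetical stand-in for DBCPService, and its getConnection() returns a String rather than a real java.sql.Connection so that no database is required. The `respondsTo` check is a standard Groovy method available on every object; the script in this post doesn't use it, but it is one way to guard the call.

```groovy
// Hypothetical stand-in for a controller service; the calling code never names this type
class FakeConnectionPool {
    String getConnection() { 'fake-connection' }
}

// The static type is Object, so Java would reject service.getConnection() at compile time
Object service = new FakeConnectionPool()

// Groovy's default dynamic dispatch looks up getConnection() on the runtime class.
// respondsTo() returns the list of matching methods (empty if there are none).
def canConnect = !service.respondsTo('getConnection').isEmpty()
def conn = canConnect ? service.getConnection() : null
println "respondsTo: ${canConnect}, connection: ${conn}"
```

The same call would fail to compile if the script were annotated with `@CompileStatic`, which is why this approach relies on Groovy's default dynamic behavior.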
To work with a Controller Service from ExecuteScript, we need to get a reference to a ControllerServiceLookup from the process context. In Groovy, this looks like:
def lookup = context.controllerServiceLookup
Now that we have a lookup, we can use it to locate the service we want. If we know exactly which controller service we want (and it won't change, be deleted and recreated, etc.), we can use its identifier (Id below):
However, for this example I wanted the user to be able to specify the name of the controller service (in this case PostgresConnectionPool), not the identifier. For that we need to get all controller service identifiers, then find the one whose name equals PostgresConnectionPool.
I used a dynamic property in the ExecuteScript config dialog to let the user set the name of the desired Database Connection Pool controller service:
ExecuteScript creates a variable for each dynamic property and binds a PropertyValue object to it, so that you can access the property's value as the appropriate type (String, integer, etc.). In our case it's a String, so we can use the PropertyValue.getValue() method. In Groovy it's as simple as:
def dbServiceName = databaseConnectionPoolName.value
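As a rough illustration of that binding (not ExecuteScript's actual implementation), the sketch below uses a Groovy `Binding` to inject one variable per "dynamic property" and then evaluates script text against it. `FakePropertyValue` is a hypothetical stand-in that mimics only two accessors of NiFi's PropertyValue, and `batchSize` is a made-up property added to show typed access.

```groovy
// Hypothetical stand-in for NiFi's PropertyValue, exposing only two accessors
class FakePropertyValue {
    private final String raw
    FakePropertyValue(String raw) { this.raw = raw }
    String getValue() { raw }
    Integer asInteger() { raw?.isInteger() ? raw.toInteger() : null }
}

// One bound variable per dynamic property, roughly as ExecuteScript does
def binding = new Binding([
        databaseConnectionPoolName: new FakePropertyValue('PostgresConnectionPool'),
        batchSize                 : new FakePropertyValue('100')   // hypothetical property
])

// The script text sees property names as plain variables;
// Groovy's property syntax (.value) calls getValue()
def shell = new GroovyShell(binding)
def dbServiceName = shell.evaluate('databaseConnectionPoolName.value')
def batch = shell.evaluate('batchSize.asInteger()')
println "${dbServiceName} / ${batch}"
```

The `.value` shorthand works on the real PropertyValue for the same reason: Groovy maps property access to the corresponding getter.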
Now that we have a ControllerServiceLookup and the name of the service we want to find, we can use the APIs to iterate over the services until we find one whose name is the one we're looking for:
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == dbServiceName
}
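To try the find-by-name idiom without a running NiFi instance, here is a minimal sketch against a hypothetical in-memory `FakeLookup` that has two methods named like those on ControllerServiceLookup. The identifiers and the second service name are made up, and the class argument is ignored (the real API filters by service type).

```groovy
// Hypothetical in-memory stand-in for ControllerServiceLookup:
// service identifiers mapped to user-visible names
class FakeLookup {
    Map<String, String> namesById
    Set<String> getControllerServiceIdentifiers(Class type) { namesById.keySet() }
    String getControllerServiceName(String id) { namesById[id] }
}

def lookup = new FakeLookup(namesById: [
        'a1b2': 'PostgresConnectionPool',   // made-up identifiers
        'c3d4': 'MySqlConnectionPool'
])

// Same idiom as the script: find returns the first matching id, or null if none match
def findIdByName = { String name ->
    lookup.getControllerServiceIdentifiers(Object).find { cs ->
        lookup.getControllerServiceName(cs) == name
    }
}

def pgId = findIdByName('PostgresConnectionPool')
def missingId = findIdByName('NoSuchPool')
println "pgId=${pgId}, missingId=${missingId}"
```

Note that a misspelled or renamed service quietly yields null rather than an exception, so whatever uses the result has to handle that case.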
Now we have the identifier of the service we want, and can use lookup.getControllerService(dbcpServiceId) to get a reference to the ControllerService itself. Note that we haven't referred to this service as a DBCPService, because the script (and the processor) do not have access to that class. However, as I said before, we know the method we want to call (getConnection) and we have access to its return type, so in Groovy we can just invoke it:
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
I used the safe-navigation operator (?.), which yields null instead of throwing a NullPointerException when no service was found. To go further, you will want to ensure conn is not null, and report an error if it is.
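Here is one way to add that check, as a minimal sketch runnable outside NiFi. The map of services and `FakePool` are hypothetical stand-ins for the lookup and the DBCP service. Inside ExecuteScript you would more likely call log.error(...) and stop processing (or route to failure) instead of throwing.

```groovy
// Hypothetical stand-in for the DBCP service; getConnection() returns a placeholder
class FakePool {
    String getConnection() { 'conn-from-pool' }
}

def servicesById = ['a1b2': new FakePool()]   // made-up identifier

// Returns a connection, or throws with a descriptive message
// instead of letting a null slip through to later code
def connectionFor = { String id ->
    def conn = servicesById[id]?.getConnection()   // ?. yields null when there is no service
    if (conn == null) {
        throw new IllegalStateException("No connection available for controller service id '${id}'")
    }
    conn
}
```

Passing null (what find returns when no name matches) goes through the same path, so one check covers both a missing service and a service that returns no connection.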
Now that we have a Connection object, we could use the JDBC API (java.sql.*) to issue queries, go through the ResultSet's rows, then get each column's name and value, etc. However, Groovy has an excellent class called groovy.sql.Sql that does all this with Groovy idioms. For example, to issue the query 'select * from users' and iterate over the rows (with their zero-based row index), you can write:
def sql = new Sql(conn)
sql.rows('select * from users').eachWithIndex { row, idx ->
    // Do stuff for each row here
}
In my case, I want to take the column names from the first row and output them as the CSV header. Then, for every row, I want to output the values separated by commas:
if(idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) }
out.write((row.values().join(',') + "\n").getBytes())
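To see what those two lines produce, here is a minimal sketch that runs without a database. The `rows` list is hypothetical sample data standing in for the result of sql.rows(...), which (like these map literals) keeps columns in query order; a ByteArrayOutputStream stands in for the flow file's output stream. I pass an explicit 'UTF-8' to getBytes() here rather than relying on the platform default charset.

```groovy
// Hypothetical rows standing in for sql.rows(...): each row is an ordered map of column -> value
def rows = [
        [id: 1, name: 'alice', email: 'alice@example.com'],
        [id: 2, name: 'bob',   email: 'bob@example.com']
]

def out = new ByteArrayOutputStream()
rows.eachWithIndex { row, idx ->
    // First row only: emit the column names as the CSV header
    if (idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes('UTF-8')) }
    // Every row: emit the values in column order
    out.write((row.values().join(',') + "\n").getBytes('UTF-8'))
}
def csv = out.toString('UTF-8')
print csv
```

One consequence of taking the header from the first row: an empty result set produces an empty file with no header line at all.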
All that's left to do is set the filename attribute to the value of the filename dynamic property (see the Configure Processor dialog above) and transfer the new flow file. The entire script looks like this:
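import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.sql.Sql

def lookup = context.controllerServiceLookup
// Name of the DBCP controller service, from the databaseConnectionPoolName dynamic property
def dbServiceName = databaseConnectionPoolName.value
// Find the identifier of the controller service with that name
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == dbServiceName
}
// DBCPService isn't visible to the script, so call getConnection() dynamically
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def flowFile = session.create()
try {
    flowFile = session.write(flowFile, { out ->
        def sql = new Sql(conn)
        sql.rows('select * from users').eachWithIndex { row, idx ->
            // Column names become the CSV header, written once
            if (idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) }
            out.write((row.values().join(',') + "\n").getBytes())
        }
    } as OutputStreamCallback)
    // Name the new flow file from the 'filename' dynamic property
    flowFile = session.putAttribute(flowFile, 'filename', filename.value)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Scripting error', e)
    session.transfer(flowFile, REL_FAILURE)
} finally {
    // Return the connection to the pool even if something went wrong
    conn?.close()
}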
This script probably needs a little work before you'd want to use it: checking whether the Controller Service was found, quoting any row value that contains a comma, etc. But I tried to keep it brief to illustrate the concepts, namely the fluent NiFi API and the cool idioms of Groovy :) For my example table, the script produces a valid CSV file.
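For the quoting part, here is a minimal sketch of the usual CSV rules (as described in RFC 4180): wrap a field in double quotes if it contains a comma, a double quote, or a line break, and double any embedded quotes. The helper names `csvField` and `csvLine` are hypothetical, and turning null into an empty field is my own assumption (a plain join() writes the text "null").

```groovy
// Quote one CSV field: wrap it in double quotes if it contains a comma,
// a double quote, CR or LF, and double any embedded quotes.
// Nulls become empty fields (an assumption, not a CSV requirement).
String csvField(Object value) {
    if (value == null) return ''
    String s = value.toString()
    boolean needsQuotes = s.contains(',') || s.contains('"') || s.contains('\n') || s.contains('\r')
    needsQuotes ? '"' + s.replace('"', '""') + '"' : s
}

// Join a collection of values into one CSV line, quoting each field as needed
String csvLine(Collection values) {
    values.collect { csvField(it) }.join(',') + '\n'
}

def header = csvLine(['id', 'name', 'note'])
def line = csvLine([3, 'Smith, Jane', 'said "hi"'])
print header + line
```

In the script, `csvLine(row.keySet())` and `csvLine(row.values())` could replace the two join(',') expressions, with the helper methods defined at the top of the script.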
This processor is available as a template on Gist (here). As always, I welcome all questions, comments, and suggestions.
Cheers!