While addressing a question on HCC, I realized it would make a good example of how to use ScriptedLookupService to provide a sequence number from a database for a field in a record. This can be useful when using PutDatabaseRecord downstream and you want some field to contain a database sequence value.
In my example, I'm using a CSV file with a few dummy values, adding an "id" field containing a sequence number, and writing the records out as JSON objects:
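To make this concrete, a made-up input/output pair might look like the following (the field names and values are just illustrative; the actual "id" values come from the database sequence):

Name,Age
Joe,42
Mary,37

[{"Name":"Joe","Age":42,"id":1},{"Name":"Mary","Age":37,"id":2}]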
The LookupRecord configuration is as follows; note that the processor requires the value of any user-defined property to be a RecordPath that evaluates to a non-null value. I chose "/Age" as a dummy RecordPath here, since I know the record contains a non-null Age field:
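As a rough sketch, the relevant LookupRecord properties would look something like this (the reader/writer names and the sequence name "my_sequence" are placeholders for illustration; use your own):

Record Reader: CSVReader
Record Writer: JsonRecordSetWriter
Lookup Service: ScriptedLookupService (configured with the script below)
Result RecordPath: /id
my_sequence (user-defined property): /Age

The name of the user-defined property is what the script below uses as the database sequence name, and its value ("/Age") is only there to satisfy the non-null RecordPath requirement.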
The Groovy script for ScriptedLookupService is as follows. I am connecting to a PostgreSQL database, so I generated the corresponding SQL using nextval(); you can change this to the SQL appropriate for your database type:
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.lookup.LookupService
import org.apache.nifi.reporting.InitializationException
import java.sql.*

class SequenceLookupService implements LookupService<String> {

    final String ID = UUID.randomUUID().toString()

    // The DBCP controller service providing connections to the database that holds the sequence(s)
    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name("Database Connection Pooling Service")
            .description("The Controller Service that is used to obtain a connection to the database")
            .required(true)
            .identifiesControllerService(DBCPService)
            .build()

    Connection conn = null
    Statement stmt = null
    ComponentLog log = null

    @Override
    Optional<String> lookup(Map<String, String> coordinates) {
        // The name of the user-defined property (the coordinate key) is used as the sequence name
        final String key = coordinates.keySet().first()
        ResultSet rs = stmt?.executeQuery("select nextval('${key}')".toString())
        return rs.next() ? Optional.of(String.valueOf(rs.getLong(1))) : Optional.empty()
    }

    @Override
    Set<String> getRequiredKeys() {
        return java.util.Collections.emptySet()
    }

    @Override
    Class<?> getValueType() {
        return String
    }

    @Override
    void initialize(ControllerServiceInitializationContext context) throws InitializationException {
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) {
        null
    }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        name.equals(DBCP_SERVICE.name) ? DBCP_SERVICE : null
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        [DBCP_SERVICE] as List
    }

    @Override
    String getIdentifier() {
        ID
    }

    def onEnabled(context) {
        // Grab a connection from the DBCP service and create a reusable Statement for the sequence queries
        def db = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService)
        conn = db.connection
        stmt = conn.createStatement()
    }

    def onDisabled() {
        conn?.close()
    }

    def setLogger(logger) {
        log = logger
    }
}
lookupService = new SequenceLookupService()
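If you're not on PostgreSQL, only the query inside lookup() should need to change. For example, an Oracle version might use the following (an untested sketch; the sequence name still comes from the user-defined property name):

ResultSet rs = stmt?.executeQuery("select ${key}.NEXTVAL from dual".toString())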
One caveat I should mention: currently, any properties defined by the script do not get added to the ScriptedLookupService dialog until the service is enabled. I have written an improvement Jira (NIFI-5547) to fix that.
The HCC question also refers to the nextInt() feature of NiFi Expression Language, which is MUCH faster than retrieving a sequence from a database. The same goes for the UpdateAttribute approach, which lets NiFi handle the "sequence" rather than an external database. But if you do need to use an external database sequence, this script should allow you to do that.
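For example, in UpdateAttribute you could add a property (the name "id" here is arbitrary) with the value below; it evaluates to a one-up number for the NiFi instance, though that counter is not persisted across restarts:

id = ${nextInt()}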
Note that in my lookup function I only grab the first entry in the coordinates map; this can easily be extended to do multiple lookups of multiple sequences. Keep in mind, however, that user-defined properties are unique by name, so you can't fill in multiple fields from the same sequence.
The above template (with script) is available as a Gist here. As always, please let me know if you try this whether it works for you, and I welcome all comments, questions, and suggestions. Cheers!
Monday, August 13, 2018
One-and-done Flows (aka Job Processing)
We get a lot of questions on the Apache NiFi mailing list about being able to run a flow/processor just once, and indeed there is a related feature request Jira (NIFI-1407) that remains unimplemented. IMO this is for a few reasons, one being that "one-shot" processing doesn't really align with a flow-based architecture such as NiFi's; it is really more of a job-processing feature.
Having said that, many users still desire/need that capability, and some workarounds have been presented. One is to schedule the source processor (often GenerateFlowFile used as a trigger) with an exorbitant run interval (weeks, for example), knowing that someone will probably come along and restart the processor before then. Another is to start the processor from the UI and immediately stop it; that lets the original run go but prevents the processor from being scheduled again. The same can be done from the REST API, although for secure NiFi instances/clusters that can be a bit painful to set up.
In that vein, I wanted to tackle one common use case in this area: the one where a flow file needs to be generated as a trigger, with or without content. I'll present two different Groovy scripts to be used in an ExecuteScript processor, one to use downstream from GenerateFlowFile, and one to act as the source processor itself.
For the first scenario, I'm picturing GenerateFlowFile as the source processor, although technically it could be any source processor. The key is to schedule it to run at some huge interval, like one year. This is normally a decent workaround, though technically it breaks down if you leave the flow running longer than that. In any case, such a long interval prevents a ton of provenance events for the flow files headed into ExecuteScript.
This Groovy script uses the processor's StateManager to keep track of a boolean variable "one_shot". If it is not yet set, the flow file is transferred to success and then "one_shot" is set. From then on, incoming flow files are simply removed from the session. Note that removal creates a DROP provenance event, which is why we don't want a lot of CREATE/DROP events for this part of the flow; this reinforces my earlier point about scheduling the source processor. The script is as follows:
import org.apache.nifi.components.state.*

flowFile = session.get()
if(!flowFile) return

StateManager sm = context.stateManager
def oldState = new HashMap<String,String>()
oldState.putAll(sm.getState(Scope.LOCAL).toMap())

if(!oldState.one_shot) {
    // Haven't sent the flow file yet, do it now and update state
    session.transfer(flowFile, REL_SUCCESS)
    oldState.one_shot = 'true'
    sm.setState(oldState, Scope.LOCAL)
} else {
    // Remove flow file -- NOTE this causes upstream data loss, only use
    // when the upstream flow files are meant as triggers. Also be sure the
    // upstream schedule is large like 1 day, to avoid unnecessary provenance events.
    session.remove(flowFile)
}
An alternative to dropping the flow files is to roll back the session. That prevents data loss but will eventually lead to backpressure being applied at the source processor. If that makes more sense for your use case, just replace the session.remove() with session.rollback().
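In other words, the else branch above would become something like this:

} else {
    // Return the flow file to the upstream queue instead of dropping it; once that
    // queue fills up, backpressure will keep the source processor from running again.
    session.rollback()
}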
The other scenario is when you don't need a full-fledged GenerateFlowFile processor (or any other source processor); in that case, ExecuteScript itself can be the source processor. The following script works similarly to the previous one, except it generates rather than receives flow files, and it accepts an optional user-defined property called "content", which it uses for the outgoing flow file content. Again the run schedule should be sufficiently large, although in this case it won't matter as much since no provenance events will be created. However, a thread is created and destroyed at each scheduled run, so you might as well set the run schedule to a large value. The script is as follows:
import org.apache.nifi.components.state.*
import org.apache.nifi.processor.io.OutputStreamCallback

StateManager sm = context.stateManager
def oldState = new HashMap<String,String>()
oldState.putAll(sm.getState(Scope.LOCAL).toMap())

if(!oldState.one_shot) {
    // Haven't sent the flow file yet, do it now and update state
    def flowFile = session.create()
    try {
        // Optional user-defined property "content" becomes the flow file content (EL is evaluated)
        String ffContent = context.getProperty('content')?.evaluateAttributeExpressions()?.value
        if(ffContent) {
            flowFile = session.write(flowFile, {outputStream ->
                outputStream.write(ffContent.bytes)
            } as OutputStreamCallback)
        }
        session.transfer(flowFile, REL_SUCCESS)
    } catch(e) {
        log.error("Couldn't generate FlowFile", e)
        session.remove(flowFile)
    }
    oldState.one_shot = 'true'
    sm.setState(oldState, Scope.LOCAL)
}
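As a usage note, "content" is just a user-defined (dynamic) property you add to the ExecuteScript processor. For example, setting it to a small JSON snippet (Expression Language is supported) produces a flow file with that text as its content; leaving it unset produces a flow file with empty content.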
To reset these processors (in order to run another one-shot job/flow), you can stop them, then right-click, choose View State, and click Clear (or send the corresponding command via the REST API).
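For the REST API route, the endpoint (as of this writing) is POST /nifi-api/processors/{id}/state/clear-requests, issued against a stopped processor. A quick Groovy sketch, with a hypothetical host/port and a placeholder processor ID:

def processorId = 'your-processor-uuid' // placeholder, use your processor's ID
def conn = new URL("http://localhost:8080/nifi-api/processors/${processorId}/state/clear-requests").openConnection()
conn.requestMethod = 'POST'
assert conn.responseCode == 200 // state cleared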
As always, please let me know how/if this works for you. Cheers!