Wednesday, February 24, 2016

Writing Reusable Scripted Processors in NiFi

This blog has quite a few posts about the various things you can do with the new (as of NiFi 0.5.0) scripting processors. Most are about ExecuteScript and how to use it to do per-flowfile things like replace content, use external modules to add functionality, etc. However, most are specific to the task at hand and aren't really general-purpose scripts.

We could use dynamic properties (explained in the Developer's Guide and in an earlier post), since they are passed into ExecuteScript as variables. However, the user of the processor would have to know which properties to add and fill in, and there's no good way to get that information to the user (at least with ExecuteScript).

InvokeScriptedProcessor, on the other hand, lets you provide a scripted implementation of a full Processor instance. This means you can define your own properties and relationships, along with documentation and validation for them. Your script can then provide capabilities that depend on how the user configures the processor, without the user having to interact with the script at all!

I'll illustrate this below, but I think the coolest point is: A template with a single InvokeScriptedProcessor (that contains a working script) can be dragged onto the canvas and basically acts like dragging your custom processor onto the canvas! When the user opens the dialog, they will see the properties/relationships you added, and they will be validated just like the normal ones (script language, body, etc.) that come with the processor.

The scripted processor needs only to implement the Processor interface, which in turn extends the ConfigurableComponent interface. A basic Groovy skeleton, with a class that overrides each of the interface methods, looks like this:

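import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException

class MyProcessor implements Processor {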

    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [] as Set }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
      // do stuff
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return [] } // empty collection = no extra problems found

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        return null
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [] as List }

    @Override
    String getIdentifier() { return 'MyProcessor-InvokeScriptedProcessor' }
}

processor = new MyProcessor()

Note that the class must implement Processor, and the script must declare a variable named "processor" that holds an instance of the class. This is the convention InvokeScriptedProcessor requires.
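To make the convention concrete, here is a small, self-contained Groovy sketch. It assumes, as a simplification, that the host evaluates the script and then reads the processor variable back from the script's bindings; GroovyShell and Binding stand in for the JSR-223 script engine here, so this is not InvokeScriptedProcessor's actual loading code. It shows why the last line of the skeleton assigns processor without def: a def-declared variable is local to the script body and never reaches the bindings.

```groovy
// Stand-in host: GroovyShell + Binding approximate how a script engine exposes
// top-level script variables; this is NOT NiFi's real loading code.
def lookupProcessor(String scriptBody) {
    def binding = new Binding()
    new GroovyShell(binding).evaluate(scriptBody)
    // Only undeclared (binding) variables are visible to the host after evaluation
    return binding.hasVariable('processor') ? binding.getVariable('processor') : null
}

def bindingStyle = '''
class MyProcessor {
    String getIdentifier() { 'MyProcessor-InvokeScriptedProcessor' }
}
processor = new MyProcessor()
'''

// Same script, but 'def' turns 'processor' into a local variable of the script body
def localStyle = bindingStyle.replace('processor = new', 'def processor = new')

println lookupProcessor(bindingStyle)?.identifier   // the instance is found
println lookupProcessor(localStyle)                 // nothing in the bindings
```

The first lookup prints the identifier; the second prints null because the def-declared variable stays local.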

IMPORTANT: Although many processors in the NiFi codebase extend either AbstractProcessor or AbstractSessionFactoryProcessor, your script will most likely NOT work if it extends one of these classes. That's because the validate() method these classes inherit is declared final, and its implementation checks every configured property against the supported property descriptors. The configured properties include the ones that come with InvokeScriptedProcessor (like Script File), but the supported list is only the one your scripted processor provides, so InvokeScriptedProcessor's own properties fail validation. There might be a hack to get around this, but even if it's possible, it's probably not worth it.
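The failure mode can be illustrated with a toy model in plain Groovy. This is not NiFi's validate() implementation; it only mimics the check described above, where every configured property is compared against the descriptors the script advertises. The property values are made up, and 'Script Engine' and 'Script Body' are used as examples of InvokeScriptedProcessor's own properties.

```groovy
// Toy model only: mimics the failure mode described above, not NiFi's real validate().
List<String> validateAgainst(Map<String, String> configured, Set<String> supported) {
    (configured.keySet() as List)
        .findAll { !(it in supported) }                       // properties the script doesn't know
        .collect { "'${it}' is not a supported property".toString() }
}

// What the host would pass in: its own properties plus the script's (values are illustrative)
def configured = [
    'Script Engine': 'Groovy',
    'Script Body'  : 'class GenerateFlowFileWithContent implements Processor { ... }',
    'File Content' : 'Hello from ${hostname()}'
]

// The scripted processor only advertises its own descriptors
def scriptSupported = ['File Content', 'Evaluate Expressions in Content', 'Filename'] as Set

validateAgainst(configured, scriptSupported).each { println it }
```

Both of the host's properties are reported, even though nothing is wrong with their values.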

Moving on, let's say we want to create a reusable scripted processor that works like GenerateFlowFile but allows the user to provide the content of the flow file as well as the value of its "filename" attribute. Moreover, the content could include NiFi Expression Language (EL) constructs like ${hostname()}. Since the content may contain text that looks like EL but that the user doesn't want evaluated, we should let the user decide whether to evaluate EL in the content before writing it to the flow file. Lastly, this is a "generate" processor, so we only need a "success" relationship; "failure" doesn't really make sense here. Even so, it is important to catch any Exception your code can throw, wrap it in a ProcessException, and re-throw it so the framework can handle it correctly.
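The wrap-and-re-throw advice can be sketched without NiFi on the classpath. StandInProcessException and runWrapped are hypothetical names used only for illustration; in a real script you would throw org.apache.nifi.processor.exception.ProcessException directly. The sketch also passes an exception that is already the expected type through unchanged rather than wrapping it twice, which is a design choice, not something the framework requires.

```groovy
// Hypothetical stand-in for org.apache.nifi.processor.exception.ProcessException,
// so this sketch runs without NiFi on the classpath
class StandInProcessException extends RuntimeException {
    StandInProcessException(Throwable cause) { super(cause) }
}

// Run a unit of work; any failure leaves as the one exception type the caller expects
def runWrapped(Closure work) {
    try {
        return work()
    } catch (StandInProcessException e) {
        throw e                                  // already the expected type: don't double-wrap
    } catch (Exception e) {
        throw new StandInProcessException(e)     // keep the original as the cause
    }
}

try {
    runWrapped { 'not a number'.toInteger() }
} catch (StandInProcessException e) {
    println "wrapped: ${e.cause.class.simpleName}"
}
```

Keeping the original exception as the cause means the real failure still shows up in logs and bulletins.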

So the list of things to do:

  1. Add a "success" relationship and return it (in a Set) from getRelationships()
  2. Add a "File Content" property to hold the intended content of the flow file (may include EL)
  3. Add an "Evaluate Expressions in Content" property for the user to indicate whether to evaluate EL in the content
  4. Add an optional "Filename" property to override the default "filename" attribute
  5. When the processor is triggered, create a flow file, write the content (after evaluating EL, if requested), and set the filename attribute if a value was provided
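Before looking at the full processor, the decisions in step 5 can be tried out on plain values. In this sketch, resolve and the evaluateEL closure are hypothetical: the closure stands in for PropertyValue.evaluateAttributeExpressions(), and the toy evaluator only understands ${hostname()}. The logic mirrors the processor: EL is evaluated only when the user asks for it, missing content becomes an empty string, and the filename attribute is set only when a value is given.

```groovy
import java.nio.charset.StandardCharsets

// Sketch of step 5's decisions on plain values; 'evaluateEL' is a hypothetical stand-in
// for PropertyValue.evaluateAttributeExpressions(), not a NiFi API
Map resolve(String rawContent, boolean hasEL, String filename, Closure<String> evaluateEL) {
    // Evaluate EL only when requested; a missing content property becomes ''
    String content = (hasEL && rawContent != null ? evaluateEL(rawContent) : rawContent) ?: ''
    def attributes = [:]
    if (filename != null) { attributes.filename = filename }   // otherwise keep the default name
    [bytes: content.getBytes(StandardCharsets.UTF_8), attributes: attributes]
}

// Toy evaluator that only knows one expression
def fakeEL = { String s -> s.replace('${hostname()}', 'myhost') }

def r1 = resolve('Hi from ${hostname()}', true, 'greeting.txt', fakeEL)
def r2 = resolve('Hi from ${hostname()}', false, null, fakeEL)
def r3 = resolve(null, true, null, fakeEL)

println new String(r1.bytes, StandardCharsets.UTF_8)   // EL evaluated
println new String(r2.bytes, StandardCharsets.UTF_8)   // left as literal text
println r3.bytes.length                                // empty content
```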

Here is some example Groovy code to do just that:
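import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.components.Validator
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GenerateFlowFileWithContent implements Processor {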

    def REL_SUCCESS = new Relationship.Builder()
            .name('success')
            .description('The flow file with the specified content and/or filename was successfully transferred')
            .build();

    def CONTENT = new PropertyDescriptor.Builder()
            .name('File Content').description('The content for the generated flow file')
            .required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build()
    
    def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
            .name('Evaluate Expressions in Content').description('Whether to evaluate NiFi Expression Language constructs within the content')
            .required(true).allowableValues('true','false').defaultValue('false').build()
            
    def FILENAME = new PropertyDescriptor.Builder()
            .name('Filename').description('The name of the flow file to be stored in the filename attribute')
            .required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
    
    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [REL_SUCCESS] as Set }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
      def session = sessionFactory.createSession()
      try {
        def flowFile = session.create()
        
        // Evaluate EL in the content only if the user asked for it; missing content becomes ''
        def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
        def contentProp = context.getProperty(CONTENT)
        def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
        def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
        
        flowFile = session.write(flowFile, { outStream ->
                outStream.write(content.getBytes("UTF-8"))
            } as OutputStreamCallback)
        
        // Only override the default filename attribute if a value was provided
        if(filename != null) { flowFile = session.putAttribute(flowFile, 'filename', filename) }
        // transfer
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
      } catch(e) {
          // Undo partial work (such as the created flow file), then let the framework handle the error
          session.rollback(true)
          throw new ProcessException(e)
      }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return [] } // empty collection = no extra problems found

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch(name) {
            case 'File Content': return CONTENT
            case 'Evaluate Expressions in Content': return CONTENT_HAS_EL
            case 'Filename': return FILENAME
            default: return null
        }
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [CONTENT, CONTENT_HAS_EL, FILENAME] as List }

    @Override
    String getIdentifier() { return 'GenerateFlowFile-InvokeScriptedProcessor' }
    
}

processor = new GenerateFlowFileWithContent()
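One possible refinement: getPropertyDescriptor() above repeats every property name in a switch, so adding a property means editing two methods. The sketch below derives the lookup from the same list instead. Descriptor is a hypothetical stand-in for PropertyDescriptor so the code runs without NiFi; in the actual script, the equivalent body would presumably be getPropertyDescriptors().find { it.name == name }, relying on PropertyDescriptor's getName().

```groovy
// Hypothetical stand-in for PropertyDescriptor: only the name matters for lookup
class Descriptor {
    final String name
    Descriptor(String name) { this.name = name }
    String toString() { name }
}

def CONTENT = new Descriptor('File Content')
def CONTENT_HAS_EL = new Descriptor('Evaluate Expressions in Content')
def FILENAME = new Descriptor('Filename')

// Single source of truth: the same list getPropertyDescriptors() would return
List<Descriptor> descriptors = [CONTENT, CONTENT_HAS_EL, FILENAME]

// Name-based lookup that stays in sync with the list (no hand-maintained switch)
Descriptor lookup(List<Descriptor> all, String name) {
    all.find { it.name == name }   // null for unknown names, like the switch's default branch
}

println lookup(descriptors, 'Filename')
println lookup(descriptors, 'Script Body')
```

Unknown names fall through to null, matching the default branch of the original switch.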

Enter this into the Script Body of an InvokeScriptedProcessor, set the Script Engine to Groovy, and click Apply. When you reopen the configuration dialog, you should see that "success" is the only relationship and that the new properties have been added.


At this point you can save the single processor as a template, perhaps calling it GenerateFlowFileWithContent. It is now a template that is basically reusable as a processor. To see whether it works, drag it onto the canvas, enter some values, and connect it to another processor such as PutFile.


Once the success relationship has been connected, that instance should be good to go.


Hopefully this has illustrated the power and flexibility of InvokeScriptedProcessor, and how it can be used to create reusable processor templates with custom logic without having to build and deploy a NAR. The example template is available as a Gist (here); as always, I welcome all comments, questions, and suggestions.

Cheers!

19 comments:

  1. Thank you for the helpful tip.
    But is there some way to dynamically describe the new processor in order to use the
    "Add Processor" facilities (tag cloud, search engine) instead of selecting a template?
    Thanks again.

    1. This is not currently possible, as NiFi uses the Java ServiceLoader mechanism to load the individual processors (such as InvokeScriptedProcessor). This may become possible with the work being done to support an Extension Registry (https://cwiki.apache.org/confluence/display/NIFI/Extension+Registry)

  2. This comment has been removed by the author.

  3. Matt, thanks for the useful example and all your work documenting NiFi! It has huge potential for my company's goals. I've been desperately trying to build some processors I need, to no avail, but this at least gets me closer. I've started with a simple test script in Groovy that puts SQL results directly to JSON (avoiding the nasty issues with type casting MySQL numerics to Avro just to go to JSON anyway). I tried to match your example as closely as possible with my own, but I keep getting the following error for each of my properties:

    '(property name)' validated against '(property value)' is invalid because '(property name)' is not a supported property

    Probably just something dumb. If anyone gets a chance to check it out, I would be supremely grateful. The script code is here:
    https://wiki.homecarepulse.com/tiki-download_file.php?fileId=52

    Thanks in advance!

    1. What version of NiFi are you using? I put it into NiFi 1.0.0, uncommented the def ProcessorLog log line, changed ProcessorLog to ComponentLog, and it loads fine with the properties in the config dialog.

      Also, if you'd like to ask questions to a larger audience, you can subscribe to the NiFi developer mailing list by sending a message to dev-subscribe at nifi dot apache dot org; quite a few folks there are familiar with these scripting processors and are always helpful!

  4. That was one fast response! Anyway, I am indeed using NiFi 1.0. I should have explained that the fields show up, but when you try to put actual values into them, the little error icon shows on the processor (preventing it from being started), and I see that error message when I hover over it.

    Also, does this mean that when I upgrade to 1.1 I will need to change ComponentLog back to ProcessorLog? If so, I'll add a comment in the code for myself so I remember. I can try the mailing list, too, if that helps.

  5. No, ProcessorLog was permanently changed to ComponentLog; I can't remember when (0.7.0 or 1.0.0). I put actual values into that processor's properties (not all, but some) and it was still marked valid. Do you get that error for every single one? Your getSupportedPropertyDescriptors looks good, and each property looks good (although you may want to check out StandardValidators once this issue is cleared up).

  6. Aha! I deleted the InvokeScriptedProcessor instance I was working on, recreated a new one and pasted the script back in, and now it works. Apparently modifying the failing instance wasn't clearing it up, despite the code being corrected. You're the man! I send you a proverbial toast.

  7. How can I implement SCD (Slowly Changing Dimension) data with NiFi?

    1. Can you describe your use case a bit more? What are you trying to do with an SCD, and what would you like to use NiFi for? Also you may find more help from the users or dev mailing lists (https://nifi.apache.org/mailing_lists.html)

  8. Matt, this is awesome, and I really like your example as you've covered lots of pieces. I want to create a required property that lets the user pick a database connection pool controller service from the ones available to a process group, much like you pick one with the ExecuteSQL processor, for example. Is this possible?

    1. Yes, but you have to use InvokeScriptedProcessor (ISP) for that. You'd provide such a property (borrowing the code from ExecuteSQL) in your script's implementation of getSupportedPropertyDescriptors(), and when you hit Apply on the processor and reopen the dialog, you should see your property (with the dropdown) added. You can reuse a lot of code from ExecuteSQL, such as getting the DBCPService from the property and using it to call getConnection().

      If you need a template for ISP, check my other blog post http://funnifi.blogspot.in/2017/06/invokescriptedprocessor-template-faster.html

    2. Great, nice tips! Did you come up with a good way to test these custom processors, e.g. developing the initial code in a Groovy IDE like IntelliJ?

    3. I have a test utility for ExecuteScript: https://funnifi.blogspot.in/2017/10/release-120-of-nifi-script-tester.html
      I'm hoping to upgrade it to support InvokeScriptedProcessor and the other scripted components (ScriptedReportingTask, e.g.) at some point.

  9. Matt, I want to create a processor that transforms incoming flow files and routes the ones that could not be transformed to failure. I'm struggling to handle exceptions properly in that case. On an exception, should I route the flow file to the failure relationship first and then do a rollback or a commit? And should I then still throw new ProcessException(e)?

    1. OK, I think I finally figured it out after hours of frustration. In the catch block, I routed the flow file to failure and called session.commit(). I did not do "throw new ProcessException(e)", as it would raise an additional bulletin error along with my log.error message, which was confusing.

      catch (e) {
          log.error('Error ....', e)
          session.transfer(flowFile, REL_FAILURE)
          session.commit()
          // throw new ProcessException(e)
      }

  10. Matt, is it possible to do something similar to the @OnScheduled annotation used in normal NiFi processors? I need a place to do some heavy one-time operations when a processor is started from the NiFi canvas. One example would be obtaining a database connection, or something else that won't change until the processor is stopped.

  11. Hi, can we use PySpark instead of Groovy? Which is the best one to use for huge data?
