Monday, March 14, 2016

ExecuteScript - JSON-to-JSON Revisited (with Jython)

I've received some good comments about a couple of previous blog posts on using the ExecuteScript processor in NiFi (0.5.0+) to perform JSON-to-JSON transformations. One post used Groovy and the other used JavaScript.

Since then I've received some requests for a Jython example for ExecuteScript, so I figured I'd do the same use case again (JSON to JSON) to let folks see the differences between the languages when performing the same operations :)

The approach has been covered in detail in the other posts, so I will talk a bit about the Jython-specific stuff and then get right to the code.

One major caveat here is that I don't know Python :)  I learned enough to get the script working, but please let me know how to do any of this better. I stopped touching the script once it worked, so it's very possible there are unnecessary imports, classes, etc.

The first thing to do is to import the Jython modules you will need, as well as the Java and NiFi classes to be used:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

I didn't need to import java.lang.String, because Jython coerces between Python strings and Java Strings where needed. I probably couldn't call getBytes() on a Python string unless Jython knew to coerce it to a Java String, but that's OK because bytearray("myString".encode('utf-8')) achieves the same result.
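A quick plain-Python check of that encode-then-bytearray step (the helper name to_utf8_bytearray is hypothetical, not part of NiFi or Jython). A non-ASCII character is included because that is where the choice of encoding actually matters:

```python
def to_utf8_bytearray(text):
    """Encode a string as UTF-8 and wrap the bytes in a bytearray,
    the same shape of data the script passes to outputStream.write()."""
    return bytearray(text.encode('utf-8'))

data = to_utf8_bytearray(u"caf\u00e9")
# 'caf' is 3 bytes and the accented e is 2 bytes in UTF-8
print(len(data))
```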

The next task was to create a StreamCallback object for use in session.write(). I created a Jython class for this and overrode the interface method:
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # ... read, transform and write the content here ...
        pass

After that, I read the JSON text in with IOUtils.toString() and parsed it with json.loads(), then performed all the operations on the various parts of the resulting object (a Python dictionary). Finally, I generated a new JSON string with json.dumps(), encoded it to UTF-8, wrapped the bytes in a bytearray, and wrote that to the output stream.
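To see that read, parse, transform, serialize and write sequence in isolation, here is a minimal plain-Python sketch that runs outside NiFi. It assumes io.BytesIO is an acceptable stand-in for NiFi's input and output streams, and decoding the raw bytes plays the role of IOUtils.toString(); the process() function and its transform argument are illustrative names, not NiFi APIs.

```python
import io
import json

def process(input_stream, output_stream, transform):
    """Read UTF-8 JSON from input_stream, apply transform(), write UTF-8 JSON."""
    # Stand-in for IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    text = input_stream.read().decode('utf-8')
    obj = json.loads(text)
    new_obj = transform(obj)
    # json.dumps -> encode as UTF-8 -> bytearray, as in the Jython callback
    output_stream.write(bytearray(json.dumps(new_obj, indent=4).encode('utf-8')))

# In-memory streams standing in for the flow file's old and new content
src = io.BytesIO(b'{"a": 1}')
dst = io.BytesIO()
process(src, dst, lambda o: {"doubled": o["a"] * 2})
print(dst.getvalue().decode('utf-8'))
```

Keeping the transform as a separate argument makes it easy to try different reshaping logic against sample input before moving it into the StreamCallback.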

The resulting script is as follows:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Reads the incoming JSON, reshapes it, and writes the result as the new content
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the flow file content as a UTF-8 string and parse it
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        # The primary rating becomes "Rating"; all others go under "SecondaryRatings"
        newObj = {
            "Range": 5,
            "Rating": obj['rating']['primary']['value'],
            "SecondaryRatings": {}
        }
        for key, value in obj['rating'].iteritems():
            if key != "primary":
                newObj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}

        # Serialize, encode as UTF-8 and write the bytes out
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    # Strip only the last extension: "ratings.json" -> "ratings_translated.json"
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').rsplit('.', 1)[0] + '_translated.json')
    session.transfer(flowFile, REL_SUCCESS)
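Because the reshaping logic itself is plain Python, it can be pulled into a function and tried outside NiFi before pasting it into ExecuteScript. This is a sketch under a few assumptions: the sample input is hypothetical (shaped only by the keys the script reads), the function names are illustrative, and items() is used instead of iteritems() so it runs on both Jython 2.7 and Python 3. Splitting the filename on the last dot with rsplit('.', 1) keeps a name like ratings.v2.json intact, where split('.')[0] would cut it down to ratings.

```python
import json

def translate(obj):
    """Reshape the input: the primary rating becomes 'Rating', every other
    rating becomes an entry in 'SecondaryRatings'."""
    new_obj = {
        "Range": 5,
        "Rating": obj['rating']['primary']['value'],
        "SecondaryRatings": {},
    }
    for key, value in obj['rating'].items():
        if key != "primary":
            new_obj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}
    return new_obj

def translated_filename(filename):
    """Replace only the last extension with '_translated.json'."""
    return filename.rsplit('.', 1)[0] + '_translated.json'

# Hypothetical input shaped the way the script's key lookups expect
sample = {"rating": {"primary": {"value": 3}, "quality": {"value": 4}}}
print(json.dumps(translate(sample), indent=4, sort_keys=True))
print(translated_filename("ratings.v2.json"))  # ratings.v2_translated.json
```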

The template is available as a Gist (here). I welcome all suggestions on how to make this better, and please share any scripts you come up with!

Cheers!

36 comments:

  1. I modified your template to convert timestamps in JSON from a String to a Long:
    https://gist.github.com/ryanpersaud/734b68e3624d06433deaa114acc33865
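The Gist itself isn't reproduced here. As a hypothetical sketch of the same idea, assuming the timestamp arrives as epoch milliseconds in a numeric string (a formatted date such as 2016-03-14T10:00:00Z would need parsing instead), converting the value with int() is enough for json.dumps() to emit a JSON number rather than a string. The function name and field names are illustrative.

```python
import json

def timestamps_to_long(obj, fields):
    """Replace each named top-level field holding a numeric string with an integer.
    Python integers have arbitrary precision, so millisecond values don't overflow."""
    for field in fields:
        if field in obj and obj[field] is not None:
            obj[field] = int(obj[field])
    return obj

# Hypothetical record: 'created' holds epoch milliseconds as a string
record = json.loads('{"id": "abc", "created": "1458000000000"}')
print(json.dumps(timestamps_to_long(record, ["created"]), sort_keys=True))
# {"created": 1458000000000, "id": "abc"}
```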

  2. When you make this call to the callback

    flowFile = session.write(flowFile,PyStreamCallback())

    can I treat inputStream as I would any file object, in other words, read in my records using csv.DictReader(inputStream, fieldnames), for example? It would be helpful if I could do that rather than using a list iterator to iterate through the records.

    Thanks in advance for any comments.

  3. Not sure, you might need FileUtil to wrap() the input stream: http://www.jython.org/javadoc/org/python/core/util/FileUtil.html
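Another option, sketched here as an assumption rather than something tested inside ExecuteScript: read the content into a string with IOUtils.toString(), as the post's script does, and give csv.DictReader the lines directly, since it accepts any iterable of lines rather than requiring a file object. The helper name and sample data are hypothetical; splitlines() will break quoted fields that contain newlines, and Jython 2.7's csv module may need extra care with non-ASCII text.

```python
import csv

def read_records(text, fieldnames):
    """Parse CSV text into a list of dicts keyed by fieldnames.
    csv.DictReader accepts any iterable of lines, so no file wrapper is needed."""
    return list(csv.DictReader(text.splitlines(), fieldnames=fieldnames))

# Hypothetical content, as it might come back from IOUtils.toString(...)
content = "1,alice\n2,bob\n"
for row in read_records(content, ["id", "name"]):
    print("%s %s" % (row["id"], row["name"]))
```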

  4. I'd like to append the attributes of each flowFile in my processing workflow to a disk file I use as a log. I need to create the output file on the first access attempt if it is not there, but if it exists I want to append to the contents that are already there.

    This script above is very helpful and shows me how I can customize the attributes of the flowFile coming out of this ExecuteScript processor. But the PutFile processor that might follow only gives us the option to create a new output file each time. Is there an example that demonstrates how to append the flowFile attribute information to a log file each time without losing the previous contents of the output file?

    Thanks again.

    1. This will add the current date as a string:

      import json
      import java.io
      from datetime import datetime
      from org.apache.commons.io import IOUtils
      from java.nio.charset import StandardCharsets
      from org.apache.nifi.processor.io import StreamCallback

      class PyStreamCallback(StreamCallback):
          def __init__(self):
              pass

          def process(self, inputStream, outputStream):
              text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
              flowFileJson = json.loads(text)
              today_string = datetime.today().strftime('%Y-%m-%d')
              flowFileJson['current_date'] = today_string

              outputStream.write(bytearray(json.dumps(flowFileJson, indent=4).encode('utf-8')))

      flowFile = session.get()
      if flowFile is not None:
          flowFile = session.write(flowFile, PyStreamCallback())
          session.transfer(flowFile, REL_SUCCESS)


      If you want to add this string to every JSON object in a JSON array, replace
          flowFileJson['current_date'] = today_string
      with
          for j in flowFileJson:
              j['current_date'] = today_string
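On the original question of appending attributes to a log file: opening a file in append mode ('a') creates it if it doesn't exist and otherwise writes after the existing contents, which is the behavior asked for. A minimal plain-Python sketch, with a hypothetical helper name and log path. Inside ExecuteScript the attributes would come from flowFile.getAttributes(), a Java Map that may need copying into a Python dict first; and with more than one concurrent task, writes from different threads could interleave.

```python
import json
import os
import tempfile

def append_attributes(path, attributes):
    """Append one JSON line per call. Mode 'a' creates the file if it is
    missing and otherwise appends after the existing contents."""
    with open(path, 'a') as log:
        log.write(json.dumps(attributes, sort_keys=True) + '\n')

# Hypothetical log location and attribute values
log_path = os.path.join(tempfile.mkdtemp(), 'flowfile_attributes.log')
append_attributes(log_path, {"filename": "a.json", "uuid": "1234"})
append_attributes(log_path, {"filename": "b.json", "uuid": "5678"})
```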

  5. I am trying to do something similar here, but I want to encrypt parts of the JSON. I have written some code that does the encryption, but it requires importing some Python modules, mainly the Cipher module, as seen at this link (https://repl.it/EJU7/0). However, NiFi cannot recognize the "from Crypto.Cipher import AES" statement. Do you know of any workarounds, or any way to fix this?

  6. I keep getting the error "No module named Crypto" even though I added the module path "/Library/Python/2.7/site-packages/Crypto/Cipher/AES.pyc" to the module directory section.

  7. ExecuteScript uses Jython, not CPython. This means it can't import native (compiled C extension) Python modules, only pure-Python modules. Check the Jython documentation on this; perhaps there is a pure-Python crypto library you can use instead.

  8. Hi,

    I am very new to NiFi.
    I am getting an error like:
    "Failed to process session due to process exception, SyntaxError : no viable alternative character '' in script at line ::: column number 0."

    Could somebody guide me?

    Code:
    import json
    import java.io
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback
    class PyStreamCallback(StreamCallback):
        def __init__(self):
            pass
        def process(self, inputStream, outputStream):
            text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            newObj = {
                "Range": 5,
                "SecondaryRatings": {}
            }
            outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))
    flowFile = session.get()
    if (flowFile != None):
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)


    Here I am just writing newObj for any input stream, as a proof of concept.
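The empty quotes in that message suggest the parser hit a character that doesn't display, such as a non-breaking space picked up when copying code from a web page. That is a guess, not a confirmed diagnosis (lost or mixed indentation would also break the script). Here is a small plain-Python helper, hypothetical and not part of NiFi, that lists non-ASCII and control characters in the script text with their line number and zero-based column:

```python
import unicodedata

def find_suspicious_chars(source):
    """Return (line, column, codepoint, name) for every non-ASCII or control
    character in source, ignoring tabs and line breaks."""
    hits = []
    for lineno, line in enumerate(source.splitlines(), 1):
        for col, ch in enumerate(line):
            code = ord(ch)
            if code > 126 or (code < 32 and ch != '\t'):
                hits.append((lineno, col, "U+%04X" % code, unicodedata.name(ch, "UNKNOWN")))
    return hits

# A non-breaking space (U+00A0) pasted in place of a normal indentation space
script = u"if True:\n\u00a0   pass\n"
print(find_suspicious_chars(script))  # [(2, 0, 'U+00A0', 'NO-BREAK SPACE')]
```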


  9. 0
    down vote
    favorite
    I want to read data directly from a NiFi output port into Spark Streaming using PySpark (without using Kafka in between). I have seen a few reference examples in Scala that use the SiteToSiteClient to set up a connection with NiFi, but I have not been able to find an equivalent Python module to write similar code in Python.

    Can anyone please help me out?

    Thanks.

  12. Hi,

    I am very new to NiFi.

    I have a use case and would like to know how to handle it.

    I have a JSON file containing multiple configs. I would like to generate a flow file for each config in that JSON and put all of its fields into attributes. Can I use this script, or how else can I do it?

    [
      {
        "Name": "Debian",
        "Version": "9",
        "Install": "apt",
        "Owner": "SPI",
        "Kernel": "4.9"
      },
      {
        "Name": "Ubuntu",
        "Version": "17.10",
        "Install": "apt",
        "Owner": "Canonical",
        "Kernel": "4.13"
      }
    ]

    From this JSON I would like to split it into 2 flow files, each containing the attributes Name, Version, Install, Owner and Kernel.

    Could you provide some suggestions?
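Without any scripting, NiFi's SplitJson processor followed by EvaluateJsonPath (with its Destination set to flowfile-attribute) may cover this. If you do want to script it, here is a minimal sketch of the JSON side in plain Python. The helper name is hypothetical, and the session calls in the comments are an untested assumption about how each dict would be attached to a child flow file.

```python
import json

def configs_to_attributes(text):
    """Return one dict of string attributes per element of a JSON array of
    flat objects; NiFi attribute values are strings, so values are stringified."""
    return [{key: u"%s" % value for key, value in item.items()} for item in json.loads(text)]

# The array from the question above
content = '''[
  {"Name": "Debian", "Version": "9", "Install": "apt", "Owner": "SPI", "Kernel": "4.9"},
  {"Name": "Ubuntu", "Version": "17.10", "Install": "apt", "Owner": "Canonical", "Kernel": "4.13"}
]'''

for attrs in configs_to_attributes(content):
    # Inside ExecuteScript (untested assumption), each dict could become a child:
    #   child = session.create(flowFile)
    #   child = session.putAllAttributes(child, attrs)
    #   session.transfer(child, REL_SUCCESS)
    # with the original flow file then removed or routed elsewhere.
    print(attrs["Name"] + " " + attrs["Kernel"])
```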

  13. Hi Matt,
    I wrote a custom JavaScript function to compare two JSON objects. When I try to call the function in the ExecuteScript processor, I get the error message below:
    org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: SyntaxError: no viable alternative at character '?' in at line number 9 at column number 19


    ----------------------Here is the code ---------------------

  14. I would like to use a pandas DataFrame in my Python script but am unable to import it. Is there any solution to this?
