Since then I've received some requests for a Jython example for ExecuteScript, so I figured I'd do the same use case again (JSON to JSON) so folks can see the differences in the languages when performing the same operations :)
The approach has been covered in detail in the other posts, so I will talk a bit about the Jython-specific stuff and then get right to the code.
One major caveat here is that I don't know Python :) I learned enough to get the script working, but please let me know how to better do any of this. I stopped touching the script once it worked, so it's very possible there are unnecessary imports, classes, etc.
The first thing to do is to bring in the Jython libraries you will need, as well as importing the Java and NiFi classes to be used:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
I didn't need to import java.lang.String as Jython does type coercion. I probably couldn't call getBytes() on that string unless Jython knew to coerce the object to a Java String, but that's ok because we can call bytearray("myString".encode('utf-8')) to achieve the same results.
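To see why the encode() step matters, here is a small sketch that should behave the same under CPython and Jython 2.7. The text below has four characters but five UTF-8 bytes, and bytearray() holds exactly those bytes, which is the form outputStream.write() is given in the scripts in this post:

```python
import json

# A text string: len() counts characters, not bytes.
text = u"caf\u00e9"

# Encoding to UTF-8 and wrapping the result in bytearray() gives the raw bytes;
# the accented character takes two bytes.
payload = bytearray(text.encode('utf-8'))

# The same pattern applied to a whole JSON document.
doc_bytes = bytearray(json.dumps({"name": text}).encode('utf-8'))
```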
The next task was to create a StreamCallback object for use in session.write(). I created a Jython class for this and overrode the interface method:
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        # ...
After that, I read in and parsed the JSON text with IOUtils then json.loads(), then performed all the operations on the various parts of the object/dictionary. Finally I generated a new JSON string with json.dumps(), encoded to UTF-8, got the byte array, and wrote it to the output stream.
The resulting script is as follows:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        newObj = {
            "Range": 5,
            "Rating": obj['rating']['primary']['value'],
            "SecondaryRatings": {}
        }
        for key, value in obj['rating'].iteritems():
            if key != "primary":
                newObj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))
flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
    session.transfer(flowFile, REL_SUCCESS)
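Since the NiFi-specific parts of the script are only the stream plumbing, the JSON reshaping inside process() could be pulled out into a plain function and checked outside NiFi. A sketch, assuming input shaped like the hypothetical sample below (the field names come from the script; the values are made up):

```python
import json

def transform(obj):
    """Mirror of the process() body: reshape the parsed rating document."""
    new_obj = {
        "Range": 5,
        "Rating": obj['rating']['primary']['value'],
        "SecondaryRatings": {}
    }
    # items() works in both Jython 2.7 and Python 3 (iteritems() is 2.x only).
    for key, value in obj['rating'].items():
        if key != "primary":
            new_obj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}
    return new_obj

def transform_text(text):
    """JSON text in, pretty-printed JSON text out."""
    return json.dumps(transform(json.loads(text)), indent=4)

# Hypothetical input with one primary and one secondary rating.
sample = '{"rating": {"primary": {"value": 3}, "quality": {"value": 4}}}'
result = json.loads(transform_text(sample))
```

Inside the callback, process() would then reduce to reading the text with IOUtils.toString(), calling transform_text(text), and writing the UTF-8 encoded result.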
The template is available as a Gist (here). I welcome all suggestions on how to make this better, and please share any scripts you come up with!
Cheers!
I modified your template to convert timestamps in JSON from a String to a Long:
https://gist.github.com/ryanpersaud/734b68e3624d06433deaa114acc33865
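The Gist itself is not reproduced here. As a rough sketch of the same idea, assuming a hypothetical field named timestamp that holds UTC strings such as 2017-03-01T12:00:00Z, the conversion to epoch milliseconds could look like this (in Jython 2.7 the integer is promoted to a long automatically when it gets large):

```python
import calendar
from datetime import datetime

# Assumed format: ISO-8601 in UTC with a trailing 'Z'.
TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

def to_epoch_millis(value, fmt=TIMESTAMP_FORMAT):
    """Parse a UTC timestamp string and return milliseconds since the epoch."""
    parsed = datetime.strptime(value, fmt)
    # timegm() treats the time tuple as UTC (time.mktime() would use local time).
    return calendar.timegm(parsed.timetuple()) * 1000

def convert_timestamps(records, field='timestamp'):
    """Replace record[field] with its epoch-millisecond value, in place."""
    for record in records:
        if field in record:
            record[field] = to_epoch_millis(record[field])
    return records
```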
When you make this call to the callback:
flowFile = session.write(flowFile, PyStreamCallback())
can I treat inputStream as I would any file object? In other words, can I read my records using csv.DictReader(inputStream, fieldnames), for example? It would be helpful if I could do that rather than use a list iterator to iterate through the records.
Thanks in advance for any comments.
Not sure; you might need to use FileUtil.wrap() on the input stream: http://www.jython.org/javadoc/org/python/core/util/FileUtil.html
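An alternative that avoids wrapping the stream at all, offered as a sketch rather than a tested answer: csv.DictReader accepts any iterable of lines, so the text already read with IOUtils.toString() can be split into lines and handed to it. This holds the whole flow file in memory, fields with embedded newlines would be split incorrectly, and under Jython (Python 2.7) the csv module works best with byte strings, so non-ASCII data may need extra handling.

```python
import csv

def parse_records(text, fieldnames):
    """Parse CSV text into a list of dicts keyed by fieldnames.

    csv.DictReader only needs an iterable of lines, so a list of
    lines works in place of a file object.
    """
    return list(csv.DictReader(text.splitlines(), fieldnames))

records = parse_records("1,alice\n2,bob\n", ["id", "name"])
```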
I'd like to append the attributes of each flowFile in my processing workflow to a disk file I use as a log. I need to create the output file on the first access attempt if it does not exist, but if it exists I want to append to its existing contents.
This script above is very helpful and shows me how I can customize the attributes of the flowFile coming out of this ExecuteScript processor. But the PutFile processor that might follow only lets me create a new output file each time. Is there an example that demonstrates how to append the flowFile attribute information to a log file each time without losing the previous contents of the output file?
Thanks again.
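One option, sketched here rather than taken from the post, is to do the append inside ExecuteScript itself. Opening a file in mode 'a' creates it on first use and appends afterwards. In the script this could be called as append_attributes(path, dict(flowFile.getAttributes())), assuming Jython converts the Java Map returned by getAttributes() through dict(); the log location is hypothetical. If the processor runs with more than one concurrent task, lines from different tasks could interleave, so a single concurrent task is the safer setting.

```python
import io
import json
import os
import tempfile

def append_attributes(log_path, attributes):
    """Append one JSON line of flow file attributes to log_path.

    Mode 'a' creates the file if it is missing and otherwise appends,
    so earlier entries are never overwritten.
    """
    line = json.dumps(attributes, sort_keys=True)
    with io.open(log_path, 'a', encoding='utf-8') as log:
        log.write(u"%s\n" % line)

# Demo with a throwaway path; a real flow would use a fixed log location.
demo_path = os.path.join(tempfile.mkdtemp(), "attributes.log")
append_attributes(demo_path, {"filename": "a.json", "uuid": "1"})
append_attributes(demo_path, {"filename": "b.json", "uuid": "2"})
```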
This will add the current date as a string:
import json
import java.io
from datetime import datetime
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        flowFileJson = json.loads(text)
        today_string = datetime.today().strftime('%Y-%m-%d')
        flowFileJson['current_date'] = today_string
        outputStream.write(bytearray(json.dumps(flowFileJson, indent=4).encode('utf-8')))
flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
If you want to add this string to every JSON object in a JSON array, replace
flowFileJson['current_date'] = today_string
with
for j in flowFileJson:
    j['current_date'] = today_string
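Both variants could be folded into one helper that checks whether the parsed document is an array. A sketch (the function name is illustrative); passing an explicit date string makes it easy to test:

```python
from datetime import datetime

def add_current_date(doc, date_string=None):
    """Add a 'current_date' field to a JSON object, or to every object in a JSON array."""
    if date_string is None:
        date_string = datetime.today().strftime('%Y-%m-%d')
    # Treat a single object as a one-element list so both cases share one loop.
    records = doc if isinstance(doc, list) else [doc]
    for record in records:
        record['current_date'] = date_string
    return doc
```

In process(), flowFileJson = add_current_date(json.loads(text)) would then cover both shapes.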
I am trying to do something similar, but I want to encrypt parts of the JSON. I have written some code that does the encryption, but it requires importing some Python modules, mainly the Cipher module, as seen at this link (https://repl.it/EJU7/0). However, NiFi cannot recognize the "from Crypto.Cipher import AES" call. Do you know of any workaround or way to fix this?
I keep getting the error "No module named Crypto" even though I added the module path "/Library/Python/2.7/site-packages/Crypto/Cipher/AES.pyc" to the Module Directory property.
ExecuteScript uses Jython, not Python. This means that it can't import native or compiled Python modules, only pure Python modules. Check the Jython documentation on this; perhaps there is a pure Python library for crypto that you can use instead.
Hi,
I am very new to NiFi.
I am getting an error like this:
"Failed to process session due to process exception, SyntaxError : no viable alternative character '' in script at line ::: column number 0."
Could somebody guide me?
Code:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        newObj = {
            "Range": 5,
            "SecondaryRatings": {}
        }
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))
flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
Here I am just writing newObj regardless of the input stream, as a proof of concept.
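Jython's parser reports "no viable alternative at character" when it meets a character it cannot tokenize. One frequent cause, offered here only as a guess for this case, is an invisible non-ASCII character such as a non-breaking space picked up when code is copied from a web page. A small checker like the following, run in any Python outside NiFi, can locate such characters in the script text (lines are 1-based, columns 0-based):

```python
def find_non_ascii(script_text):
    """Return (line, column, repr of character) for every non-ASCII character."""
    problems = []
    for line_no, line in enumerate(script_text.splitlines(), 1):
        for col, ch in enumerate(line):
            if ord(ch) > 127:
                problems.append((line_no, col, repr(ch)))
    return problems

# Hypothetical pasted snippet: the space before "=" on line 2 is a non-breaking space.
pasted = u"import json\nx\u00a0= 1\n"
issues = find_non_ascii(pasted)
```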
I want to read data directly from a NiFi output port into Spark Streaming using PySpark (without using Kafka in between). I have seen a few reference examples in Scala that use the SiteToSiteClient to set up a connection with NiFi, but I have not been able to find an equivalent Python module to develop similar code in Python.
Can anyone please help me out?
Thanks.
Hi,
I am very new to NiFi.
I have a use case and would like to know how to handle it.
I have a JSON file containing multiple configs. I would like to generate a flow file from each config in that JSON and put all of its fields into attributes. Can I use this script, or how else can I do it?
[
{
"Name": "Debian",
"Version": "9",
"Install": "apt",
"Owner": "SPI",
"Kernel": "4.9"
},
{
"Name": "Ubuntu",
"Version": "17.10",
"Install": "apt",
"Owner": "Canonical",
"Kernel": "4.13"
}
]
From this JSON I would like to split the data into 2 flow files, each containing the attributes Name, Version, Install, Owner, and Kernel.
Could you provide some suggestions?
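NiFi's SplitJson processor followed by EvaluateJsonPath (with its Destination set to flowfile-attribute) may cover this without a script. If a script is preferred, the parsing half could look like the sketch below, using a trimmed version of the sample above. In ExecuteScript, each resulting map could then be put onto a child created with session.create(flowFile) via session.putAllAttributes(child, attrs), with the original removed or routed elsewhere; that wiring is an untested assumption and is not shown here.

```python
import json

# Text types in both Jython 2.7 (str, unicode) and Python 3 (str).
TEXT_TYPES = (str, type(u""))

def to_attribute_value(value):
    """NiFi attribute values are strings; non-string values become JSON text."""
    if isinstance(value, TEXT_TYPES):
        return value
    return json.dumps(value)

def configs_to_attributes(text):
    """Return one attribute map per object in a JSON array."""
    return [dict((key, to_attribute_value(value)) for key, value in item.items())
            for item in json.loads(text)]

sample = ('[{"Name": "Debian", "Version": "9", "Kernel": "4.9"},'
          ' {"Name": "Ubuntu", "Version": "17.10", "Kernel": "4.13"}]')
attribute_maps = configs_to_attributes(sample)
```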
Hi Matt,
I wrote a custom JavaScript function to compare two JSON objects. When I try to call the function in the ExecuteScript processor, I get the error message below:
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: SyntaxError: no viable alternative at character '?' in at line number 9 at column number 19
----------------------Here is the code ---------------------
I would like to use a pandas DataFrame in my Python script but am unable to import it. Is there any solution to this?
Hi,
I am very new to NiFi and I am looking for help with data formatting.
Requirement:
I have to extract data from DB2 tables and load it into a CSV file. While doing so, the amount fields with data type 10.8 are converted into scientific notation, such as 0E-8 in the sample below, especially when the values are zeros. I am looking for a way to format the data so that it looks like the original data from the table.
Sample data:
2021-06-30,8,CMS,159771309,Domestic Bank,1,Collateral,Collateral,Collateral,Collateral,4915,DIP PITTSBURGH,Bankruptcy,PUB_FUNDS,0E-8,0E-8,,0E-8,0E-8,,0E-8,0E-8,19872107.26000000
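The 0E-8 form is how Java's BigDecimal.toString() renders a zero with scale 8, while BigDecimal.toPlainString() gives 0.00000000. If the conversion cannot be changed where the CSV is produced, a post-processing sketch like the one below could rewrite such fields in the CSV text, for example inside ExecuteScript. It assumes simple comma-separated fields with no quoted commas, as in the sample row above.

```python
import re
from decimal import Decimal

# Matches values such as 0E-8 or 1.5E+3; plain numbers, dates and text do not match.
SCIENTIFIC = re.compile(r'^[+-]?\d+(\.\d+)?[eE][+-]?\d+$')

def to_plain(field):
    """Rewrite a scientific-notation number in plain decimal form; leave anything else as-is."""
    if SCIENTIFIC.match(field):
        # Decimal keeps the exponent, so 0E-8 formats with eight fractional zeros.
        return format(Decimal(field), 'f')
    return field

def fix_csv_line(line):
    """Apply to_plain() to every field of a simple (unquoted) CSV line."""
    return ",".join(to_plain(field) for field in line.split(","))
```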