====== Apache NiFi ======
[[https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html|ExecuteScript processor with Jython in NiFi]]
Community: [[http://webchat.freenode.net/?channels=#nifi|irc]] [[https://www.hipchat.com/gzh2m5YML|hipchat]]
====== KB ======
-- crash handling
[4:26 PM] Joe Witt: set that to
[4:26 PM] Joe Witt: nifi.flowcontroller.autoResumeState=false
[4:26 PM] Joe Witt: then you can fix/change whatever is going into a hung thread state mode
[4:27 PM] Joe Witt: in latest release (or maybe in the next release?) you can kill hung processors too
[4:27 PM] Joe Witt: that will let you fix things and restart them as well
[4:27 PM] Joe Witt: that is new or about to be released
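A minimal sketch of flipping the property from the chat above with a script instead of editing by hand. The helper names ''set_property'' and ''disable_auto_resume'' are hypothetical, and the file path is an assumption (normally ''conf/nifi.properties'' under the NiFi install directory). NiFi reads this file at startup, so the change should only take effect after a restart.

```python
import re


def set_property(text, key, value):
    """Return properties text with `key` set to `value`.

    Replaces an existing uncommented `key=...` line, or appends one
    if the key is not present. Hypothetical helper, not part of NiFi.
    """
    pattern = re.compile(
        r"^[ \t]*" + re.escape(key) + r"[ \t]*=.*$", re.MULTILINE
    )
    line = "%s=%s" % (key, value)
    if pattern.search(text):
        # Use a function as replacement so backslashes in `value` stay literal
        return pattern.sub(lambda m: line, text)
    sep = "" if text == "" or text.endswith("\n") else "\n"
    return text + sep + line + "\n"


def disable_auto_resume(path):
    """Set nifi.flowcontroller.autoResumeState=false in the file at `path`."""
    with open(path) as f:
        text = f.read()
    with open(path, "w") as f:
        f.write(set_property(text, "nifi.flowcontroller.autoResumeState", "false"))
```

Usage (path is an assumption): ''disable_auto_resume("/opt/nifi/conf/nifi.properties")'', then start NiFi, fix the flow, and start the components by hand.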
====== Create Processor ======
mvn archetype:generate \
-DarchetypeGroupId=org.apache.nifi \
-DarchetypeArtifactId=nifi-processor-bundle-archetype \
-DarchetypeVersion=1.5.0 -DnifiVersion=1.2.0
https://www.youtube.com/watch?v=3ldmNFlelhw
https://nifi.apache.org/developer-guide.html
https://github.com/alefbt/NiFi-Rule-engine-processor
ExecuteScript cookbook: [[https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html|part-1]], [[https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html|part-2]], [[https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html|part-3]]
'''
Created on May 6, 2018
@author: yehuda
'''
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback, InputStreamCallback
import json

class InputStreamCB(InputStreamCallback):
    """Reads the flow file as JSON and builds one enriched dict per item."""

    def __init__(self):
        self.items = []

    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        d = json.loads(text)
        # Dotted path to the list that should be split into separate items
        collection_name = "messages"
        # Top-level fields to copy onto every item
        c = "source , batch".split(",")
        collect_to_attach = [x.strip() for x in c]
        attach_data = {}
        for aitem in collect_to_attach:
            if aitem in d:
                attach_data[aitem] = d[aitem]
        col_name_parts = collection_name.split(".")
        #
        # Filter: walk down the dotted path; a missing key is skipped silently
        #
        selected_obj = d
        for col_name_part in col_name_parts:
            if col_name_part in selected_obj:
                selected_obj = selected_obj[col_name_part]
        #
        # Enrich item: merge the attached fields into a copy of each item
        #
        out = []
        for item in selected_obj:
            m = dict()
            m.update(item)
            m.update(attach_data)
            out.append(m)
        # Kept here so the script body can write each item as a new flow file
        self.items = out

    def getItems(self):
        return self.items
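
class OutputStreamCB(OutputStreamCallback):
    """Writes a single item to the flow file content as UTF-8 JSON."""

    def __init__(self):
        # Set before process() is called, via setItem()
        self.item = None

    def setItem(self, i):
        self.item = i

    def process(self, outputStream):
        str_out = json.dumps(self.item)
        outputStream.write(bytearray(str_out.encode('utf-8')))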
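
# session, REL_SUCCESS and REL_FAILURE are bound by the ExecuteScript processor
ff = session.get()
if ff is not None:
    isCB = InputStreamCB()
    session.read(ff, isCB)
    for itm in isCB.getItems():
        # Child flow file inherits the attributes of the original
        itm_ff = session.create(ff)
        outCB = OutputStreamCB()
        outCB.setItem(itm)
        itm_ff = session.write(itm_ff, outCB)
        session.transfer(itm_ff, REL_SUCCESS)
    # The original is routed to failure because ExecuteScript only offers
    # success and failure; session.remove(ff) would discard it instead
    session.transfer(ff, REL_FAILURE)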
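
The split-and-enrich logic of the script above can be tried outside NiFi. This is a plain-Python sketch with no Java imports; the function name ''split_and_enrich'', its parameters, and the sample document are assumptions for illustration. Unlike the script, it returns an empty list when the path does not end at a list, which is an added guard.

```python
import json


def split_and_enrich(doc, collection_name="messages", attach_keys=("source", "batch")):
    """Return one dict per item under `collection_name`, each merged with
    the top-level fields named in `attach_keys` (hypothetical helper)."""
    # Top-level fields that get copied onto every item
    attach_data = dict((k, doc[k]) for k in attach_keys if k in doc)

    # Walk down the dotted path; a missing key leaves the selection unchanged
    selected = doc
    for part in collection_name.split("."):
        if isinstance(selected, dict) and part in selected:
            selected = selected[part]

    # Added guard (not in the script above): nothing to split
    if not isinstance(selected, list):
        return []

    out = []
    for item in selected:
        merged = dict(item)
        merged.update(attach_data)
        out.append(merged)
    return out


sample = json.loads(
    '{"source": "sensor-1", "batch": 7, "messages": [{"id": 1}, {"id": 2}]}'
)
records = split_and_enrich(sample)
for r in records:
    print(json.dumps(r, sort_keys=True))
# {"batch": 7, "id": 1, "source": "sensor-1"}
# {"batch": 7, "id": 2, "source": "sensor-1"}
```

In the NiFi script each of these dicts becomes the content of its own flow file on the success relationship.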