====== Apache NiFi ======

[[https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html|Script processor Jython NiFi]]

Community: [[http://webchat.freenode.net/?channels=#nifi|irc]] [[https://www.hipchat.com/gzh2m5YML|hipchat]]

====== KB ======

-- Crash handling

  * [4:26 PM] Joe Witt: set that to
  * [4:26 PM] Joe Witt: ''nifi.flowcontroller.autoResumeState=false''
  * [4:26 PM] Joe Witt: then you can fix/change whatever is going into a hung thread state mode
  * [4:27 PM] Joe Witt: in latest release (or maybe in the next release?) you can kill hung processors too
  * [4:27 PM] Joe Witt: that will let you fix things and restart them as well
  * [4:27 PM] Joe Witt: that is new or about to be released

''nifi.flowcontroller.autoResumeState'' is set in ''conf/nifi.properties''. With ''false'', components stay stopped after a restart instead of resuming their previous state, so a hanging processor can be fixed before it runs again.

====== Create Processor ======

<code bash>
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.nifi \
  -DarchetypeArtifactId=nifi-processor-bundle-archetype \
  -DarchetypeVersion=1.5.0 \
  -DnifiVersion=1.2.0
</code>

  * https://www.youtube.com/watch?v=3ldmNFlelhw
  * https://nifi.apache.org/developer-guide.html
  * https://github.com/alefbt/NiFi-Rule-engine-processor

ExecuteScript cookbook: [[https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html|part-1]], [[https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html|part-2]], [[https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html|part-3]]

Example ExecuteScript (Jython) script: splits the ''messages'' array of an incoming JSON flow file into one flow file per item and copies the top-level ''source'' and ''batch'' fields into each item.

<code python>
'''
Created on May 6, 2018

@author: yehuda
'''
import json

from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback


class InputStreamCB(InputStreamCallback):
    """Reads the incoming JSON and builds one enriched dict per collection item."""

    def __init__(self):
        self.items = []

    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        d = json.loads(text)

        # Dotted path to the array to split, e.g. "payload.messages"
        collection_name = "messages"
        # Top-level fields copied into every item
        collect_to_attach = [x.strip() for x in "source , batch".split(",")]

        attach_data = {}
        for aitem in collect_to_attach:
            if aitem in d:
                attach_data[aitem] = d[aitem]

        # Filter: walk the dotted path down to the collection
        selected_obj = d
        for col_name_part in collection_name.split("."):
            if col_name_part in selected_obj:
                selected_obj = selected_obj[col_name_part]

        # Enrich: copy each item and merge in the attached fields
        out = []
        for item in selected_obj:
            m = dict()
            m.update(item)
            m.update(attach_data)
            out.append(m)

        self.items = out

    def getItems(self):
        return self.items


class OutputStreamCB(OutputStreamCallback):
    """Writes a single item to a flow file as UTF-8 JSON."""

    def __init__(self):
        self.item = None

    def setItem(self, i):
        self.item = i

    def process(self, outputStream):
        str_out = json.dumps(self.item)
        outputStream.write(bytearray(str_out.encode('utf-8')))


# session, log, REL_SUCCESS and REL_FAILURE are bound by ExecuteScript
ff = session.get()
if ff is not None:
    isCB = InputStreamCB()
    try:
        session.read(ff, isCB)
    except:
        # Unreadable or malformed input: route the original to failure
        log.error("Could not split flow file; routing to failure")
        session.transfer(ff, REL_FAILURE)
    else:
        # Save each item as a new flow file (create(ff) inherits ff's attributes)
        for itm in isCB.getItems():
            itm_ff = session.create(ff)
            outCB = OutputStreamCB()
            outCB.setItem(itm)
            itm_ff = session.write(itm_ff, outCB)
            session.transfer(itm_ff, REL_SUCCESS)
        # ExecuteScript has only success/failure; drop the split original
        # instead of sending a successfully processed file to failure
        session.remove(ff)
</code>
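The split-and-enrich logic of the script is easier to test outside NiFi. Below is a minimal pure-Python sketch, assuming plain CPython with no NiFi session or Java stream callbacks; ''split_and_enrich'' and its parameters are hypothetical names. One deliberate difference: the script keeps the enclosing object when a path segment is missing, whereas this sketch returns an empty list.

```python
import json


def split_and_enrich(text, collection_path="messages", attach_keys=("source", "batch")):
    """Hypothetical helper mirroring the ExecuteScript logic:
    pick the list at a dotted path and merge selected top-level fields into each item."""
    doc = json.loads(text)

    # Copy only the requested top-level fields that are actually present
    attach = {k: doc[k] for k in attach_keys if k in doc}

    # Walk the dotted path (e.g. "payload.messages")
    selected = doc
    for part in collection_path.split("."):
        if isinstance(selected, dict) and part in selected:
            selected = selected[part]
        else:
            # Assumption: a missing path yields no items (the script falls back instead)
            return []

    out = []
    for item in selected:
        merged = dict(item)    # copy so the parsed document is not mutated
        merged.update(attach)  # attached fields win on key collisions, as in the script
        out.append(merged)
    return out


if __name__ == "__main__":
    sample = '{"source": "sensor-1", "batch": 7, "messages": [{"id": 1}, {"id": 2}]}'
    for item in split_and_enrich(sample):
        print(json.dumps(item, sort_keys=True))
```

Running it prints one JSON object per message, each carrying ''source'' and ''batch'', which is what the script writes into each child flow file.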