Differences
This shows you the differences between two versions of the page.
Both sides previous revision · Previous revision · Next revision | Previous revision | ||
kb:apache_nifi [2018/02/05 11:53] – [Create Processor] yehuda | kb:apache_nifi [2022/01/03 16:03] (current) – external edit 127.0.0.1 | ||
---|---|---|---|
Line 1: | Line 1: | ||
====== Apache NiFi ====== | ====== Apache NiFi ====== | ||
+ | [[https:// | ||
+ | Community: [[http:// | ||
+ | ====== KB ====== | ||
+ | < | ||
+ | -- crash handling | ||
+ | [4:26 PM] Joe Witt: set that to | ||
+ | [4:26 PM] Joe Witt: nifi.flowcontroller.autoResumeState=false | ||
+ | [4:26 PM] Joe Witt: then you can fix/change whatever is going into a hung thread state mode | ||
+ | [4:27 PM] Joe Witt: in latest release (or maybe in the next release?) you can kill hung processors too | ||
+ | [4:27 PM] Joe Witt: that wil let you fix things and restart them as well | ||
+ | [4:27 PM] Joe Witt: that is new or about to be released | ||
+ | </ | ||
====== Create Processor ====== | ====== Create Processor ====== | ||
<code bash> | <code bash> | ||
Line 13: | Line 25: | ||
https:// | https:// | ||
+ | |||
+ | https:// | ||
+ | |||
+ | executescript-cookbook: | ||
+ | |||
+ | <code python> | ||
+ | ''' | ||
+ | Created on May 6, 2018 | ||
+ | |||
+ | @author: yehuda | ||
+ | ''' | ||
+ | from org.apache.commons.io import IOUtils | ||
+ | from java.nio.charset import StandardCharsets | ||
+ | from org.apache.nifi.processor.io import OutputStreamCallback, | ||
+ | import json | ||
+ | |||
+ | |||
+ | class InputStreamCB(InputStreamCallback): | ||
+ | def __init__(self): | ||
+ | self.items = [] | ||
+ | pass | ||
+ | |||
+ | def process(self, | ||
+ | text = IOUtils.toString(inputStream, | ||
+ | d = json.loads(text) | ||
+ | |||
+ | collection_name = " | ||
+ | c = " | ||
+ | |||
+ | collect_to_attach = map(lambda x: x.strip(), c) | ||
+ | |||
+ | attach_data = {} | ||
+ | |||
+ | for aitem in collect_to_attach: | ||
+ | if aitem in d: | ||
+ | attach_data[aitem] = d[aitem] | ||
+ | |||
+ | col_name_parts = collection_name.split(" | ||
+ | |||
+ | # | ||
+ | # Filter | ||
+ | # | ||
+ | selected_obj = d | ||
+ | for col_name_part in col_name_parts: | ||
+ | if col_name_part in selected_obj: | ||
+ | selected_obj = selected_obj[col_name_part] | ||
+ | |||
+ | # | ||
+ | # Enreach item | ||
+ | # | ||
+ | out = [] | ||
+ | for item in selected_obj: | ||
+ | m = dict() | ||
+ | m.update(item) | ||
+ | m.update(attach_data) | ||
+ | out.append(m) | ||
+ | # SAVE AS NEW FLOW FILE | ||
+ | |||
+ | self.items = out | ||
+ | |||
+ | def getItems(self): | ||
+ | return self.items | ||
+ | |||
+ | |||
+ | class OutputStreamCB(OutputStreamCallback): | ||
+ | def __init__(self): | ||
+ | pass | ||
+ | |||
+ | def setItem(self, | ||
+ | self.item = i | ||
+ | |||
+ | def process(self, | ||
+ | str_out = json.dumps(self.item) | ||
+ | outputStream.write(bytearray(str_out.encode(' | ||
+ | |||
+ | |||
+ | ff = session.get() | ||
+ | |||
+ | if ff != None: | ||
+ | |||
+ | isCB = InputStreamCB() | ||
+ | session.read(ff, | ||
+ | counter = 1 | ||
+ | |||
+ | |||
+ | for itm in isCB.getItems(): | ||
+ | itm_ff = session.create(ff) | ||
+ | |||
+ | outCB = OutputStreamCB() | ||
+ | outCB.setItem(itm) | ||
+ | itm_ff = session.write(itm_ff, | ||
+ | |||
+ | session.transfer(itm_ff, | ||
+ | |||
+ | session.transfer(ff, | ||
+ | </ |