Differences
This shows you the differences between two versions of the page.
| Both sides previous revisionPrevious revisionNext revision | Previous revision | ||
| kb:apache_nifi [2018/02/05 11:53] – [Create Processor] yehuda | kb:apache_nifi [2022/01/03 16:03] (current) – external edit 127.0.0.1 | ||
|---|---|---|---|
| Line 1: | Line 1: | ||
| ====== Apache NiFi ====== | ====== Apache NiFi ====== | ||
| + | [[https:// | ||
| + | Community: [[http:// | ||
| + | ====== KB ====== | ||
| + | < | ||
| + | -- crash handling | ||
| + | [4:26 PM] Joe Witt: set that to | ||
| + | [4:26 PM] Joe Witt: nifi.flowcontroller.autoResumeState=false | ||
| + | [4:26 PM] Joe Witt: then you can fix/change whatever is going into a hung thread state mode | ||
| + | [4:27 PM] Joe Witt: in latest release (or maybe in the next release?) you can kill hung processors too | ||
| + | [4:27 PM] Joe Witt: that will let you fix things and restart them as well | ||
| + | [4:27 PM] Joe Witt: that is new or about to be released | ||
| + | </ | ||
| ====== Create Processor ====== | ====== Create Processor ====== | ||
| <code bash> | <code bash> | ||
| Line 13: | Line 25: | ||
| https:// | https:// | ||
| + | |||
| + | https:// | ||
| + | |||
| + | executescript-cookbook: | ||
| + | |||
| + | <code python> | ||
| + | ''' | ||
| + | Created on May 6, 2018 | ||
| + | |||
| + | @author: yehuda | ||
| + | ''' | ||
| + | from org.apache.commons.io import IOUtils | ||
| + | from java.nio.charset import StandardCharsets | ||
| + | from org.apache.nifi.processor.io import OutputStreamCallback, | ||
| + | import json | ||
| + | |||
| + | |||
| + | class InputStreamCB(InputStreamCallback): | ||
| + | def __init__(self): | ||
| + | self.items = [] | ||
| + | pass | ||
| + | |||
| + | def process(self, | ||
| + | text = IOUtils.toString(inputStream, | ||
| + | d = json.loads(text) | ||
| + | |||
| + | collection_name = " | ||
| + | c = " | ||
| + | |||
| + | collect_to_attach = map(lambda x: x.strip(), c) | ||
| + | |||
| + | attach_data = {} | ||
| + | |||
| + | for aitem in collect_to_attach: | ||
| + | if aitem in d: | ||
| + | attach_data[aitem] = d[aitem] | ||
| + | |||
| + | col_name_parts = collection_name.split(" | ||
| + | |||
| + | # | ||
| + | # Filter | ||
| + | # | ||
| + | selected_obj = d | ||
| + | for col_name_part in col_name_parts: | ||
| + | if col_name_part in selected_obj: | ||
| + | selected_obj = selected_obj[col_name_part] | ||
| + | |||
| + | # | ||
| + | # Enreach item | ||
| + | # | ||
| + | out = [] | ||
| + | for item in selected_obj: | ||
| + | m = dict() | ||
| + | m.update(item) | ||
| + | m.update(attach_data) | ||
| + | out.append(m) | ||
| + | # SAVE AS NEW FLOW FILE | ||
| + | |||
| + | self.items = out | ||
| + | |||
| + | def getItems(self): | ||
| + | return self.items | ||
| + | |||
| + | |||
| + | class OutputStreamCB(OutputStreamCallback): | ||
| + | def __init__(self): | ||
| + | pass | ||
| + | |||
| + | def setItem(self, | ||
| + | self.item = i | ||
| + | |||
| + | def process(self, | ||
| + | str_out = json.dumps(self.item) | ||
| + | outputStream.write(bytearray(str_out.encode(' | ||
| + | |||
| + | |||
| + | ff = session.get() | ||
| + | |||
| + | if ff != None: | ||
| + | |||
| + | isCB = InputStreamCB() | ||
| + | session.read(ff, | ||
| + | counter = 1 | ||
| + | |||
| + | |||
| + | for itm in isCB.getItems(): | ||
| + | itm_ff = session.create(ff) | ||
| + | |||
| + | outCB = OutputStreamCB() | ||
| + | outCB.setItem(itm) | ||
| + | itm_ff = session.write(itm_ff, | ||
| + | |||
| + | session.transfer(itm_ff, | ||
| + | |||
| + | session.transfer(ff, | ||
| + | </ | ||