Differences
This shows you the differences between two versions of the page.
Both sides previous revision · Previous revision | Next revision · Both sides next revision | ||
kb:apache_nifi [2018/05/06 13:57] – [Create Processor] yehuda | kb:apache_nifi [2018/05/06 15:21] – yehuda | ||
---|---|---|---|
Line 24: | Line 24: | ||
@author: yehuda | @author: yehuda | ||
''' | ''' | ||
+ | from org.apache.commons.io import IOUtils | ||
+ | from java.nio.charset import StandardCharsets | ||
+ | from org.apache.nifi.processor.io import OutputStreamCallback, | ||
import json | import json | ||
- | from pprint import pprint | ||
- | if __name__ == ' | + | |
- | c = "abc , abc2" | + | class InputStreamCB(InputStreamCallback): |
- | + | | |
- | collect_to_attach = map(lambda x: x.strip() ,c) | + | self.items = [] |
- | | + | pass |
- | + | ||
- | with open(' | + | def process(self, |
- | d = json.load(f) | + | text = IOUtils.toString(inputStream, |
+ | d = json.loads(text) | ||
+ | |||
+ | collection_name = " | ||
+ | | ||
+ | |||
+ | collect_to_attach = map(lambda x: x.strip(), c) | ||
attach_data = {} | attach_data = {} | ||
- | | + | |
for aitem in collect_to_attach: | for aitem in collect_to_attach: | ||
if aitem in d: | if aitem in d: | ||
attach_data[aitem] = d[aitem] | attach_data[aitem] = d[aitem] | ||
- | + | ||
- | | + | |
col_name_parts = collection_name.split(" | col_name_parts = collection_name.split(" | ||
- | | + | |
# | # | ||
- | # Filter | + | # Filter |
# | # | ||
selected_obj = d | selected_obj = d | ||
Line 51: | Line 59: | ||
if col_name_part in selected_obj: | if col_name_part in selected_obj: | ||
selected_obj = selected_obj[col_name_part] | selected_obj = selected_obj[col_name_part] | ||
- | + | ||
# | # | ||
# Enrich item | # Enrich item | ||
- | # | + | # |
- | out = [] | + | out = [] |
for item in selected_obj: | for item in selected_obj: | ||
m = dict() | m = dict() | ||
Line 62: | Line 70: | ||
out.append(m) | out.append(m) | ||
# SAVE AS NEW FLOW FILE | # SAVE AS NEW FLOW FILE | ||
- | | ||
- | | ||
- | pprint(json.dumps(out)) | ||
- | | ||
- | pass | ||
- | </ | ||
- | <code python> | + | self.items = out |
- | from org.apache.commons.io import IOUtils | + | |
- | from java.nio.charset import StandardCharsets | + | |
- | from org.apache.nifi.processor.io import StreamCallback | + | |
- | class PyStreamCallback(InputStreamCallback): | ||
- | def __init__(self): | ||
- | pass | ||
- | | ||
- | def process(self, | ||
- | text = IOUtils.toString(inputStream, | ||
- | # outputStream.write(bytearray(' | ||
- | # | ||
- | | ||
def getItems(self): | def getItems(self): | ||
- | return | + | return |
- | class PyOutputStreamCallback(OutputStreamCallback): | + | class OutputStreamCB(OutputStreamCallback): |
- | def __init__(self): | + | def __init__(self): |
pass | pass | ||
- | | + | |
- | outputStream.write(bytearray('Hello World!' | + | def setItem(self, |
- | + | self.item = i | |
- | flowFile | + | |
- | if (flowFile | + | |
- | try: | + | |
- | + | | |
- | | + | |
- | session.read(flowFile, isCB) | + | |
- | + | ff = session.get() | |
- | for itm in isCB.getItems(): | + | |
- | itm_ff = session.create(flowFile) | + | if ff != None: |
- | + | ||
- | outCB = PyOutputStreamCallback() | + | isCB = InputStreamCB() |
- | outCB.setItem(itm) | + | session.read(ff, isCB) |
- | + | | |
- | | + | |
- | session.transfer(itm_ff, | + | |
- | + | | |
- | | + | itm_ff = session.create(ff) |
- | | + | |
+ | outCB = OutputStreamCB() | ||
+ | outCB.setItem(itm) | ||
+ | itm_ff = session.write(itm_ff, | ||
+ | |||
+ | | ||
+ | |||
+ | session.transfer(ff, REL_FAILURE) | ||
</ | </ |