This is an old revision of the document!
Apache NiFi
Create Processor
mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.5.0 -DnifiVersion=1.2.0
https://www.youtube.com/watch?v=3ldmNFlelhw
https://nifi.apache.org/developer-guide.html
https://github.com/alefbt/NiFi-Rule-engine-processor
executescript-cookbook: part-1, part-2, part-3
'''
Created on May 6, 2018

@author: yehuda

Stand-alone prototype: load ``data.json``, pick a nested collection by a
dotted path, copy selected top-level keys onto every item of that
collection, and print the result as a JSON string.
'''
import json
from pprint import pprint

if __name__ == '__main__':
    # Names of the top-level keys to attach to each item (comma separated,
    # surrounding whitespace ignored).
    keys_to_attach = [name.strip() for name in "abc , abc2".split(",")]
    # Dotted path of the collection whose items are enriched.
    collection_name = "t.items"

    with open('data.json') as f:
        document = json.load(f)

    # Only keys actually present at the top level of the document are copied.
    attach_data = {key: document[key] for key in keys_to_attach if key in document}

    # Walk down the dotted path; a path segment that is not present is
    # skipped and the walk continues from the current object.
    selected_obj = document
    for segment in collection_name.split("."):
        if segment in selected_obj:
            selected_obj = selected_obj[segment]

    # Build the enriched copies; attached values override the item's own
    # values when the keys collide.
    out = []
    for item in selected_obj:
        enriched = dict()
        enriched.update(item)
        enriched.update(attach_data)
        out.append(enriched)

    # Placeholder for "save as new flow file": just print the JSON text.
    pprint(json.dumps(out))
# Script body for NiFi's ExecuteScript processor (Jython engine).
#
# Reads the incoming FlowFile, obtains a list of items from it and emits one
# child FlowFile per item. The names `session`, `log`, `REL_SUCCESS` and
# `REL_FAILURE` are not defined here; they are expected to be bound by the
# ExecuteScript host before the script runs.
import json

from java.lang import Throwable
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback
from org.apache.nifi.processor.io import OutputStreamCallback
from org.apache.nifi.processor.io import StreamCallback


class PyInputStreamCallback(InputStreamCallback):
    """Read the whole FlowFile content as UTF-8 text and expose items from it."""

    def __init__(self):
        # Filled in by process(); None until the callback has been run.
        self.text = None

    def process(self, inputStream):
        """Store the stream content, decoded as UTF-8, in ``self.text``."""
        self.text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

    def getItems(self):
        """Return the items to emit, one child FlowFile each.

        NOTE(review): this still returns the hard-coded placeholder of the
        original script; deriving the items from ``self.text`` is not
        implemented yet.
        """
        return [{"a": "b"}]


# The original script defined the reader under this name but instantiated
# PyInputStreamCallback; keep the old name working as an alias.
PyStreamCallback = PyInputStreamCallback


class PyOutputStreamCallback(OutputStreamCallback):
    """Write one item to a FlowFile's content."""

    def __init__(self):
        self.item = None

    def setItem(self, item):
        """Remember the item that process() should write."""
        self.item = item

    def process(self, outputStream):
        """Write the item as UTF-8 JSON, or the placeholder text if none was set."""
        if self.item is None:
            payload = 'Hello World!'
        else:
            payload = json.dumps(self.item)
        outputStream.write(bytearray(payload.encode('utf-8')))


flowFile = session.get()
if flowFile is not None:
    # Latest reference of every child created so far, so that they can be
    # cleaned up if anything fails part-way through.
    children = []
    try:
        isCB = PyInputStreamCallback()
        session.read(flowFile, isCB)
        for itm in isCB.getItems():
            children.append(session.create(flowFile))
            outCB = PyOutputStreamCallback()
            outCB.setItem(itm)
            # session.write returns the updated FlowFile reference.
            children[-1] = session.write(children[-1], outCB)
        # Route only once every child was written successfully, so a late
        # failure cannot leave a half-emitted result behind.
        for child in children:
            session.transfer(child, REL_SUCCESS)
        # The children replace the parent; every FlowFile taken from the
        # session has to be either transferred or removed.
        session.remove(flowFile)
    except (Exception, Throwable) as err:
        # Catch Python and Java errors alike (the original used a bare
        # except), report the cause, drop partial children and route the
        # untouched parent to failure.
        log.error('Failed to split FlowFile into items: {0}'.format(err))
        for child in children:
            session.remove(child)
        session.transfer(flowFile, REL_FAILURE)