This is an old revision of the document!


Apache NiFi

Create Processor

mvn archetype:generate \
-DarchetypeGroupId=org.apache.nifi \
-DarchetypeArtifactId=nifi-processor-bundle-archetype \
-DarchetypeVersion=1.5.0 -DnifiVersion=1.2.0

https://www.youtube.com/watch?v=3ldmNFlelhw

https://nifi.apache.org/developer-guide.html

https://github.com/alefbt/NiFi-Rule-engine-processor

executescript-cookbook: part-1, part-2, part-3

'''
Created on May 6, 2018
 
@author: yehuda
'''
import json
from pprint import pprint
 
if __name__ == '__main__':
    c = "abc , abc2".split(",")
 
    collect_to_attach = map(lambda x: x.strip() ,c)    
    collection_name = "t.items"
 
    with open('data.json') as f:
        d = json.load(f)
        attach_data = {}
 
        for aitem in collect_to_attach:
            if aitem in d:
                attach_data[aitem] = d[aitem]
 
 
        col_name_parts = collection_name.split(".")
 
        #
        # Filter 
        #
        selected_obj = d
        for col_name_part in col_name_parts:
            if col_name_part in selected_obj:
                selected_obj = selected_obj[col_name_part]
 
        #
        # Enreach item
        #     
        out = []   
        for item in selected_obj:
            m = dict()
            m.update(item)
            m.update(attach_data)
            out.append(m)
            # SAVE AS NEW FLOW FILE
 
 
        pprint(json.dumps(out))
 
    pass
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
 
class PyStreamCallback(InputStreamCallback):
    def __init__(self):
        pass
 
    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # outputStream.write(bytearray('Hello World!'[::-1].encode('utf-8')))
        #     outputStream.write(bytearray('Hello World!'.encode('utf-8')))
 
    def getItems(self):
        return [{"a":"b"}]
 
 
class PyOutputStreamCallback(OutputStreamCallback):
  def __init__(self):
        pass
  def process(self, outputStream):
    outputStream.write(bytearray('Hello World!'.encode('utf-8')))
 
flowFile = session.get() 
if (flowFile != None):
    try:
 
        isCB = PyInputStreamCallback()
        session.read(flowFile, isCB)
 
        for itm in isCB.getItems():
            itm_ff = session.create(flowFile)
 
            outCB = PyOutputStreamCallback()
            outCB.setItem(itm)
 
            itm_ff = session.write(itm_ff, outCB)
            session.transfer(itm_ff, REL_SUCCESS)
 
    except:
        session.transfer(flowFile, REL_FAILURE)
kb/apache_nifi.1525615065.txt.gz · Last modified: 2022/01/03 16:03 (external edit)
Back to top
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0