Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revisionPrevious revision
Next revision
Previous revision
kb:apache_nifi [2018/02/05 11:53] – [Create Processor] yehudakb:apache_nifi [2022/01/03 16:03] (current) – external edit 127.0.0.1
Line 1: Line 1:
 ====== Apache NiFi ====== ====== Apache NiFi ======
 +[[https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html|Script processor Jython NiFi]]
 +Community: [[http://webchat.freenode.net/?channels=#nifi|irc]] [[https://www.hipchat.com/gzh2m5YML|hipchat]]
  
 +====== KB ======
 +<code>
 +-- crash handling
 +[4:26 PM] Joe Witt: set that to
 +[4:26 PM] Joe Witt: nifi.flowcontroller.autoResumeState=false
 +[4:26 PM] Joe Witt: then you can fix/change whatever is going into a hung thread state mode
 +[4:27 PM] Joe Witt: in latest release (or maybe in the next release?) you can kill hung processors too
 +[4:27 PM] Joe Witt: that will let you fix things and restart them as well
 +[4:27 PM] Joe Witt: that is new or about to be released
 +</code>
 ====== Create Processor ====== ====== Create Processor ======
 <code bash> <code bash>
Line 13: Line 25:
  
 https://nifi.apache.org/developer-guide.html https://nifi.apache.org/developer-guide.html
 +
 +https://github.com/alefbt/NiFi-Rule-engine-processor
 +
 +executescript-cookbook: [[https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html|part-1]], [[https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html|part-2]], [[https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html|part-3]]
 +
 +<code python>
 +'''
 +Created on May 6, 2018
 +
 +@author: yehuda
 +'''
 +from org.apache.commons.io import IOUtils
 +from java.nio.charset import StandardCharsets
 +from org.apache.nifi.processor.io import OutputStreamCallback, InputStreamCallback
 +import json
 +
 +
 +class InputStreamCB(InputStreamCallback):
 +    def __init__(self):
 +        self.items = []
 +        pass
 +
 +    def process(self, inputStream):
 +        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
 +        d = json.loads(text)
 +
 +        collection_name = "messages"
 +        c = "source , batch".split(",")
 +
 +        collect_to_attach = map(lambda x: x.strip(), c)
 +
 +        attach_data = {}
 +
 +        for aitem in collect_to_attach:
 +            if aitem in d:
 +                attach_data[aitem] = d[aitem]
 +
 +        col_name_parts = collection_name.split(".")
 +
 +        #
 +        # Filter
 +        #
 +        selected_obj = d
 +        for col_name_part in col_name_parts:
 +            if col_name_part in selected_obj:
 +                selected_obj = selected_obj[col_name_part]
 +
 +        #
 +        # Enreach item
 +        #
 +        out = []
 +        for item in selected_obj:
 +            m = dict()
 +            m.update(item)
 +            m.update(attach_data)
 +            out.append(m)
 +            # SAVE AS NEW FLOW FILE
 +
 +        self.items = out
 +
 +    def getItems(self):
 +        return self.items
 +
 +
 +class OutputStreamCB(OutputStreamCallback):
 +    def __init__(self):
 +        pass
 +
 +    def setItem(self,i):
 +        self.item = i
 +
 +    def process(self, outputStream):
 +        str_out = json.dumps(self.item)
 +        outputStream.write(bytearray(str_out.encode('utf-8')))
 +
 +
 +ff = session.get()
 +
 +if ff != None:
 +
 +    isCB = InputStreamCB()
 +    session.read(ff, isCB)
 +    counter = 1
 +
 +
 +    for itm in isCB.getItems():
 +        itm_ff = session.create(ff)
 +
 +        outCB = OutputStreamCB()
 +        outCB.setItem(itm)
 +        itm_ff = session.write(itm_ff, outCB)
 +
 +        session.transfer(itm_ff, REL_SUCCESS)
 +
 +    session.transfer(ff, REL_FAILURE)
 +</code>
kb/apache_nifi.1517831617.txt.gz · Last modified: (external edit)
Back to top
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0