Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revisionPrevious revision
Next revision
Previous revision
kb:apache_nifi [2018/05/06 11:33] – [Create Processor] yehuda kb:apache_nifi [2022/01/03 16:03] (current) – external edit 127.0.0.1
Line 1: Line 1:
 ====== Apache NiFi ====== ====== Apache NiFi ======
 +[[https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html|Script processor Jython NiFi]]
 +Community: [[http://webchat.freenode.net/?channels=#nifi|irc]] [[https://www.hipchat.com/gzh2m5YML|hipchat]]
  
 +====== KB ======
 +<code>
 +-- crash handling
 +[4:26 PM] Joe Witt: set that to
 +[4:26 PM] Joe Witt: nifi.flowcontroller.autoResumeState=false
 +[4:26 PM] Joe Witt: then you can fix/change whatever is going into a hung thread state mode
 +[4:27 PM] Joe Witt: in latest release (or maybe in the next release?) you can kill hung processors too
 +[4:27 PM] Joe Witt: that will let you fix things and restart them as well
 +[4:27 PM] Joe Witt: that is new or about to be released
 +</code>
 ====== Create Processor ====== ====== Create Processor ======
 <code bash> <code bash>
Line 16: Line 28:
 https://github.com/alefbt/NiFi-Rule-engine-processor https://github.com/alefbt/NiFi-Rule-engine-processor
  
 +executescript-cookbook: [[https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html|part-1]], [[https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html|part-2]], [[https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html|part-3]]
  
-https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html+<code python> 
 +''' 
 +Created on May 6, 2018 
 + 
 +@author: yehuda 
 +''' 
 +from org.apache.commons.io import IOUtils 
 +from java.nio.charset import StandardCharsets 
 +from org.apache.nifi.processor.io import OutputStreamCallback, InputStreamCallback 
 +import json 
 + 
 + 
 +class InputStreamCB(InputStreamCallback): 
 +    def __init__(self): 
 +        self.items = [] 
 +        pass 
 + 
 +    def process(self, inputStream): 
 +        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) 
 +        d = json.loads(text) 
 + 
 +        collection_name = "messages" 
 +        c = "source , batch".split(",") 
 + 
 +        collect_to_attach = map(lambda x: x.strip(), c) 
 + 
 +        attach_data = {} 
 + 
 +        for aitem in collect_to_attach: 
 +            if aitem in d: 
 +                attach_data[aitem] = d[aitem] 
 + 
 +        col_name_parts = collection_name.split(".") 
 + 
 +        # 
 +        # Filter 
 +        # 
 +        selected_obj = d 
 +        for col_name_part in col_name_parts: 
 +            if col_name_part in selected_obj: 
 +                selected_obj = selected_obj[col_name_part] 
 + 
 +        # 
 +        # Enrich item 
 +        # 
 +        out = [] 
 +        for item in selected_obj: 
 +            m = dict() 
 +            m.update(item) 
 +            m.update(attach_data) 
 +            out.append(m) 
 +            # SAVE AS NEW FLOW FILE 
 + 
 +        self.items = out 
 + 
 +    def getItems(self): 
 +        return self.items 
 + 
 + 
 +class OutputStreamCB(OutputStreamCallback): 
 +    def __init__(self): 
 +        pass 
 + 
 +    def setItem(self,i): 
 +        self.item = i 
 + 
 +    def process(self, outputStream): 
 +        str_out = json.dumps(self.item) 
 +        outputStream.write(bytearray(str_out.encode('utf-8'))) 
 + 
 + 
 +ff = session.get() 
 + 
 +if ff != None: 
 + 
 +    isCB = InputStreamCB() 
 +    session.read(ff, isCB) 
 +    counter = 1 
 + 
 + 
 +    for itm in isCB.getItems(): 
 +        itm_ff = session.create(ff) 
 + 
 +        outCB = OutputStreamCB() 
 +        outCB.setItem(itm) 
 +        itm_ff = session.write(itm_ff, outCB) 
 + 
 +        session.transfer(itm_ff, REL_SUCCESS) 
 + 
 +    session.transfer(ff, REL_FAILURE) 
 +</code>
kb/apache_nifi.1525606426.txt.gz · Last modified: (external edit)
Back to top
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0