Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revision | Previous revision
Next revision
Previous revision
Next revision | Both sides next revision
kb:apache_nifi [2018/05/06 13:25] – [Create Processor] yehuda | kb:apache_nifi [2018/05/06 15:21] yehuda
Line 24: Line 24:
 @author: yehuda @author: yehuda
 ''' '''
 +from org.apache.commons.io import IOUtils
 +from java.nio.charset import StandardCharsets
 +from org.apache.nifi.processor.io import OutputStreamCallback, InputStreamCallback
 import json import json
-from pprint import pprint 
  
-if __name__ == '__main__'+ 
-    c = "abc abc2".split(","+class InputStreamCB(InputStreamCallback)
-     +    def __init__(self): 
-    collect_to_attach = map(lambda x: x.strip() ,c)     +        self.items = [] 
-    collection_name = "t.items" +        pass 
-     + 
-    with open('data.json') as f: +    def process(self, inputStream): 
-        d = json.load(f)+        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) 
 +        d = json.loads(text) 
 + 
 +        collection_name = "messages" 
 +        c = "source batch".split(","
 + 
 +        collect_to_attach = map(lambda x: x.strip(), c) 
         attach_data = {}         attach_data = {}
-        +
         for aitem in collect_to_attach:         for aitem in collect_to_attach:
             if aitem in d:             if aitem in d:
                 attach_data[aitem] = d[aitem]                 attach_data[aitem] = d[aitem]
-         +
-        +
         col_name_parts = collection_name.split(".")         col_name_parts = collection_name.split(".")
-        +
         #         #
-        # Filter +        # Filter
         #         #
         selected_obj = d         selected_obj = d
Line 51: Line 59:
             if col_name_part in selected_obj:             if col_name_part in selected_obj:
                 selected_obj = selected_obj[col_name_part]                 selected_obj = selected_obj[col_name_part]
-                 +
         #         #
         # Enreach item         # Enreach item
-        #      +        # 
-        out = []   +        out = []
         for item in selected_obj:         for item in selected_obj:
             m = dict()             m = dict()
Line 62: Line 70:
             out.append(m)             out.append(m)
             # SAVE AS NEW FLOW FILE             # SAVE AS NEW FLOW FILE
-         + 
-         +        self.items = out 
-        pprint(json.dumps(out)) + 
-         +    def getItems(self): 
-    pass+        return self.items 
 + 
 + 
 +class OutputStreamCB(OutputStreamCallback): 
 +    def __init__(self): 
 +        pass 
 + 
 +    def setItem(self,i): 
 +        self.item = i 
 + 
 +    def process(self, outputStream): 
 +        str_out = json.dumps(self.item) 
 +        outputStream.write(bytearray(str_out.encode('utf-8'))) 
 + 
 + 
 +ff = session.get() 
 + 
 +if ff != None: 
 + 
 +    isCB = InputStreamCB() 
 +    session.read(ff, isCB) 
 +    counter = 1 
 + 
 + 
 +    for itm in isCB.getItems(): 
 +        itm_ff = session.create(ff) 
 + 
 +        outCB = OutputStreamCB() 
 +        outCB.setItem(itm) 
 +        itm_ff = session.write(itm_ff, outCB) 
 + 
 +        session.transfer(itm_ff, REL_SUCCESS) 
 + 
 +    session.transfer(ff, REL_FAILURE)
 </code> </code>
kb/apache_nifi.txt · Last modified: 2022/01/03 16:03 by 127.0.0.1
Back to top
Driven by DokuWiki · Recent changes RSS feed · Valid CSS · Valid XHTML 1.0