Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revision | Previous revision
Next revision | Both sides next revision
kb:apache_nifi [2018/05/06 13:57] – [Create Processor] yehuda | kb:apache_nifi [2018/05/06 15:21] yehuda
Line 24: Line 24:
 @author: yehuda @author: yehuda
 ''' '''
 +from org.apache.commons.io import IOUtils
 +from java.nio.charset import StandardCharsets
 +from org.apache.nifi.processor.io import OutputStreamCallback, InputStreamCallback
 import json import json
-from pprint import pprint 
  
-if __name__ == '__main__'+ 
-    c = "abc abc2".split(","+class InputStreamCB(InputStreamCallback)
-     +    def __init__(self): 
-    collect_to_attach = map(lambda x: x.strip() ,c)     +        self.items = [] 
-    collection_name = "t.items" +        pass 
-     + 
-    with open('data.json') as f: +    def process(self, inputStream): 
-        d = json.load(f)+        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) 
 +        d = json.loads(text) 
 + 
 +        collection_name = "messages" 
 +        c = "source batch".split(","
 + 
 +        collect_to_attach = map(lambda x: x.strip(), c) 
         attach_data = {}         attach_data = {}
-        +
         for aitem in collect_to_attach:         for aitem in collect_to_attach:
             if aitem in d:             if aitem in d:
                 attach_data[aitem] = d[aitem]                 attach_data[aitem] = d[aitem]
-         +
-        +
         col_name_parts = collection_name.split(".")         col_name_parts = collection_name.split(".")
-        +
         #         #
-        # Filter +        # Filter
         #         #
         selected_obj = d         selected_obj = d
Line 51: Line 59:
             if col_name_part in selected_obj:             if col_name_part in selected_obj:
                 selected_obj = selected_obj[col_name_part]                 selected_obj = selected_obj[col_name_part]
-                 +
         #         #
         # Enreach item         # Enreach item
-        #      +        # 
-        out = []   +        out = []
         for item in selected_obj:         for item in selected_obj:
             m = dict()             m = dict()
Line 62: Line 70:
             out.append(m)             out.append(m)
             # SAVE AS NEW FLOW FILE             # SAVE AS NEW FLOW FILE
-         
-         
-        pprint(json.dumps(out)) 
-         
-    pass 
-</code> 
  
-<code python> +        self.items = out
-from org.apache.commons.io import IOUtils +
-from java.nio.charset import StandardCharsets +
-from org.apache.nifi.processor.io import StreamCallback+
  
-class PyStreamCallback(InputStreamCallback): 
-    def __init__(self): 
-        pass 
-     
-    def process(self, inputStream): 
-        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) 
-        # outputStream.write(bytearray('Hello World!'[::-1].encode('utf-8'))) 
-        #     outputStream.write(bytearray('Hello World!'.encode('utf-8'))) 
-         
     def getItems(self):     def getItems(self):
-        return [{"a":"b"}]+        return self.items
  
  
-class PyOutputStreamCallback(OutputStreamCallback): +class OutputStreamCB(OutputStreamCallback): 
-  def __init__(self):+    def __init__(self):
         pass         pass
-  def process(self, outputStream): + 
-    outputStream.write(bytearray('Hello World!'.encode('utf-8'))) +    def setItem(self,i): 
-     +        self.item = i 
-flowFile = session.get()  + 
-if (flowFile != None)+    def process(self, outputStream): 
-    try: +        str_out = json.dumps(self.item) 
-         +        outputStream.write(bytearray(str_out.encode('utf-8'))) 
-        isCB = PyInputStreamCallback() + 
-        session.read(flowFile, isCB) + 
-         +ff = session.get() 
-        for itm in isCB.getItems(): + 
-            itm_ff = session.create(flowFile+if ff != None: 
-             + 
-            outCB = PyOutputStreamCallback() +    isCB = InputStreamCB() 
-            outCB.setItem(itm) +    session.read(ff, isCB) 
-             +    counter = 1 
-            itm_ff = session.write(itm_ff, outCB) + 
-            session.transfer(itm_ff, REL_SUCCESS) + 
-         +    for itm in isCB.getItems(): 
-    except: +        itm_ff = session.create(ff
-        session.transfer(flowFile, REL_FAILURE)+ 
 +        outCB = OutputStreamCB() 
 +        outCB.setItem(itm) 
 +        itm_ff = session.write(itm_ff, outCB) 
 + 
 +        session.transfer(itm_ff, REL_SUCCESS) 
 + 
 +    session.transfer(ff, REL_FAILURE)
 </code> </code>
kb/apache_nifi.txt · Last modified: 2022/01/03 16:03 by 127.0.0.1
Back to top
Driven by DokuWiki | Recent changes RSS feed | Valid CSS | Valid XHTML 1.0