Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revision Previous revision
kb:apache_nifi [2018/05/06 13:57]
yehuda [Create Processor]
kb:apache_nifi [2018/05/06 15:21] (current)
yehuda
Line 24: Line 24:
 @author: yehuda @author: yehuda
 '''​ '''​
 +from org.apache.commons.io import IOUtils
 +from java.nio.charset import StandardCharsets
 +from org.apache.nifi.processor.io import OutputStreamCallback,​ InputStreamCallback
 import json import json
-from pprint import pprint 
  
-if __name__ == '​__main__'​+ 
-    c = "abc abc2"​.split(","​) +class InputStreamCB(InputStreamCallback)
-     +    ​def __init__(self):​ 
-    collect_to_attach = map(lambda x: x.strip() ,c)     +        self.items = [] 
-    ​collection_name = "​t.items"​ +        pass 
-     + 
-    with open('​data.json'​) as f: +    def process(self,​ inputStream):​ 
-        d = json.load(f)+        text = IOUtils.toString(inputStream,​ StandardCharsets.UTF_8) 
 +        d = json.loads(text) 
 + 
 +        collection_name = "​messages"​ 
 +        ​c = "source ​batch"​.split(","​) 
 + 
 +        collect_to_attach = map(lambda x: x.strip(), c) 
         attach_data = {}         attach_data = {}
-        ​+
         for aitem in collect_to_attach:​         for aitem in collect_to_attach:​
             if aitem in d:             if aitem in d:
                 attach_data[aitem] = d[aitem]                 attach_data[aitem] = d[aitem]
-         +
-        ​+
         col_name_parts = collection_name.split("​."​)         col_name_parts = collection_name.split("​."​)
-        ​+
         #         #
-        # Filter ​+        # Filter
         #         #
         selected_obj = d         selected_obj = d
Line 51: Line 59:
             if col_name_part in selected_obj:​             if col_name_part in selected_obj:​
                 selected_obj = selected_obj[col_name_part]                 selected_obj = selected_obj[col_name_part]
-                 +
         #         #
         # Enrich item         # Enrich item
-        #     ​ +        # 
-        out = []   ​+        out = []
         for item in selected_obj:​         for item in selected_obj:​
             m = dict()             m = dict()
Line 62: Line 70:
             out.append(m)             out.append(m)
             # SAVE AS NEW FLOW FILE             # SAVE AS NEW FLOW FILE
-        ​ 
-        ​ 
-        pprint(json.dumps(out)) 
-        ​ 
-    pass 
-</​code>​ 
  
-<code python>​ +        self.items = out
-from org.apache.commons.io import IOUtils +
-from java.nio.charset import StandardCharsets +
-from org.apache.nifi.processor.io import StreamCallback+
  
-class PyStreamCallback(InputStreamCallback):​ 
-    def __init__(self):​ 
-        pass 
-    ​ 
-    def process(self,​ inputStream):​ 
-        text = IOUtils.toString(inputStream,​ StandardCharsets.UTF_8) 
-        # outputStream.write(bytearray('​Hello World!'​[::​-1].encode('​utf-8'​))) 
-        #     ​outputStream.write(bytearray('​Hello World!'​.encode('​utf-8'​))) 
-        ​ 
     def getItems(self):​     def getItems(self):​
-        return ​[{"​a":"​b"​}]+        return ​self.items
  
  
-class PyOutputStreamCallback(OutputStreamCallback):​ +class OutputStreamCB(OutputStreamCallback):​ 
-  def __init__(self):​+    def __init__(self):​
         pass         pass
-  ​def process(self,​ outputStream):​ + 
-    outputStream.write(bytearray('Hello World!'​.encode('​utf-8'​))) +    def setItem(self,​i):​ 
-     +        self.item = i 
-flowFile ​= session.get()  + 
-if (flowFile ​!= None)+    ​def process(self,​ outputStream):​ 
-    try: +        ​str_out = json.dumps(self.item) 
-         +        ​outputStream.write(bytearray(str_out.encode('​utf-8'​))) 
-        ​isCB = PyInputStreamCallback() + 
-        session.read(flowFile, isCB) + 
-         +ff = session.get() 
-        for itm in isCB.getItems():​ + 
-            itm_ff = session.create(flowFile+if ff != None: 
-             + 
-            outCB = PyOutputStreamCallback() +    isCB = InputStreamCB() 
-            outCB.setItem(itm) +    session.read(ff, isCB) 
-             +    ​counter = 1 
-            ​itm_ff = session.write(itm_ff,​ outCB) + 
-            session.transfer(itm_ff,​ REL_SUCCESS) + 
-         +    ​for itm in isCB.getItems():​ 
-    ​except: +        itm_ff = session.create(ff
-        ​session.transfer(flowFile, REL_FAILURE)+ 
 +        outCB = OutputStreamCB() 
 +        outCB.setItem(itm) 
 +        itm_ff = session.write(itm_ff,​ outCB) 
 + 
 +        ​session.transfer(itm_ff,​ REL_SUCCESS) 
 + 
 +    session.transfer(ff, REL_FAILURE)
 </​code>​ </​code>​
kb/apache_nifi.txt · Last modified: 2018/05/06 15:21 by yehuda
Back to top
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0