Quellcode durchsuchen

big recactor: lib/zchat.py to Poll() ! .. loocks good

micha vor 1 Jahr
Ursprung
Commit
b5de2ed4f3
3 geänderte Dateien mit 272 neuen und 220 gelöschten Zeilen
  1. 48 12
      _LibreLightDesk.py
  2. 27 16
      _console.py
  3. 197 192
      lib/zchat.py

+ 48 - 12
_LibreLightDesk.py

@@ -290,7 +290,7 @@ COLOR = ["RED","GREEN","BLUE","COLOR"]
 BEAM  = ["GOBO","G-ROT","PRISMA","P-ROT","FOCUS","SPEED"]
 INT   = ["DIM","SHUTTER","STROBE","FUNC"]
 #client = chat.tcp_sender(port=50001)
-    
+
 
 def set_exec_fader(nr,val,color=""):
     exec_wing = window_manager.get_obj(name="EXEC-WING") 
@@ -310,7 +310,7 @@ def set_exec_fader_all():
         set_exec_fader(nr,0) #,color="#fff")
 
 # remote input - start (memcached)
-def JCB(x):
+def JCB(x,sock=None):
     for i in x:
         jv = x[i]
 
@@ -325,12 +325,52 @@ def JCB(x):
             set_exec_fader(2,int(v/2+10))
         except Exception as e:
             cprint("exception",e)
+            print(sys.exc_info()[2])
         #print("remote in:",round(time.time(),0),"x",i,v)
 
-#chat.cmd(JCB,port=30002) # SERVER
-thread.start_new_thread(chat.cmd,(JCB,30002)) # SERVER
+r1_server = chat.Server(port=30002)
+def server1_loop():
+    while 1:
+        r1_server.poll(cb=JCB)
+        time.sleep(0.01)
+thread.start_new_thread(server1_loop,()) # SERVER
 # remote input - end
 
+
+def JSCB(x,sock=None):
+    i = ""
+    msg = ""
+    try:
+        for i in x:
+            #print("i",[i])
+            msg = json.loads(i)
+            print("JSCB",i,msg,sock)
+            if sock:
+                msg = json.dumps(msg)
+                msg = bytes(msg,"utf8")
+                chat._send(sock,msg)
+            
+    except Exception as e:
+        cprint("exception JSCB:",e)
+        cprint("- i:",i)
+        cprint("- msg:",msg)
+        cprint(traceback.format_exc(),color="red")
+        if sock:
+            msg = ["Notice: Exception on JSCB-SERVER: ",str(e)]
+            msg = json.dumps(msg)
+            msg = bytes(msg,"utf8")
+            chat._send(sock,msg)
+
+
+
+# external GUI
+r_server = chat.Server(port=30003,cb=JSCB)
+def server_loop():
+    while 1:
+        r_server.poll(cb=JSCB)
+        time.sleep(0.001)
+thread.start_new_thread(server_loop,()) # SERVER
+
 # read memcachd
 memcache = None
 try:
@@ -427,11 +467,9 @@ class MC():
 
 _mc=MC()
 _mc.loop()
-#time.sleep(1)
-#exit()
 
-jclient = chat.tcp_sender()#port=50001)
-import zlib
+console = chat.Client() #port=50001)
+
 def jclient_send(data):
     t_start = time.time()
     jtxt = data
@@ -480,9 +518,7 @@ def jclient_send(data):
     jtxt = jdatas
     jtxt = json.dumps(jtxt)
     jtxt = jtxt.encode()
-    #jtxt = zlib.compress(jtxt)
-    jclient.send( jtxt ) #b"\00 ")
-    #print(round((time.time()-t_start)*1000,4),"milis")
+    console.send( jtxt ) #b"\00 ")
     cprint("{:0.04} sec.".format(time.time()-t_start),color="yellow")
     cprint("{:0.04} tick".format(time.time()),color="yellow")
 
@@ -4017,7 +4053,7 @@ class Window():
                     save_window_position()
                     #self.elem.config(activebackground="lightgrey")
                     LOAD_SHOW_AND_RESTAT("").cb(force=1)
-                cprint("oipo "*10,round(int(time.time()-sstart)*1000,2))
+                #cprint("oipo "*10,round(int(time.time()-sstart)*1000,2))
                 return
 
         if "keysym" in dir(event):

+ 27 - 16
_console.py

@@ -945,25 +945,33 @@ if __run_main:
     thread.start_new_thread(main.loop,())
 
 
-def JCB(data): #json client input
+def JCB(data,sock=None): #json client input
     t_start = time.time()
-    jdatas = [data["cmd"]]
-
+    #print("-->-",data)
+    jdatas = []
+    l2 = 0
+    for line in data:
+        data2 = json.loads(line)
+        l2 += len(data2)
+        #print("line:",line)
+        jdatas.append(data2) #["CMD"])
+
+    print("INPUT JCB =>",len(data),":",l2)
     c = clock.time() 
     c = float(c)
     ftime = 0
     delay = 0
-    for j in jdatas:
+    for cmds in jdatas:
+
+        #print("cmds:",cmds)
         master_fx = MASTER_FX()
-        if not j:
+        if not cmds:
             continue
+
         try:
-            jdata = j #jdatas[j]
-            jtxt = jdata
-            jtxt = str(jtxt,"UTF-8")
-            cmds = json.loads(jtxt)
             out = {}
             for x in cmds:
+
                 if "CMD" in x:
                     print("CMD:",x)
                     if "EXEC-SPEED-MASTER" == x["CMD"]:
@@ -1061,9 +1069,13 @@ def JCB(data): #json client input
                                     i.fx(xtype="off",clock=c)
 
                         if "FLASH" in x:
-                            out[DMX]["flash_fx"] = {"xtype":xtype,"size":size,"speed":speed,"invert":invert,"width":width,"start":start,"offset":offset,"base":base,"clock":c,"master":master_fx}
+                            out[DMX]["flash_fx"] = {"xtype":xtype,"size":size,"speed":speed,
+                                        "invert":invert,"width":width,"start":start
+                                        ,"offset":offset,"base":base,"clock":c,"master":master_fx}
                         else:
-                            out[DMX]["fx"] = {"xtype":xtype,"size":size,"speed":speed,"invert":invert,"width":width,"start":start,"offset":offset,"base":base,"clock":c,"master":master_fx}
+                            out[DMX]["fx"] = {"xtype":xtype,"size":size,"speed":speed
+                                    ,"invert":invert,"width":width,"start":start
+                                    ,"offset":offset,"base":base,"clock":c,"master":master_fx}
 
                     elif type(fx) is str and fx:  # old fx like sinus:200:12:244 
                         ccm = str(DMX+1)+":"+fx
@@ -1228,12 +1240,11 @@ def CB(data): # raw/text client input
 
 if __run_main:
         
-    #jchat = chat.CMD(CB,port=50001) # server listener
-    #thread.start_new_thread(jchat.poll,())
-    chat.cmd(JCB) # server listener
-    #chat.cmd(JCB,port=50001) # server listener
+    s = chat.Server(cb=JCB)
+    while 1:
+        s.poll()
+        time.sleep(0.01)
 
-    #input("END")
 
 
 

+ 197 - 192
lib/zchat.py

@@ -10,7 +10,13 @@ import base64
 import json 
 import time
 
+import traceback
+
 import _thread
+import _thread as thread
+
+
+dbg=0
 
 def dummyCB(msg):
     
@@ -25,18 +31,136 @@ def dummyCB(msg):
     except:pass
     print("d:",round(t1-t,3),"dummy_CB",msg)
 
+def _compress(msg):
+    _msg = msg[:]
+    if sys.getsizeof(_msg):
+        _msg=zlib.compress(_msg)
+    _msg = base64.b64encode(_msg)
+    return _msg + b"\00"
+
+def _decompress(msg):
+    _msg = b""
+    if msg:
+        #print("msg**",msg)
+        _msg = msg[:]
+        if _msg[-1] == b"\00":
+            _msg = _msg[:-1]
+        _msg = base64.b64decode(msg)
+
+        try:
+            _msg=zlib.decompress(_msg)
+        except Exception as e:
+            print("SERVER decompress err",e)
+    
+    return _msg
+
+def _send(sock,msg):
+    try:
+        if dbg:print("_send",[msg])
+        if dbg:print()
+        _msg = _compress(msg)
+        r=sock.send(_msg)
+        return r
+    except Exception as e:
+        print("excep _send",e)
+
+
+def _recv(sock):
+    xmsg=b""
+    msg =b""
 
+    try:
+        xmsg = sock.recv(1)#1024)#5120)
+        if xmsg == b".":
+            xmsg = b""
+        #print(":",xmsg)
+        while xmsg:
+            #print(":::",xmsg)
+            if xmsg == b"\x00":
+                break
+            msg += xmsg
+            xmsg = sock.recv(1)
+            if xmsg == b".":
+                xmsg = b""
+    except ConnectionResetError as e:
+        pass
+    except BlockingIOError as e:
+        pass
+    if msg:
+        #if dbg:print("_recvA",[msg]) 
+        msg = _decompress(msg)
+        if dbg:print("_recvB",[msg]) 
+    return msg
+
+
+
+
+class Poll():
+    def __init__(self,sock,cb=None,name="<name>"):
+        print(name,"Poll.__init__()")
+        self.cb = cb
+        self.name = name
+        self.data_in = []
+        self.data_out = []
+        self.sock = sock
+        self.lock = _thread.allocate_lock()
+        _thread.start_new_thread(self._rloop,())
+        if cb:
+            _thread.start_new_thread(self._wloop,())
+    def _get_out(self):
+        out = []
+        try:
+            self.lock.acquire()
+            out = self.data_out[:]
+            self.data_out = []
+        finally:
+            self.lock.release()
+        return out
+    def _wloop(self):
+        while 1:
+            out = self._get_out()
+            #print(self,"_wloop",out)
+            if out and self.cb:
+                if dbg:print(self.name,"Poll._wloop",out)
+                try:
+                    self.cb(out,self.sock)
+                except Exception as e:
+                    print("Exception self.cb",e)
+                    print(traceback.format_exc())
+            else:
+                time.sleep(0.02)
+
+    def _rloop(self):
+        msg = b""
+        while 1:
+            #print(self.name,_loop",self.sock)
+            msg = _recv(self.sock)
+
+            if msg:
+                if dbg:print(self.name,"Poll._rloop",[msg])#self.sock)
+                #print([msg])
+                try:
+                    self.lock.acquire()
+                    self.data_out.append(msg)
+                finally:
+                    self.lock.release()
+            else:
+                time.sleep(0.1)
+
+# CORE CLASSES ---
 
 class Server():
     def __init__(self,cb=dummyCB,port=51000):
-        print("**** SERVER *****")
+        print("**** SERVER ***** PORT:",port)
         self._t = time.time()
         self._last_check = time.time()
         self.port=port
         self.cb = cb
         self.clients = [] 
+        self._client_nr = 0
         self.msg=b''
         self.select = select.select
+        self._poll = []
 
         self._start()
 
@@ -51,7 +175,8 @@ class Server():
                 break
             except Exception as e:
                 print("except",e)
-                print( "bind error")
+                print( "SERVER - bind error PORT:",self.port)
+                print()
                 time.sleep(1)
         
         self.server.listen(1)
@@ -73,10 +198,13 @@ class Server():
                 client.setblocking(0)
                 self.client_lock.acquire()
                 self.clients.append(client)
-                print("+++ Client %s open" % addr[0])
+                self._client_nr += 1
+                Poll(client,cb=self.cb,name="Server c:{}".format(self._client_nr))
+                print("+++ Client %s open" % addr[0],client)
             finally:
                 self.client_lock.release()
                 time.sleep(0.2)
+
     def rem_client(self,client):
         self.client_lock.acquire()
         try:
@@ -86,47 +214,16 @@ class Server():
             print("+++ Client %s close" % client)
         finally:
             self.client_lock.release()
+
     def get_clients(self):
         self.client_lock.acquire()
         clients = self.clients[:]
         self.client_lock.release()
         return clients
-    def _recv(self,sock):
-        xmsg=b""
-        msg =b""
-
-        try:
-            xmsg = sock.recv(1)#1024)#5120)
-            while xmsg:
-                if xmsg == b"\x00":
-                    break
-                msg += xmsg
-                xmsg = sock.recv(1)
-
-            idle = 0
-        except ConnectionResetError as e:
-            pass
-        except BlockingIOError as e:
-            pass
-        
-        if msg:
-            #print("msg**",msg)
-            #print("B64",sys.getsizeof(msg),len(msg))
-            msg = base64.b64decode(msg)
-            ##print("msg**",msg)
-            ##msg = msg.decode("utf8")
-            #print("str",sys.getsizeof(msg),len(msg))
 
-            try:
-                msg=zlib.decompress(msg)
-                #print("uzip",sys.getsizeof(msg),len(msg))
-                #print("msg",str(msg)[:150],"...")
-            except Exception as e:
-                print("SERVER decompress err",e)
-                #msg = b"decompression error"
+    def _recv(self,sock):
+        return _recv(sock)
 
-        
-        return msg
     def check_client(self):
         if self._last_check+1 < time.time():
             self._last_check = time.time()
@@ -138,50 +235,37 @@ class Server():
                 except ConnectionResetError as e:
                     self.rem_client(sock)
 
-    def poll(self):
-        run = 1
-        #try:
-        if 1: #while run:
-            self.check_client()
-            idle = 1
-            for sock in self.get_clients():
-                #print(dir(sock))
-                msg = self._recv(sock)
+    def poll(self,cb=None):
+        self.check_client()
+        idle = 1
+        time.sleep(0.1)
+        return
 
-                if not msg:
-                    continue
+        for sock in self.get_clients():
+            msg = self._recv(sock)
 
-                idle = 0
+            if not msg:
+                continue
 
-                msg = msg.replace(b"\x00 ",b"")
-                msg = {"cmd":msg}
+            idle = 0
+
+            msg = {"cmd":msg}
+            if cb:
+                cb(msg,sock)
+            else:
                 self.cb(msg)
 
         if idle:
             time.sleep(0.02)
         
-        #finally:pass
-        #except KeyboardInterrupt:
-        #    print(" strg+c")
-        #finally:
-        #    for c in clients:
-        #        print(c,"close")
-        #        c.close()
-        #    server.close()
-        #    print("server close")
-CMD = Server
-
-def cmd(cb=dummyCB,port=51000):
-    print("----cmd")
-    x=CMD(cb=cb,port=port)
-    while 1:
-        x.poll()
 
 
-class _Client():
-    def __init__(self,port=51000):
-        print("-----CLIENT-----")
+class Client():
+    def __init__(self,port=51000,cb=None):
+        print("-----CLIENT----- PORT:",port)
         self.port = port
+        self.cb = cb
+        self._poll = None
         self.connect()
         
     def connect(self,client_name="unkown"):
@@ -189,128 +273,56 @@ class _Client():
         self.xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         try:
             self.xs.connect((self.xip, self.port)) #50000))
+            self._poll = Poll(self.xs,cb=self.cb,name="Client :x")
         except ConnectionRefusedError as e:
-            print("e654 ConnectionRefusedError: ", "ERR: {0} ".format(e.args) ,end="")
+            print("exception 654 ConnectionRefusedError: ", "ERR: {0} ".format(e.args) ,end="")
             print("Server nicht ereichbar/unterbrochen")
             print(self.xip,self.port)
             print(self.xs)
+            print("-------")
             time.sleep(1)
             self.connect()
         print("connected !")
 
-    def send(self,nachricht):
-        try:
-            #print(sys.getsizeof(msg),len(msg))
-            if "-dbg" in sys.argv:print("send",nachricht)
-            if sys.getsizeof(nachricht):
-                nachricht=zlib.compress(nachricht)
-            #nachricht = bytes(nachricht,"utf-8")
-            nachricht = base64.b64encode(nachricht)
-            self.xs.send(nachricht + b"\00")
-        except socket.error as e:
+    def _recv(self):
+        sock = self.xs
+        return _recv(sock)
+
+    def read(self):
+        return self._recv()
+
+    def send(self,msg):
+        r=_send(self.xs,msg)
+        if not r:
             self.connect()
-        time.sleep(0.0001)
+
     def close(self):
         self.xs.close()
+
     def __del__(self):
         self.close()
 
 
-import _thread as thread
 
-class XClient():
-    def __init__(self,port=51000):
-        self.port = port 
-        self.buffer=[]
-        self._buffer=[]
-        self.lock = thread.allocate_lock()
-        thread.start_new_thread(self.loop,())
 
-    def loop(self):
-        
-        self.Client = _Client(self.port)
-        while 1:
-            if not self.lock.locked():
-                self.lock.acquire_lock()
-                try:
-                    for b in self.buffer:
-                        self._buffer.append(b)
-                    self.buffer = []
-                finally:
-                    self.lock.release_lock()
-            if self._buffer:
-                t = time.time()
-                for b in self._buffer:
-                    #print("send",len(str(b)))
-                    if b[1]+2 > t:
-                        self.Client.send(b[0])
-                self._buffer = []
-            else:
-                time.sleep(0.1)
-    def connect(self,client_name="unknown"):
-        pass
-
-                
-    def send(self,nachricht):
-        #print(self,nachricht)
-        
-        self.lock.acquire_lock()
-        try:
-            self.buffer.append([nachricht,time.time()])
-        finally:
-            self.lock.release_lock()
-#Client = _Client
-Client = XClient
-
-tcp_sender = Client
+# --- single app ---
 
+PORT=51111
+for a in sys.argv:
+    if "port=" in a:
+        PORT = a.split("=")[-1]
+        PORT = int(PORT)
 
+def bounce(data,B=None):
+    print("XxX",data)#,B)a
+    #data = data.decode()
+    for line in data:
+        line = json.loads(line)
+        for i in line:
+            print("line:",i)
 
-
-
-
-# test ----
-def run_client_test(c):
-    import random 
-    import string
-    client = c
-    print(" === TEST DATA START ===")
-
-    try:
-        for i in range(100):
-            x=random.choice(string.printable)
-            msg=bytes("hi"+str(x*random.randint(10,9999)),"utf-8")
-            print(x,sys.getsizeof(msg),len(msg))
-            client.send(msg)
-            time.sleep(0.01)
-    except Exception as e:
-        print("e",e)
-    finally:
-        client.close()
-
-    try:
-        client = Client()
-        for i in range(100):
-            x=random.choice(string.printable)
-            msg=bytes(x,"ho "+str(x*random.randint(10,9999)),"utf-8")
-            print(sys.getsizeof(msg),len(msg))
-            msg=zlib.compress(msg)
-            print(sys.getsizeof(msg),len(msg))
-            client.send(msg)
-            time.sleep(0.01)
-    except Exception as e:
-        print("e",e)
-    finally:
-        pass
-        #client.close()
-    print(" === TEST DATA END ===")
-
-
-
-TEST_PORT=51111
-# --- single app
-def test_client():
-    c = Client(port=TEST_PORT)
+def single_client():
+    c = Client(port=PORT,cb=bounce)
     if "test" in sys.argv: # test server/client
         run_client_test(c)
 
@@ -318,38 +330,31 @@ def test_client():
     while 1:
         try:
             i=""
-            i = input("cmd:")
+            print()
+            i = input("cmd:: ")
             c.send(bytes(i,"utf8"))
+            time.sleep(0.5)
+            #x=c.read()
+            #print(x)
         except Exception as e:
             print("e445",e)
 
-def test_server():
-    server = Server(port=TEST_PORT)
+
+# single server
+def single_server():
+    server = Server(port=PORT)
     
     while 1:
         server.poll()
-        time.sleep(0.00001)
-
-def test_cmd():
-    i = sys.argv.index("cmd")
-    data = sys.argv[i+1]
-    print( i ,data)
-    data = data.encode("utf-8")
-    c = Client(TEST_PORT)
-    client = c
-    time.sleep(0.05)
-    client.send(data)
-    time.sleep(0.05)
+        time.sleep(0.001)
 
 
 # =======================
 def main():
-    if "cmd" in sys.argv:
-        test_cmd()
-    elif "client" in sys.argv:
-        test_client()
+    if "client" in sys.argv:
+        single_client()
     else: 
-        test_server()
+        single_server()
 
 
 if __name__ == "__main__":