123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365 |
- import socket
- import sys
- import time
- import select
- import zlib
- import base64
- import json
- import time
- import traceback
- import _thread
- import _thread as thread
- dbg=0
- def dummyCB(msg):
-
- t = str(time.time())[7:]
- t = float(t)
- t1 = 0
- try:
- cmd = json.loads(msg["cmd"])
- t1 = cmd#[0]
- t1 = float(t1)
- except:pass
- print("d:",round(t1-t,3),"dummy_CB",msg)
- def _compress(msg):
- _msg = msg[:]
- if sys.getsizeof(_msg):
- _msg=zlib.compress(_msg)
- _msg = base64.b64encode(_msg)
- return _msg + b"\00"
- def _decompress(msg):
- _msg = b""
- if msg:
- #print("msg**",msg)
- _msg = msg[:]
- if _msg[-1] == b"\00":
- _msg = _msg[:-1]
- _msg = base64.b64decode(msg)
- try:
- _msg=zlib.decompress(_msg)
- except Exception as e:
- print("SERVER decompress err",e)
-
- return _msg
- def _send(sock,msg):
- try:
- if dbg:print("_send",[msg])
- if dbg:print()
- _msg = _compress(msg)
- r=sock.send(_msg)
- return r
- except Exception as e:
- print("excep _send",e)
- def _recv(sock):
- xmsg=b""
- msg =b""
- try:
- xmsg = sock.recv(1)#1024)#5120)
- if xmsg == b".":
- xmsg = b""
- #print(":",xmsg)
- while xmsg:
- #print(":::",xmsg)
- if xmsg == b"\x00":
- break
- msg += xmsg
- xmsg = sock.recv(1)
- if xmsg == b".":
- xmsg = b""
- except ConnectionResetError as e:
- pass
- except BlockingIOError as e:
- pass
- if msg:
- #if dbg:print("_recvA",[msg])
- msg = _decompress(msg)
- if dbg:print("_recvB",[msg])
- return msg
- class Poll():
- def __init__(self,sock,cb=None,name="<name>"):
- print(name,"Poll.__init__()")
- self.cb = cb
- self.name = name
- self.data_in = []
- self.data_out = []
- self.sock = sock
- self.lock = _thread.allocate_lock()
- _thread.start_new_thread(self._rloop,())
- if cb:
- _thread.start_new_thread(self._wloop,())
- def _get_out(self):
- out = []
- try:
- self.lock.acquire()
- out = self.data_out[:]
- self.data_out = []
- finally:
- self.lock.release()
- return out
- def _wloop(self):
- while 1:
- out = self._get_out()
- #print(self,"_wloop",out)
- if out and self.cb:
- if dbg:print(self.name,"Poll._wloop",out)
- try:
- self.cb(out,self.sock)
- except Exception as e:
- print("Exception self.cb",e)
- print(traceback.format_exc())
- else:
- time.sleep(0.002)
- def _rloop(self):
- msg = b""
- while 1:
- #print(self.name,_loop",self.sock)
- msg = _recv(self.sock)
- if msg:
- if dbg:print(self.name,"Poll._rloop",[msg])#self.sock)
- #print([msg])
- try:
- self.lock.acquire()
- self.data_out.append(msg)
- finally:
- self.lock.release()
- else:
- time.sleep(0.001)
- # CORE CLASSES ---
- class Server():
- def __init__(self,cb=dummyCB,port=51000):
- print("**** SERVER ***** PORT:",port)
- self._t = time.time()
- self._last_check = time.time()
- self.port=port
- self.cb = cb
- self.clients = []
- self._client_nr = 0
- self.msg=b''
- self.select = select.select
- self._poll = []
- self._start()
- def _start(self):
- self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- #self.xs.getsockopt(socket.AF_INET, socket.SO_REUSEADDR )
- while 1:
- try:
- self.server.bind(("", self.port))
- break
- except Exception as e:
- print("except",e)
- print( "SERVER - bind error PORT:",self.port)
- print()
- time.sleep(1)
-
- self.server.listen(1)
- self.client_loop()
- def time(self):
- return self._t - time.time()
- def client_loop(self):
- self.client_lock = _thread.allocate_lock()
- #print(dir(self.client_lock),"-----2:") # = _thread.allocate_lock()
- _thread.start_new_thread(self._client_loop,())
- def _client_loop(self):
- print("---- start server loop ----")
- while 1:
- try:
- client, addr = self.server.accept()
- client.setblocking(0)
- self.client_lock.acquire()
- self.clients.append(client)
- self._client_nr += 1
- Poll(client,cb=self.cb,name="Server c:{}".format(self._client_nr))
- print("+++ Client %s open" % addr[0],client)
- finally:
- self.client_lock.release()
- time.sleep(0.02)
- def rem_client(self,client):
- self.client_lock.acquire()
- try:
- self.clients.remove(client)
- #print(dir(client))
- #print((client.family.name))
- print("+++ Client %s close" % client)
- finally:
- self.client_lock.release()
- def get_clients(self):
- self.client_lock.acquire()
- clients = self.clients[:]
- self.client_lock.release()
- return clients
- def _recv(self,sock):
- return _recv(sock)
- def check_client(self):
- if self._last_check+1 < time.time():
- self._last_check = time.time()
- for sock in self.get_clients():
- try:
- sock.send(b".")
- except BrokenPipeError as e:
- self.rem_client(sock)
- except ConnectionResetError as e:
- self.rem_client(sock)
- def poll(self,cb=None):
- self.check_client()
- idle = 1
- time.sleep(0.001)
- return
- for sock in self.get_clients():
- msg = self._recv(sock)
- if not msg:
- continue
- idle = 0
- msg = {"cmd":msg}
- if cb:
- cb(msg,sock)
- else:
- self.cb(msg)
- if idle:
- time.sleep(0.002)
-
- class Client():
- def __init__(self,port=51000,cb=None):
- print("-----CLIENT----- PORT:",port)
- self.port = port
- self.cb = cb
- self._poll = None
- self.connect()
-
- def connect(self,client_name="unkown"):
- self.xip = "127.0.0.1" #raw_input("IP-Adresse: ")
- self.xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- self.xs.connect((self.xip, self.port)) #50000))
- self._poll = Poll(self.xs,cb=self.cb,name="Client :x")
- except ConnectionRefusedError as e:
- print("exception 654 ConnectionRefusedError: ", "ERR: {0} ".format(e.args) ,end="")
- print("Server nicht ereichbar/unterbrochen")
- print(self.xip,self.port)
- print(self.xs)
- print("-------")
- time.sleep(1)
- self.connect()
- print("zchat connected !")
- def _recv(self):
- sock = self.xs
- return _recv(sock)
- def read(self):
- return self._recv()
- def send(self,msg):
- r=_send(self.xs,msg)
- if not r:
- self.connect()
- def close(self):
- self.xs.close()
- def __del__(self):
- self.close()
- # --- single app ---
- PORT=51111
- PORT=51000 #1111
- for a in sys.argv:
- if "port=" in a:
- PORT = a.split("=")[-1]
- PORT = int(PORT)
- def bounce(data,B=None):
- print("XxX",data)#,B)a
- #data = data.decode()
- for line in data:
- line = json.loads(line)
- for i in line:
- print("line:",i)
- def single_client():
- c = Client(port=PORT,cb=bounce)
- if "test" in sys.argv: # test server/client
- run_client_test(c)
- time.sleep(1)
- while 1:
- try:
- i=""
- # print()
- i = input("cmd:: ")
- c.send(bytes(i,"utf8"))
- time.sleep(0.5)
- #x=c.read()
- #print(x)
- except Exception as e:
- print("e445",e)
- # single server
- def single_server():
- server = Server(port=PORT)
-
- while 1:
- server.poll()
- time.sleep(0.001)
- # =======================
- def main():
- if "client" in sys.argv:
- single_client()
- else:
- single_server()
- if __name__ == "__main__":
- main()
|