zchat.py 8.5 KB


  1. import socket
  2. import sys
  3. import time
  4. import select
  5. import zlib
  6. import base64
  7. import json
  8. import time
  9. import traceback
  10. import _thread
  11. import _thread as thread
  # Debug switch: a truthy value enables the "if dbg:" trace prints used by
  # the send/receive helpers and the Poll loops in this module.
  dbg = 0
  def dummyCB(msg, sock=None):
      """Default callback: print *msg* together with a timestamp delta.

      The delta is ``<number parsed from msg["cmd"]> - <local clock>``, where
      the local clock is the string form of ``time.time()`` with its first
      seven characters dropped.  If no number can be parsed from the message
      the parsed value is taken as 0.

      Args:
          msg: The received message.  A mapping with a JSON-encoded ``"cmd"``
              entry yields a real delta; any other shape is still printed.
          sock: Optional second argument.  ``Poll._wloop`` invokes callbacks
              as ``cb(messages, sock)``, so it is accepted (and ignored) here.

      Returns:
          None.
      """
      local = float(str(time.time())[7:])
      remote = 0
      try:
          remote = float(json.loads(msg["cmd"]))
      except (KeyError, TypeError, ValueError, OverflowError):
          # Missing key, non-mapping message, invalid JSON or a non-numeric
          # payload: fall back to 0 instead of failing in the subtraction.
          remote = 0
      print("d:", round(remote - local, 3), "dummy_CB", msg)
  23. def _compress(msg):
  24. _msg = msg[:]
  25. if sys.getsizeof(_msg):
  26. _msg=zlib.compress(_msg)
  27. _msg = base64.b64encode(_msg)
  28. return _msg + b"\00"
  29. def _decompress(msg):
  30. _msg = b""
  31. if msg:
  32. #print("msg**",msg)
  33. _msg = msg[:]
  34. if _msg[-1] == b"\00":
  35. _msg = _msg[:-1]
  36. _msg = base64.b64decode(msg)
  37. try:
  38. _msg=zlib.decompress(_msg)
  39. except Exception as e:
  40. print("SERVER decompress err",e)
  41. return _msg
  def _send(sock, msg):
      """Frame *msg* and write it to *sock*.

      Args:
          sock: A connected socket object.
          msg: Bytes-like payload, passed to ``_compress`` for framing.

      Returns:
          int | None: The number of bytes in the frame on success, ``None``
          if anything went wrong (the error is printed, not raised).  Callers
          in this module treat a falsy result as "connection lost".
      """
      try:
          if dbg:
              print("_send", [msg])
              print()
          frame = _compress(msg)
          # sendall() keeps writing until the whole frame is out; a plain
          # send() may stop after a partial write and cut the frame short.
          sock.sendall(frame)
          return len(frame)
      except Exception as e:
          # Deliberate best-effort boundary: report and signal failure.
          print("excep _send", e)
          return None
  def _recv(sock):
      """Read one NUL-terminated frame from *sock* and return its payload.

      Bytes are read one at a time until the NUL terminator.  A ``b"."`` byte
      is the keep-alive marker: as the first byte it ends the call with an
      empty result; inside a frame it is skipped (it is not part of the
      base64 alphabet, so it can never be payload).

      On a non-blocking socket, ``BlockingIOError`` before any byte was read
      means "nothing pending" and returns immediately.  Once a frame has
      started, the function waits briefly for the remainder instead of
      handing a truncated frame to the decoder.

      Args:
          sock: A connected socket (blocking or non-blocking).

      Returns:
          bytes: The decoded payload (via ``_decompress``), or ``b""`` when
          no frame was read.  A reset connection also yields what was read
          so far.
      """
      partial_frame_wait = 0.5  # seconds to wait for the rest of a started frame
      frame = bytearray()
      try:
          while True:
              try:
                  byte = sock.recv(1)
              except BlockingIOError:
                  if not frame:
                      break  # nothing pending at all
                  try:
                      ready, _, _ = select.select([sock], [], [], partial_frame_wait)
                  except (OSError, ValueError):
                      break  # socket became unusable while waiting
                  if not ready:
                      break  # sender stalled; give up on this frame
                  continue
              if not byte or byte == b"\x00":
                  break  # peer closed, or end of frame
              if byte == b".":
                  if frame:
                      continue  # keep-alive inside a frame: ignore
                  break  # keep-alive on its own: no message
              frame += byte
      except ConnectionResetError:
          pass
      msg = bytes(frame)
      if msg:
          msg = _decompress(msg)
          if dbg:
              print("_recvB", [msg])
      return msg
  class Poll():
      """Background reader/dispatcher for one socket.

      A reader thread pulls frames off the socket into ``data_out``.  If a
      callback was supplied, a second thread drains that list and hands each
      batch to the callback as ``cb(messages, sock)``.

      NOTE(review): when ``cb`` is falsy the reader thread still runs and
      nothing in this class drains ``data_out`` -- confirm this is intended.
      """

      def __init__(self, sock, cb=None, name="<name>"):
          """Store the socket/callback and start the worker thread(s).

          Args:
              sock: Socket to read from.
              cb: Optional callable invoked as ``cb(messages, sock)``.
              name: Label used in debug output.
          """
          print(name, "Poll.__init__()")
          self.cb = cb
          self.name = name
          self.data_in = []   # initialised here; not used elsewhere in this class
          self.data_out = []  # received messages waiting for the callback
          self.sock = sock
          self.lock = _thread.allocate_lock()  # guards data_out
          _thread.start_new_thread(self._rloop, ())
          if cb:
              _thread.start_new_thread(self._wloop, ())

      def _get_out(self):
          """Atomically take and return all queued messages (may be empty)."""
          with self.lock:
              out = self.data_out[:]
              self.data_out = []
          return out

      def _wloop(self):
          """Forever: hand queued messages to the callback, else nap briefly."""
          while 1:
              out = self._get_out()
              if out and self.cb:
                  if dbg:
                      print(self.name, "Poll._wloop", out)
                  try:
                      self.cb(out, self.sock)
                  except Exception as e:
                      # A failing callback must not kill the dispatcher thread.
                      print("Exception self.cb", e)
                      print(traceback.format_exc())
              else:
                  time.sleep(0.002)

      def _rloop(self):
          """Forever: read frames from the socket and queue them."""
          while 1:
              msg = _recv(self.sock)
              if msg:
                  if dbg:
                      print(self.name, "Poll._rloop", [msg])
                  with self.lock:
                      self.data_out.append(msg)
              else:
                  time.sleep(0.001)
  125. # CORE CLASSES ---
  class Server():
      """TCP server that accepts clients and attaches a ``Poll`` to each one.

      Constructing the object binds the port (retrying once per second until
      it succeeds), starts listening and spawns the accept thread.
      """

      def __init__(self, cb=dummyCB, port=51000):
          """Create and start the server.

          Args:
              cb: Callback given to every client's ``Poll``.
              port: TCP port to bind on all interfaces.
          """
          print("**** SERVER ***** PORT:", port)
          self._t = time.time()
          self._last_check = time.time()  # time of the last keep-alive round
          self.port = port
          self.cb = cb
          self.clients = []
          self._client_nr = 0
          self.msg = b''
          self.select = select.select
          self._poll = []
          self._start()

      def _start(self):
          """Bind the listening socket (retrying on failure) and start accepting."""
          self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
          self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
          while 1:
              try:
                  self.server.bind(("", self.port))
                  break
              except Exception as e:
                  print("except", e)
                  print("SERVER - bind error PORT:", self.port)
                  print()
                  time.sleep(1)
          self.server.listen(1)
          self.client_loop()

      def time(self):
          """Return the construction timestamp minus the current time.

          NOTE(review): this is start - now, i.e. zero or negative; confirm
          whether elapsed time (now - start) was intended before relying on it.
          """
          return self._t - time.time()

      def client_loop(self):
          """Create the client-list lock and start the accept thread."""
          self.client_lock = _thread.allocate_lock()
          _thread.start_new_thread(self._client_loop, ())

      def _client_loop(self):
          """Accept clients forever, registering each and attaching a Poll."""
          print("---- start server loop ----")
          while 1:
              # accept() happens outside the lock so that a failure here never
              # reaches a release() of a lock that was not acquired.
              client, addr = self.server.accept()
              client.setblocking(0)
              with self.client_lock:
                  self.clients.append(client)
                  self._client_nr += 1
                  Poll(client, cb=self.cb, name="Server c:{}".format(self._client_nr))
                  print("+++ Client %s open" % addr[0], client)
              time.sleep(0.02)

      def rem_client(self, client):
          """Remove *client* from the client list.

          Raises:
              ValueError: If *client* is not in the list.

          NOTE(review): the socket itself is not closed here and its Poll
          threads keep running -- confirm whether cleanup belongs here.
          """
          with self.client_lock:
              self.clients.remove(client)
              print("+++ Client %s close" % client)

      def get_clients(self):
          """Return a snapshot copy of the current client list."""
          with self.client_lock:
              return self.clients[:]

      def _recv(self, sock):
          """Read one message from *sock* using the module-level reader."""
          return _recv(sock)

      def check_client(self):
          """At most once per second, send a keep-alive byte to every client.

          Clients whose connection turns out to be broken are removed.
          """
          if self._last_check + 1 >= time.time():
              return
          self._last_check = time.time()
          for sock in self.get_clients():
              try:
                  sock.send(b".")
              except BlockingIOError:
                  # Client sockets are non-blocking; a full send buffer is not
                  # evidence of a dead peer, so skip this round for it.
                  continue
              except ConnectionError:
                  # Covers BrokenPipeError / ConnectionResetError and friends.
                  self.rem_client(sock)

      def poll(self, cb=None):
          """Run one housekeeping step: keep-alive check plus a 1 ms sleep.

          Incoming messages are not read here; every accepted client has its
          own ``Poll`` whose threads deliver them to the server callback.

          Args:
              cb: Unused; kept so existing callers can still pass it.

          Returns:
              None.
          """
          self.check_client()
          time.sleep(0.001)
          return
  class Client():
      """TCP client that connects to the server and attaches a ``Poll``.

      Constructing the object connects immediately, retrying once per second
      while the connection is refused.
      """

      def __init__(self, port=51000, cb=None, host="127.0.0.1"):
          """Create the client and connect.

          Args:
              port: TCP port of the server.
              cb: Optional callback handed to the ``Poll`` for this socket.
              host: Server address; defaults to the local machine.
          """
          print("-----CLIENT----- PORT:", port)
          self.port = port
          self.cb = cb
          self.host = host
          self._poll = None
          self.connect()

      def connect(self, client_name="unkown"):
          """(Re)connect to the server, retrying until it accepts.

          Args:
              client_name: Unused; kept for interface compatibility.
          """
          self.xip = getattr(self, "host", "127.0.0.1")
          # Retry in a loop rather than by recursion: no recursion-depth limit,
          # and the success message is printed exactly once.
          while True:
              self.xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
              try:
                  self.xs.connect((self.xip, self.port))
                  self._poll = Poll(self.xs, cb=self.cb, name="Client :x")
                  break
              except ConnectionRefusedError as e:
                  print("exception 654 ConnectionRefusedError: ", "ERR: {0} ".format(e.args), end="")
                  print("Server nicht ereichbar/unterbrochen")
                  print(self.xip, self.port)
                  print(self.xs)
                  print("-------")
                  # Release the socket of the failed attempt before retrying.
                  self.xs.close()
                  time.sleep(1)
          print("zchat connected !")

      def _recv(self):
          """Read one message from the client socket."""
          sock = self.xs
          return _recv(sock)

      def read(self):
          """Public alias for reading one message from the server."""
          return self._recv()

      def send(self, msg):
          """Send *msg*; reconnect if sending failed.

          The message that failed is not resent after the reconnect.
          """
          r = _send(self.xs, msg)
          if not r:
              self.connect()

      def close(self):
          """Close the socket if one was ever created."""
          xs = getattr(self, "xs", None)
          if xs is not None:
              xs.close()

      def __del__(self):
          self.close()
  252. # --- single app ---
  # Default TCP port.  A command-line argument containing "port=" overrides
  # it; the text after the last "=" must parse as an integer.
  # (An earlier default of 51111 was immediately overwritten and is dropped.)
  PORT = 51000
  for a in sys.argv:
      if "port=" not in a:
          continue
      PORT = int(a.split("=")[-1])
  def bounce(data, B=None):
      """Print a batch of received messages, one decoded element per line.

      Args:
          data: Iterable of JSON documents (str or bytes).  Each one is
              decoded and then iterated, so it must decode to an iterable.
          B: Unused second argument (callbacks are invoked with two arguments).
      """
      print("XxX", data)
      for raw in data:
          for item in json.loads(raw):
              print("line:", item)
  def single_client():
      """Run an interactive client: read lines from stdin and send them.

      Connects with ``bounce`` as the receive callback, then prompts in a
      loop.  The loop ends when stdin reaches end-of-file.
      """
      c = Client(port=PORT, cb=bounce)
      if "test" in sys.argv:  # test server/client
          # NOTE(review): run_client_test is not defined in the visible part
          # of this file; confirm where it is supposed to come from.
          run_client_test(c)
      time.sleep(1)
      while 1:
          try:
              i = input("cmd:: ")
              c.send(bytes(i, "utf8"))
              time.sleep(0.5)
          except EOFError:
              # stdin is closed: input() would fail again immediately, so
              # stop instead of spinning on the error forever.
              break
          except Exception as e:
              print("e445", e)
  282. # single server
  def single_server():
      """Start a server on PORT and drive its poll step forever."""
      srv = Server(port=PORT)
      while True:
          srv.poll()
          time.sleep(0.001)
  288. # =======================
  def main():
      """Entry point: run as client if "client" is on the command line, else as server."""
      if "client" not in sys.argv:
          single_server()
          return
      single_client()


  if __name__ == "__main__":
      main()