zchat.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. import socket
  2. import sys
  3. import time
  4. import select
  5. import zlib
  6. import base64
  7. import json
  8. import time
  9. import _thread
  10. def dummyCB(msg):
  11. t = str(time.time())[7:]
  12. t = float(t)
  13. t1 = 0
  14. try:
  15. cmd = json.loads(msg["cmd"])
  16. t1 = cmd#[0]
  17. t1 = float(t1)
  18. except:pass
  19. print("d:",round(t1-t,3),"dummy_CB",msg)
  20. class Server():
  21. def __init__(self,cb=dummyCB,port=51000):
  22. print("**** SERVER *****")
  23. self._t = time.time()
  24. self._last_check = time.time()
  25. self.port=port
  26. self.cb = cb
  27. self.clients = []
  28. self.msg=b''
  29. self.select = select.select
  30. self._start()
  31. def _start(self):
  32. self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  33. self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  34. #self.xs.getsockopt(socket.AF_INET, socket.SO_REUSEADDR )
  35. while 1:
  36. try:
  37. self.server.bind(("", self.port))
  38. break
  39. except Exception as e:
  40. print("except",e)
  41. print( "bind error")
  42. time.sleep(1)
  43. self.server.listen(1)
  44. self.client_loop()
  45. def time(self):
  46. return self._t - time.time()
  47. def client_loop(self):
  48. self.client_lock = _thread.allocate_lock()
  49. print(dir(self.client_lock),"-----2:") # = _thread.allocate_lock()
  50. _thread.start_new_thread(self._client_loop,())
  51. def _client_loop(self):
  52. print("---- start server loop ----")
  53. while 1:
  54. try:
  55. client, addr = self.server.accept()
  56. client.setblocking(0)
  57. self.client_lock.acquire()
  58. self.clients.append(client)
  59. print("+++ Client %s open" % addr[0])
  60. finally:
  61. self.client_lock.release()
  62. time.sleep(0.2)
  63. def rem_client(self,client):
  64. self.client_lock.acquire()
  65. try:
  66. self.clients.remove(client)
  67. #print(dir(client))
  68. #print((client.family.name))
  69. print("+++ Client %s close" % client)
  70. finally:
  71. self.client_lock.release()
  72. def get_clients(self):
  73. self.client_lock.acquire()
  74. clients = self.clients[:]
  75. self.client_lock.release()
  76. return clients
  77. def _recv(self,sock):
  78. xmsg=b""
  79. msg =b""
  80. try:
  81. xmsg = sock.recv(1)#1024)#5120)
  82. while xmsg:
  83. if xmsg == b"\x00":
  84. break
  85. msg += xmsg
  86. xmsg = sock.recv(1)
  87. idle = 0
  88. except ConnectionResetError as e:
  89. pass
  90. except BlockingIOError as e:
  91. pass
  92. if msg:
  93. #print("msg**",msg)
  94. #print("B64",sys.getsizeof(msg),len(msg))
  95. msg = base64.b64decode(msg)
  96. ##print("msg**",msg)
  97. ##msg = msg.decode("utf8")
  98. #print("str",sys.getsizeof(msg),len(msg))
  99. try:
  100. msg=zlib.decompress(msg)
  101. #print("uzip",sys.getsizeof(msg),len(msg))
  102. #print("msg",str(msg)[:150],"...")
  103. except Exception as e:
  104. print("SERVER decompress err",e)
  105. #msg = b"decompression error"
  106. return msg
  107. def check_client(self):
  108. if self._last_check+1 < time.time():
  109. self._last_check = time.time()
  110. for sock in self.get_clients():
  111. try:
  112. sock.send(b".")
  113. except BrokenPipeError as e:
  114. self.rem_client(sock)
  115. except ConnectionResetError as e:
  116. self.rem_client(sock)
  117. def poll(self):
  118. run = 1
  119. #try:
  120. if 1: #while run:
  121. self.check_client()
  122. idle = 1
  123. for sock in self.get_clients():
  124. #print(dir(sock))
  125. msg = self._recv(sock)
  126. if not msg:
  127. continue
  128. idle = 0
  129. msg = msg.replace(b"\x00 ",b"")
  130. msg = {"cmd":msg}
  131. self.cb(msg)
  132. if idle:
  133. time.sleep(0.02)
  134. #finally:pass
  135. #except KeyboardInterrupt:
  136. # print(" strg+c")
  137. #finally:
  138. # for c in clients:
  139. # print(c,"close")
  140. # c.close()
  141. # server.close()
  142. # print("server close")
  143. CMD = Server
  144. def cmd(cb=dummyCB,port=51000):
  145. print("----cmd")
  146. x=CMD(cb=cb,port=port)
  147. while 1:
  148. x.poll()
  149. class _Client():
  150. def __init__(self,port=51000):
  151. print("-----CLIENT-----")
  152. self.port = port
  153. self.connect()
  154. def connect(self,client_name="unkown"):
  155. self.xip = "127.0.0.1" #raw_input("IP-Adresse: ")
  156. self.xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  157. try:
  158. self.xs.connect((self.xip, self.port)) #50000))
  159. except ConnectionRefusedError as e:
  160. print("e654 ConnectionRefusedError: ", "ERR: {0} ".format(e.args) ,end="")
  161. print("Server nicht ereichbar/unterbrochen")
  162. print(self.xip,self.port)
  163. print(self.xs)
  164. time.sleep(1)
  165. self.connect()
  166. print("connected !")
  167. def send(self,nachricht):
  168. try:
  169. #print(sys.getsizeof(msg),len(msg))
  170. if "-dbg" in sys.argv:print("send",nachricht)
  171. if sys.getsizeof(nachricht):
  172. nachricht=zlib.compress(nachricht)
  173. #nachricht = bytes(nachricht,"utf-8")
  174. nachricht = base64.b64encode(nachricht)
  175. self.xs.send(nachricht + b"\00")
  176. except socket.error as e:
  177. self.connect()
  178. time.sleep(0.0001)
  179. def close(self):
  180. self.xs.close()
  181. def __del__(self):
  182. self.close()
  183. import _thread as thread
  184. class XClient():
  185. def __init__(self,port=51000):
  186. self.port = port
  187. self.buffer=[]
  188. self._buffer=[]
  189. self.lock = thread.allocate_lock()
  190. thread.start_new_thread(self.loop,())
  191. def loop(self):
  192. self.Client = _Client(self.port)
  193. while 1:
  194. if not self.lock.locked():
  195. self.lock.acquire_lock()
  196. try:
  197. for b in self.buffer:
  198. self._buffer.append(b)
  199. self.buffer = []
  200. finally:
  201. self.lock.release_lock()
  202. if self._buffer:
  203. t = time.time()
  204. for b in self._buffer:
  205. #print("send",len(str(b)))
  206. if b[1]+2 > t:
  207. self.Client.send(b[0])
  208. self._buffer = []
  209. else:
  210. time.sleep(0.1)
  211. def connect(self,client_name="unknown"):
  212. pass
  213. def send(self,nachricht):
  214. #print(self,nachricht)
  215. self.lock.acquire_lock()
  216. try:
  217. self.buffer.append([nachricht,time.time()])
  218. finally:
  219. self.lock.release_lock()
  220. #Client = _Client
  221. Client = XClient
  222. tcp_sender = Client
  223. # test ----
  224. def run_client_test(c):
  225. import random
  226. import string
  227. client = c
  228. print(" === TEST DATA START ===")
  229. try:
  230. for i in range(100):
  231. x=random.choice(string.printable)
  232. msg=bytes("hi"+str(x*random.randint(10,9999)),"utf-8")
  233. print(x,sys.getsizeof(msg),len(msg))
  234. client.send(msg)
  235. time.sleep(0.01)
  236. except Exception as e:
  237. print("e",e)
  238. finally:
  239. client.close()
  240. try:
  241. client = Client()
  242. for i in range(100):
  243. x=random.choice(string.printable)
  244. msg=bytes(x,"ho "+str(x*random.randint(10,9999)),"utf-8")
  245. print(sys.getsizeof(msg),len(msg))
  246. msg=zlib.compress(msg)
  247. print(sys.getsizeof(msg),len(msg))
  248. client.send(msg)
  249. time.sleep(0.01)
  250. except Exception as e:
  251. print("e",e)
  252. finally:
  253. pass
  254. #client.close()
  255. print(" === TEST DATA END ===")
  256. TEST_PORT=51111
  257. # --- single app
  258. def test_client():
  259. c = Client(port=TEST_PORT)
  260. if "test" in sys.argv: # test server/client
  261. run_client_test(c)
  262. time.sleep(1)
  263. while 1:
  264. try:
  265. i=""
  266. i = input("cmd:")
  267. c.send(bytes(i,"utf8"))
  268. except Exception as e:
  269. print("e445",e)
  270. def test_server():
  271. server = Server(port=TEST_PORT)
  272. while 1:
  273. server.poll()
  274. time.sleep(0.00001)
  275. def test_cmd():
  276. i = sys.argv.index("cmd")
  277. data = sys.argv[i+1]
  278. print( i ,data)
  279. data = data.encode("utf-8")
  280. c = Client(TEST_PORT)
  281. client = c
  282. time.sleep(0.05)
  283. client.send(data)
  284. time.sleep(0.05)
  285. # =======================
  286. def main():
  287. if "cmd" in sys.argv:
  288. test_cmd()
  289. elif "client" in sys.argv:
  290. test_client()
  291. else:
  292. test_server()
  293. if __name__ == "__main__":
  294. main()