123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- # -*- coding: UTF-8 -*-
- import os
- import fcntl
- import time
- import socket
- import struct
- import random
- from optparse import OptionParser
# Command-line interface.  Only ``options.recive`` (sender-IP prefix filter)
# and ``options.sendto`` (echoed below) are read in the visible part of the
# script; the other options are parsed but not used here.
parser = OptionParser()
_OPTION_TABLE = (
    # (option strings, dest, help)
    (("-r", "--recive"), "recive", "set recive ip like --recive 10."),
    (("-s", "--sendto"), "sendto", "set sender ip like --sendto 2.255.255.255"),
    (("-t", "--test"), "testuniv", "set test univers like --test [0-16]"),
    # NOTE(review): --inmap carries the same help text as --test; this looks
    # like a copy/paste leftover -- confirm the intended wording.
    (("", "--inmap"), "inmap", "set test univers like --test [0-16]"),
)
for _flags, _dest, _help in _OPTION_TABLE:
    parser.add_option(*_flags, dest=_dest, help=_help)

options, args = parser.parse_args()
print("option", options)
print(options.sendto)
def unpack_art_dmx(data):
    """Return the payload of a received packet as a list.

    The first 18 items of *data* (the header that ``Socket.poll`` unpacks
    separately) are skipped; everything after them is returned item by item.
    For a ``bytes`` packet that yields one ``int`` (0-255) per DMX channel.

    A packet of 18 items or fewer yields an empty list.
    """
    # Slicing never raises on short input, so no length check is needed.
    return list(data[18:])
-
class Socket():
    """Non-blocking UDP receiver for Art-Net style packets.

    Typical use is ``while sock.poll(): packet = sock.recive()``:
    ``poll()`` reads at most one datagram and reports whether it was
    accepted, ``recive()`` hands out the accepted datagram exactly once.

    ``hosts`` maps ``str(sender_address)`` -> ``str(universe)`` -> the last
    accepted packet (``head``/``addr``/``univ``/``dmx``) from that sender.
    """

    # Size of the fixed header that precedes the channel data.
    HEADER_SIZE = 18
    # Network byte order: 8-byte id, 16-bit opcode, four single bytes, a
    # 16-bit address field (index 6 of the unpacked tuple), two single bytes.
    HEADER_FORMAT = "!8sHBBBBHBB"
    # recvfrom() buffer size in bytes.  Comfortably larger than header plus
    # 512 channel bytes, and independent of the port number.
    RECV_BUFSIZE = 4096

    def __init__(self, bind='', port=6454):
        """Create the receiver and immediately try to bind *bind*:*port*."""
        self.__port = port
        self.__bind = bind
        self.__poll = 0          # 1 while an accepted packet awaits recive()
        self.__data = []
        self.__addr = "NONE"
        self.__hosts = {}
        self.hosts = self.__hosts
        self.open()
        self._poll_clean_time = time.time()
        self._poll_clean_count = 0

    def open(self):
        """Create the UDP socket, bind it and switch it to non-blocking mode.

        Socket errors are reported on stdout and otherwise ignored, so the
        caller is not interrupted when the port is unavailable.
        """
        try:
            print("connecting to ArtNet bind:", self.__bind, "Port", self.__port)
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

            self.sock.bind((self.__bind, self.__port))
            fcntl.fcntl(self.sock, fcntl.F_SETFL, os.O_NONBLOCK)
        except socket.error as e:
            print("Socket ", self.__bind, self.__port, "ERR: {0} ".format(e.args))

    def poll_clean(self):
        """Drain the socket at most every 1/25 s.

        Returns the number of datagrams discarded by this drain, or ``None``
        when less than 1/25 s has passed since the previous drain.
        """
        if self._poll_clean_time + (1 / 25.) <= time.time():
            self._poll_clean_time = time.time()
            self._poll_clean()
            drained = self._poll_clean_count
            self._poll_clean_count = 0
            return drained
        return None

    def _poll_clean(self):
        """Read and count datagrams until the socket reports an error."""
        while 1:
            try:
                self.__data, self.__addr = self.sock.recvfrom(self.RECV_BUFSIZE)
                self._poll_clean_count += 1
            except socket.timeout as e:
                err = e.args[0]
                if err == 'timed out':
                    time.sleep(1)
                    print('recv timed out, retry later')
                else:
                    print(e)
                break
            except socket.error:
                # Includes "no data available" on the non-blocking socket.
                break

    def poll(self):
        """Try to read one datagram; return 1 if it was accepted.

        Returns ``None`` when a previously accepted packet has not been
        fetched with ``recive()`` yet, when nothing could be read, when the
        header cannot be unpacked, or when the sender does not pass the
        ``--recive`` prefix filter (127.* senders always pass).
        """
        if self.__poll:
            return None

        try:
            self.__data, self.__addr = self.sock.recvfrom(self.RECV_BUFSIZE)
        except socket.timeout as e:
            err = e.args[0]
            if err == 'timed out':
                time.sleep(1)
                print('recv timed out, retry later')
            else:
                print(e)
            return None
        except socket.error:
            return None

        data, addr = self.__data, self.__addr
        self.host = addr[0]
        head = data[:self.HEADER_SIZE]
        rawdmx = data[self.HEADER_SIZE:]
        self.univ = -1

        try:
            self.head = struct.unpack(self.HEADER_FORMAT, head)
        except struct.error:
            # Truncated header: drop the packet instead of continuing with a
            # stale (or, on the very first packet, missing) self.head.
            return None

        # Keep the high-order byte of the 16-bit address field, matching the
        # "// 256" used by the consumers of "head" elsewhere in this file.
        univ = self.head[6] // 256
        self.univ = univ

        if self.host.startswith("127."):  # always receive localhost
            accepted = True
        elif not options.recive:
            accepted = True
        else:
            accepted = self.host.startswith(options.recive)

        if not accepted:
            self.__poll = 0
            return None

        self.__poll = 1
        # Remember the latest packet per sender/universe.  This bookkeeping
        # has to happen before returning, otherwise it is never executed.
        addr_key = str(addr)
        univ_key = str(univ)
        self.__hosts.setdefault(addr_key, {})[univ_key] = {
            "head": head, "addr": addr_key, "univ": univ_key, "dmx": rawdmx,
        }
        self.hosts = self.__hosts
        return 1

    def recive(self):
        """Return the packet accepted by the last ``poll()``, once.

        The result is a dict with keys ``host``, ``dmx``, ``univ``, ``head``,
        ``data`` and ``addr``.  Returns ``None`` when no packet is pending.
        """
        if not self.__poll:
            return None
        self.__poll = 0
        data, addr = self.__data, self.__addr
        self.dmx = unpack_art_dmx(data)
        return {"host": self.host, "dmx": self.dmx, "univ": self.univ,
                "head": self.head, "data": data, "addr": addr}
-
# Module-level scratch values and the main-loop flag.
x = 0
y = 0
running = True

# Creating the receiver binds the UDP port right away (see Socket.open).
xsocket = Socket()

import pygame
import pygame.gfxdraw

# One pygame.init() is sufficient; the second call that used to follow the
# clock creation did nothing new and has been dropped.
pygame.init()
screen = pygame.display.set_mode((780, 610))
pygame.display.set_caption("pygame: DMX OSZI")
pygame.mouse.set_visible(1)
pygame.key.set_repeat(1, 30)
clock = pygame.time.Clock()
class Trans():
    """Map a value to a (negated, scaled) screen y coordinate.

    Attributes:
        y: offset added to every value before scaling.
        s: scale factor (fixed at 0.25).
    """

    def __init__(self, y=150):
        # Honour the constructor argument; it used to be ignored in favour
        # of a hard-coded 150 (which is still the default).
        self.y = y
        self.s = 0.25

    def get_y(self, y):
        """Return ``-int(y * s + self.y * s)`` for the value *y*."""
        return int((y * self.s) + (self.y * self.s)) * -1
- import copy
- import _thread as thread
class E():
    """Background receiver that hands packets from ``xsocket`` to the UI.

    ``loop()`` is meant to run on its own thread; ``get()`` is called by the
    consumer.  ``lock`` guards every access to ``sdata``.
    """

    def __init__(self):
        self.sdata = {}
        self.lock = thread.allocate_lock()

    def loop(self):
        """Poll ``xsocket`` forever and publish snapshots of what arrived.

        Packets are keyed by ``"<host>:<head[6]>"``.  The local table is
        never cleared, so each published snapshot holds the latest packet of
        every key seen so far.  Never returns.
        """
        sdata = {}
        print("loop")
        while 1:
            got_packet = False
            while xsocket.poll():
                xx = xsocket.recive()
                k = xx["host"] + ":" + str(xx["head"][6])
                sdata[k] = xx
                got_packet = True
            if got_packet:
                # "with" acquires before entering and always releases, so a
                # failure while copying cannot leave the lock held.
                with self.lock:
                    self.sdata = copy.deepcopy(sdata)
            time.sleep(0.001)

    def get(self):
        """Return the most recent snapshot and reset it to an empty dict."""
        with self.lock:
            snapshot = self.sdata
            self.sdata = {}
            return snapshot
import sys

# Coordinate transform and UI font shared by the drawing code below.
T = Trans()
font = pygame.font.SysFont("FreeSans", 12)

# Start draining the socket on a background thread; the main loop picks up
# the collected packets through e.get().
e = E()
thread.start_new_thread(e.loop, ())
def get_value(sdata=None, univ=0, dmx=None):
    """Collect selected channel values from matching packets.

    Args:
        sdata: mapping of key -> packet dict (with ``head``, ``host`` and
            ``dmx`` entries).  ``None`` or empty means "no packets".
        univ: universe to match against ``head[6] // 256``.
        dmx: 1-based channel numbers to read; defaults to ``(1, 121)``.

    Returns:
        A flat list with one value per requested channel for every packet
        whose host starts with ``2.0.0.`` and whose universe equals *univ*.
        A channel beyond the end of a packet's frame is reported as 0, so
        the positions in the result stay aligned with *dmx*.
    """
    # None sentinels instead of mutable default arguments.
    if not sdata:
        return []
    channels = (1, 121) if dmx is None else dmx

    values = []
    for packet in sdata.values():
        if int(packet["head"][6] / 256) != univ:
            continue
        if not packet["host"].startswith('2.0.0.'):
            continue
        frame = packet["dmx"]
        for channel in channels:
            index = channel - 1
            # Short frames are legal; do not let them raise IndexError.
            values.append(frame[index] if index < len(frame) else 0)
    return values
class OSZI():
    """Scrolling trace display: one dot per value, one lane per channel.

    Relies on the module-level ``T`` (coordinate transform), ``font`` and
    ``pygame``.
    """

    def __init__(self):
        self._x = 0              # current horizontal pen position
        self.sdata = {}
        self.lz = time.time()    # time at which the pen returns to x = 0
        self.c = 0

    def draw(self, screen, data):
        """Plot *data* (one value per lane) at the current pen position."""
        col = int(self._x)
        if time.time() > self.lz:
            # Sweep finished: restart at the left edge for another 6 s.
            self.lz = time.time() + 6
            self._x = 0
            col = 0

        # Clear bar just ahead of the pen (uses T.y as left by the last call).
        ahead = pygame.Rect(col + 1, T.get_y(10), 30, 245)
        pygame.draw.rect(screen, (10, 10, 0), ahead)

        T.y = -260
        shade = 0
        for value in data:
            pygame.gfxdraw.pixel(screen, col, T.get_y(255), (255, 0, 0))
            pygame.gfxdraw.pixel(screen, col, T.get_y(0), (10, 10, 25))
            inner = pygame.Rect(col + 4, T.get_y(0), 20, -80)
            pygame.draw.rect(screen, (shade, 210, 110), inner)
            outer = pygame.Rect(col + 2, T.get_y(0), 30, -80)
            pygame.draw.rect(screen, (shade, 10, 110), outer)
            # Rendered but not blitted, exactly as before.
            text = font.render(str(value), True, (0, 0, 0))
            pygame.draw.circle(screen, (255, 155, 0), (col, T.get_y(value)), 2)
            shade = min(shade + 50, 255)
            T.y -= 275           # move down to the next lane

        self._x += 1.5 * 2
def read_event():
    """Process pending pygame events; return False once QUIT was seen.

    ESC does not stop the loop directly: it posts a QUIT event, which is
    picked up on a later pass through the event queue.
    """
    keep_running = True
    for ev in pygame.event.get():
        if ev.type == pygame.QUIT:
            keep_running = False
        if ev.type != pygame.KEYDOWN:
            continue
        print(ev.type)
        if ev.key == pygame.K_ESCAPE:
            pygame.event.post(pygame.event.Event(pygame.QUIT))
    return keep_running
class GRID():
    """Numeric channel grid for universes 0 and 1 of ``2.0.0.*`` senders.

    Relies on the module-level ``screen``, ``font`` and ``pygame``.
    """

    # Minimum time in seconds between two redraws.
    REDRAW_INTERVAL = .0215
    # universe -> (origin x, origin y, background colour, value text colour)
    _LAYOUT = {
        0: (308, 10, (20, 20, 20), (255, 255, 255)),
        1: (8, 310, (10, 30, 10), (255, 200, 255)),
    }

    def __init__(self):
        self.grid_timer = time.time()   # earliest time of the next redraw

    def draw(self, xsdata):
        """Redraw the grids from *xsdata* (key -> packet dict).

        Does nothing while the redraw interval has not elapsed.  Packets
        from other hosts or other universes are ignored.
        """
        if self.grid_timer > time.time():
            return
        # Store the deadline on the instance; assigning it to a local made
        # the throttle above ineffective.
        self.grid_timer = time.time() + self.REDRAW_INTERVAL

        for key in xsdata:
            # Read from the argument, not from a module-level variable.
            xx = xsdata[key]
            univ = xx["head"][6] // 256
            if not xx["host"].startswith('2.0.0.'):
                continue
            layout = self._LAYOUT.get(univ)
            if layout is None:
                continue
            self._draw_universe(univ, xx["dmx"], *layout)

    def _draw_universe(self, univ, channels, origin_x, origin_y,
                       background, value_color):
        """Draw one universe: background, label, then 20 values per row."""
        rx, ry = origin_x, origin_y
        pygame.draw.rect(screen, background, pygame.Rect(rx, ry, 600, 600))
        label = font.render(str(univ).rjust(3, " "), True, (255, 255, 255))
        screen.blit(label, (rx - 10, ry))
        for i, dmx in enumerate(channels):
            text = font.render(str(dmx).rjust(3, " "), True, value_color)
            screen.blit(text, (rx + 10, ry + 10))
            rx += 29
            if (i + 1) % 20 == 0:
                # Start the next row at the left edge of this grid.
                rx = origin_x
                ry += 11
-
# key -> number of times the key has been reported again after its first
# appearance (the first report prints 0).
_ips = {}


def print_ips(sdata):
    """Print every key of *sdata* with its running report count.

    Output is only produced while ``int(time.time() * 10) % 20 == 0``;
    outside that window the call does nothing.
    """
    if int(time.time() * 10) % 20 != 0:
        return
    for key in sdata:
        _ips[key] = _ips[key] + 1 if key in _ips else 0
        print(key.ljust(15, " "), _ips[key])
    print()
grid_a = GRID()
oszi_a = OSZI()

# Main UI loop, capped at 15 iterations per second.
while running:
    clock.tick(15)

    # Fetch whatever the receiver thread collected since the last pass.
    sdata = e.get()
    xsdata = copy.deepcopy(sdata)

    print_ips(sdata)
    data = get_value(sdata, univ=1, dmx=[21, 142, 261, 263])

    running = read_event()

    oszi_a.draw(screen, data)
    grid_a.draw(xsdata)
    pygame.display.flip()
|