#! /usr/bin/python
# -*- coding: utf-8 -*-
#from __future__ import absolute_import, division, print_function
#from builtins import str, open, range, dict
#from builtins import *
"""
This file is part of librelight.
librelight is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
librelight is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with librelight. If not, see <http://www.gnu.org/licenses/>.
(c) 2012 micha.rathfelder@gmail.com
"""
import sys
# Emit the escape sequence that sets the terminal window title.
sys.stdout.write("\x1b]2;DMX-SHEET 5\x07") # terminal title
import string
import time
import os
import json
from optparse import OptionParser
# Command-line options are parsed at import time; the result is read as the
# module-level global `options` by Socket.poll and Main.loop.
parser = OptionParser()
# -r/--recive: Socket.poll only accepts packets whose sender address starts
# with this value (all senders are accepted when it is not given).
parser.add_option("-r", "--recive", dest="recive",
help="set recive ip")
# -s/--sendto: target address for the outgoing ArtNetNode; Main.loop only
# forwards merged frames when this is set.
parser.add_option("-s", "--sendto", dest="sendto",
help="set sender ip")
#parser.add_option("-q", "--quiet",
# action="store_false", dest="verbose", default=True,
# help="don't print status messages to stdout")
(options, args) = parser.parse_args()
# Echo the parsed options on start-up.
print("option",options)
print(options.sendto)
from collections import OrderedDict
# ============================================================
# Text Grafik Curses =========================================
# ============================================================
import curses
class CursesDummy():
    """No-op stand-in offering the same method names as Manager.

    Every drawing/input method does nothing; only the three wrapping
    selection pagers are created so callers can assign their data.
    """

    def __init__(self):
        # Same three wrapping selectors that Manager exposes.
        for attr in ("sel_host", "sel_univ", "sel_mode"):
            pager = Pager()
            pager.wrap = 1
            setattr(self, attr, pager)

    def dir(self):
        """Do nothing."""

    def test(self):
        """Do nothing."""

    def init(self):
        """Do nothing."""

    def addstr(self, x, y, txt):
        """Ignore the text that would be written at (x, y)."""

    def draw_lines(self, lines):
        """Ignore the lines that would be drawn."""

    def inp(self):
        """Return an empty string: there is never pending input."""
        return ""

    def read(self):
        """Do nothing."""

    def clear(self):
        """Do nothing."""

    def exit(self):
        """Do nothing."""
class Manager():
    """Curses text UI: reads keys, tracks the selection and draws the views.

    Keys handled by keyread():
        q      quit
        , ;    next / previous mode
        . :    next / previous universe
        - _    next / previous host
        #      jump to the "main" mode
        Enter  run the typed command ("u <univ>", "m <mode>", "h <index>")

    ``ohost`` starts as a private empty HostBuffer; Main.loop replaces it
    with the shared one.
    """

    def __init__(self):
        self.myscreen = curses.initscr()
        print( dir(self.myscreen))
        print( self.myscreen.getmaxyx() )
        self._inp = ""          # last key read; consumed by inp()
        self.cmd = []           # characters typed since the last Enter
        self.mode = "ltp"
        self.sel_host = Pager()
        self.sel_host.wrap = 1
        self.sel_univ = Pager()
        self.sel_univ.wrap = 1
        self.sel_mode = Pager()
        self.sel_mode.wrap = 1
        self.sel_mode.data = ["ltp","dmx","mtx","main"] # mtx = matrix
        # Pager.next()/prev() cycle only below maxindex, so "main" is reached
        # through "#" or the mode command rather than by cycling.
        self.sel_mode.maxindex = len( self.sel_mode.data )-1
        self.ttime = time.time()    # time of the last redraw
        self.univ2 = 0
        self.host = ""
        self.ohost = HostBuffer() # as default

    def dir(self):
        """Return the attribute names of the underlying curses window."""
        return dir(self.myscreen)

    def test(self):
        """Draw three sample lines, wait ten seconds, then restore the terminal."""
        self.init()
        self.draw_lines(["a","b","c"])
        try:
            time.sleep(10)
        finally:
            self.exit()

    def init(self):
        """Save the tty state and switch curses to no-echo / cbreak input."""
        curses.savetty()
        curses.noecho()
        curses.cbreak()
        curses.noqiflush() #?
        curses.noraw() #?
        self.clear()
        curses.beep()

    def addstr(self,x,y,txt):
        """Write ``txt`` at row ``x``, column ``y``."""
        self.myscreen.addstr(x, y, txt ) # row, column, text

    def draw_lines(self,lines):
        """Redraw the window with ``lines``, one per row, cut to the window height.

        When there are more lines than rows, "..." is written on the row
        where the output was cut.  A KeyboardInterrupt restores the terminal
        before it is re-raised.
        """
        self.clear()
        try:
            rows, cols = self.myscreen.getmaxyx()
            cut_at = None
            for i, text in enumerate(lines):
                if i >= rows-2:
                    cut_at = i
                    break
                self.myscreen.addstr(i+1, 1, text ) # row, column, text
            # The original tested the loop variable after the loop, which is
            # unbound for an empty ``lines``; remember the cut row instead.
            if cut_at is not None:
                self.myscreen.addstr(cut_at+1, 1, "..." ) # row, column, text
            self.myscreen.refresh()
            self.myscreen.resize(rows-1,cols-1) # to prevent slowdown..
            self.myscreen.resize(rows,cols)
        except KeyboardInterrupt as e:
            self.exit()
            print("KeyboardInterrupt")
            raise e

    def inp(self):
        """Return the last key read and reset the buffer to ""."""
        x = self._inp
        self._inp = ""
        return x

    def read(self):
        """Poll the keyboard without blocking.

        Returns the key (also echoed at the top-left of the window), or None
        when nothing is pending.
        """
        self.myscreen.nodelay(1)
        try:
            self._inp = self.myscreen.getkey()
            if not self._inp:
                self._inp = self.myscreen.getch()
            self.myscreen.addstr(0, 1, str(self._inp) ) # row, column, text
            self.myscreen.refresh()
            return self._inp
        except curses.error:
            # getkey() raises curses.error in no-delay mode when no key is waiting.
            return None

    def clear(self):
        """Clear the window, redraw the border and reset the input modes."""
        self.myscreen.clear()
        self.myscreen.border(0)
        curses.nocbreak()
        self.myscreen.keypad(0)
        curses.echo()
        curses.resetty()

    def exit(self):
        """Leave curses mode and print a closing marker."""
        self.clear()
        curses.endwin()
        print("ENDE",self)

    def _select_by_name(self, pager, name):
        """Move ``pager`` to the entry equal to ``name``; ignore unknown names."""
        if name in pager.data:
            pager.index = pager.data.index(name)
            pager.check()

    def keyread(self):
        """Read one key and apply it as a hot-key, a command or command text."""
        self.read()
        inp2 = self.inp()
        if "q" == inp2:
            inp2 = ""
            self.exit()
            sys.exit()
        elif "?" == inp2:
            # NOTE(review): loop() overwrites self.mode from sel_mode right
            # after keyread(), so the help view is never shown - confirm intent.
            self.mode = "?"
        elif "," == inp2:
            self.sel_mode.next()
            inp2 = ""
        elif ";" == inp2:
            self.sel_mode.prev()
            inp2 = ""
        elif "." == inp2:
            self.sel_univ.next()
            inp2 = ""
        elif ":" == inp2:
            self.sel_univ.prev()
            inp2 = ""
        elif "-" == inp2:
            self.sel_host.next()
            inp2 = ""
        elif "_" == inp2:
            self.sel_host.prev()
            inp2 = ""
        elif "#" == inp2:
            self._select_by_name(self.sel_mode, "main")
            inp2 = ""

        if inp2 == "\n":
            cmd2 = "".join(self.cmd).split()
            self.cmd = []
            if len(cmd2) < 2:
                pass
            elif "C^" in cmd2:
                # was `screen.exit()`: `screen` is not defined in this scope
                self.exit()
                sys.exit()
            elif "univ" in cmd2 or "u" == cmd2[0]:
                # was the undefined global `sel_univ`
                self._select_by_name(self.sel_univ, cmd2[1])
            elif "mode" in cmd2 or "m" == cmd2[0]:
                self._select_by_name(self.sel_mode, cmd2[1])
            elif "host" in cmd2 or "h" == cmd2[0]:
                try:
                    self.sel_host.set(int(cmd2[1]))
                except ValueError:
                    pass    # not a number: leave the selection unchanged
        elif inp2:
            # Only real key presses are buffered; appending "" on every idle
            # poll made the list grow without bound.
            self.cmd.append(inp2)

    @staticmethod
    def _dmx_rows(values, zero_as=None, per_row=20):
        """Format ``values`` as right-aligned 4-character cells, ``per_row`` per line.

        Values equal to 0 are shown as ``zero_as`` when it is given.
        """
        rows = []
        cells = []
        for value in values:
            if zero_as is not None and value == 0:
                value = zero_as
            cells.append(str(value).rjust(4," "))
            if len(cells) == per_row:
                rows.append("".join(cells))
                cells = []
        if cells:
            rows.append("".join(cells))
        return rows

    def loop(self):
        """Handle one key press and redraw the current view at most every 0.12 s."""
        self.keyread()
        host = self.sel_host.get()
        univ2 = self.sel_univ.get()
        self.mode = self.sel_mode.get()
        if not (time.time()-0.12 > self.ttime):
            return

        lines = [" CMD:" + "".join(self.cmd)]
        if self.mode=="help" or self.mode=="?":
            lines.append("HILFE[h]: " )
            lines.append("MODE [m]: inp, in2 in1 " )
            lines.append("UNIV [u]: 0-16 " )
            lines.append(" " )
            lines.append("HILFE " )
        elif self.mode=="dmx" or self.mode == "DMX":
            # channel values of the selected host, zeros shown as "+"
            self.ttime = time.time()
            dmx = self.ohost.get(host,univ=univ2)
            info = self.ohost.info()
            if univ2 in info and host in info[univ2]:
                lines.append("frame "+str(info[univ2][host]["frame"]))
            lines.extend(self._dmx_rows(dmx, zero_as="+"))
            lines.append(" ")
            lines.append(str(self.ttime))
        elif self.mode=="mtx":
            # matrix: per channel, the index of the host that last changed it
            self.ttime = time.time()
            dmx = self.ohost.get_mtx(host,univ=univ2)
            info = self.ohost.info()
            if univ2 in info and host in info[univ2]:
                lines.append("frame "+str(info[univ2][host]["frame"]))
            lines.extend(self._dmx_rows(dmx))
            lines.append(" ")
            lines.append(str(self.ttime))
        elif self.mode=="ltp" or self.mode=="LTP":
            # merged view over all hosts of the selected universe
            self.ttime = time.time()
            dmx = self.ohost.get(univ=univ2)
            info = self.ohost.info()
            lines.append("frame "+str(info.keys()) )
            lines.extend(self._dmx_rows(dmx))
            lines.append(" ")
            lines.append(str(self.ttime))
        else:
            # "main": dump the per-universe / per-host statistics
            self.ttime = time.time()
            info = self.ohost.info()
            for i in info:
                lines.append( json.dumps(i) + "=======X " ) # live
                for j in info[i]:
                    compact = []
                    lines.append( " " + json.dumps([j,""]) )
                    for k in info[i][j]:
                        entry = " "+str(k).ljust(5," ")+": " + json.dumps( info[i][j][k])
                        # short values share one line, the rest get their own
                        if k in ["fpsx","uni","flag"]:
                            compact.append(entry)
                        else:
                            lines.append(entry)
                    lines.append("".join(compact))
                    lines.append( " " + json.dumps([j,""]) )
                    lines.append(" ")
            lines.append(str(self.ttime))

        # Two status lines on top: available universes/hosts, then the selection.
        status = " mode:"+(str(self.mode).ljust(10," "))
        status += " univ:"+str(self.sel_univ.index)+":"+(str(self.sel_univ.get()).ljust(10," "))
        status += " host:"+str(self.sel_host.index)+":"+(str(self.sel_host.get()).ljust(10," "))
        lines.insert(0,status)
        overview = " univ:"+ (str(self.sel_univ.data))
        overview += " list:"+ (str(self.sel_host.data))
        lines.insert(0,overview)
        self.draw_lines(lines)
class UniversBuffer():
    def __init__(self,univers_nr=0):
        """buffer and merge a universe from multiple sender/hosts/ip's
        """
        self.__hosts = []                   # hosts in order of first appearance
        self.__universes_dmx = {}           # host -> last stored 512-channel frame
        self.__universes_fps = {}           # host -> last 20 frame-rate samples
        self.__universes_frames = {}        # host -> total frames received
        self.__universes_flag = {}          # host -> changed-channel count of the last 20 frames
        self.__universes_x_frames = {}      # host -> frames in the current measuring window
        self.__universes_x_time = {}        # host -> start time of the current measuring window
        self.__universes_count = {}
        self.__universes_timer = {}
        # per channel: index into __hosts of the host that changed it last, "." if none
        self.__universes_matrix = ["."]*512
        self.__universes_info = {}          # host -> statistics dict built by update()
        self.__univers_nr = univers_nr
        self.__frame = 0                    # frames received from all hosts

    def _add(self,host):
        """Register ``host`` with empty buffers if it is not known yet."""
        if host in self.__hosts:
            return
        self.__hosts.append(host)
        self.__universes_dmx[host] = [0]*512
        self.__universes_frames[host] = 0
        self.__universes_x_frames[host] = 0
        self.__universes_fps[host] = [""]*20
        self.__universes_flag[host] = [0]*20
        self.__universes_x_time[host] = time.time()
        self.__universes_timer[host] = [0]*512
        self.__universes_info[host] = {}

    def _next_frame(self,host):
        """Count one frame for ``host``; every 10 s store a frame-rate sample."""
        self.__frame += 1
        self.__universes_frames[host] += 1
        self.__universes_x_frames[host] += 1
        if self.__universes_x_time[host]+10 < time.time():
            sec = time.time()-self.__universes_x_time[host]
            fps = int(self.__universes_x_frames[host] / sec)
            self.__universes_fps[host].pop(0)
            self.__universes_fps[host].append(fps)
            self.__universes_x_time[host] = time.time()
            self.__universes_x_frames[host] = 0

    def update(self,host,dmxframe):
        """Store a frame received from ``host`` and refresh its statistics.

        ``dmxframe`` must be a list; anything else is ignored.  Frames longer
        than 512 entries only count as a received frame.  Channels that
        differ from the host's previous frame are assigned to this host in
        the matrix.
        """
        if not isinstance(dmxframe, list):
            return
        self._add(host)
        previous = self.__universes_dmx[host]
        self._next_frame(host)

        dmx = [0]*512
        update_flag = 0
        if len(dmxframe) <= 512:
            host_index = self.__hosts.index(host)
            for i, v in enumerate(dmxframe):
                if v != previous[i]:
                    update_flag += 1
                    self.__universes_matrix[i] = host_index
                dmx[i] = v

        self.__universes_flag[host].pop(0)
        self.__universes_flag[host].append( update_flag )

        # The window may have started within the same clock tick (new host or
        # a reset in _next_frame); report 0 instead of dividing by zero.
        elapsed = time.time()-self.__universes_x_time[host]
        if elapsed > 0:
            fpsx = int(self.__universes_x_frames[host] / elapsed)
        else:
            fpsx = 0

        tmp = {}
        tmp["flag"] = update_flag
        tmp["flagx"] = self.__universes_flag[host]
        tmp["fpsx"] = fpsx
        tmp["frame"] = self.__frame
        tmp["uni"] = self.__univers_nr
        tmp["fps"] = self.__universes_fps[host]
        self.__universes_info[host] = tmp

        if update_flag:
            self.__universes_dmx[host] = dmx
            self.__universes_timer[host] = [0]*512

    def get(self,host=""):
        """Return the frame of ``host``, or the merged frame when host is unknown/empty.

        In the merged frame every channel holds the value from the host that
        changed it last; channels nobody changed are ":".
        """
        if host and host in self.__hosts:
            return self.__universes_dmx[host]
        dmx = [":"]*512
        for i, v in enumerate(self.__universes_matrix):
            if type(v) is int:
                dmx[i] = self.__universes_dmx[self.__hosts[v]][i]
        return dmx

    def get_mtx(self,host=""):
        """Return the channel -> host-index matrix (``host`` is not used)."""
        return self.__universes_matrix

    def info(self):
        """Return the host -> statistics mapping built by update()."""
        return self.__universes_info

    def hosts(self):
        """Return the known hosts as a sorted list."""
        return sorted(self.__universes_dmx)
class HostBuffer():
    def __init__(self):
        """buffer hosts and route data into universes
        """
        self.__hosts = [] # LTP order
        self.__universes = OrderedDict() # str(universe) -> UniversBuffer

    def get_mtx(self,host="", univ=""):
        """Return the channel -> host-index matrix of ``univ``.

        An unknown universe yields an all-"." matrix (the value a fresh
        UniversBuffer starts with) instead of raising KeyError, matching the
        placeholder behaviour of get().
        """
        key = str(univ)
        if key in self.__universes:
            return self.__universes[key].get_mtx(host)
        return ["."]*512

    def get(self,host="", univ=""):
        """Return the frame of ``host`` (or the merged frame) for ``univ``.

        An unknown universe yields 512 times -8 as a placeholder.
        """
        key = str(univ)
        if key in self.__universes:
            return self.__universes[key].get(host)
        return [-8]*512

    def hosts(self):
        """Return the sorted list of all hosts seen in any universe."""
        seen = set()
        for buf in self.__universes.values():
            seen.update(buf.hosts())
        return sorted(seen)

    def univs(self):
        """Return the universe keys in order of first appearance."""
        return list(self.__universes.keys())

    def update(self,host,univ, dmxframe):
        """Route ``dmxframe`` from ``host`` into the buffer of ``univ``, creating it on demand."""
        key = str(univ)
        if key not in self.__universes:
            self.__universes[key] = UniversBuffer(key)
        self.__universes[key].update(host,dmxframe)

    def info(self,univ=0):
        """Return {universe key: per-host statistics} for all universes (``univ`` is not used)."""
        out = {}
        for key, buf in self.__universes.items():
            out[key] = buf.info()
        return out
# ============================================================
# Network ====================================================
# ============================================================
import socket, struct
import fcntl #socket control
import errno
def toPrintable(nonprintable):
    """Return ``str(nonprintable)`` with every character outside a fixed whitelist removed.

    Kept are ASCII letters, digits, space and the characters ``/()=?{[]}\\;:,.-_``.
    """
    # The backslash is written as "\\": the original "\;" is an invalid
    # escape sequence that newer Python versions warn about.  The resulting
    # set of characters is the same.
    printable = string.ascii_letters + string.digits + "/()=?{[]}\\;:,.-_ "
    return "".join(ch for ch in str(nonprintable) if ch in printable)
def unpack_art_dmx(data):
    """Return the items of ``data`` after the first 18 as a list.

    The caller passes a received Art-Net packet, so this skips the 18-byte
    header and yields the channel values.
    """
    return list(data[18:])
class Socket():
    """Non-blocking UDP receiver bound to the Art-Net port 6454.

    Usage: call poll(); when it returns 1, call recive() to fetch the packet.
    """

    def __init__(self):
        self.__poll = 0         # 1 while a received packet waits for recive()
        self.__data = []
        self.__addr = "NONE"
        self.open()

    def open(self):
        """Create and bind the socket; a socket error is printed, not raised."""
        try:
            print("connecting to ArtNet Port 6454")
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.sock.bind(('', 6454))
            fcntl.fcntl(self.sock, fcntl.F_SETFL, os.O_NONBLOCK)
        except socket.error as e:
            print("Socket 6454 ", "ERR: {0} ".format(e.args))

    def poll(self):
        """Try to receive one packet.

        Returns 1 when a packet is ready for recive(), otherwise None: no
        data pending, a previous packet not fetched yet, a packet too short
        to carry the 18-byte header, or a sender that does not match
        ``options.recive``.
        """
        if self.__poll:
            return None

        try:
            self.__data, self.__addr = self.sock.recvfrom(6454)
        except socket.timeout as e:
            err = e.args[0]
            if err == 'timed out':
                time.sleep(1)   # was a bare `sleep(1)`, an undefined name
                print('recv timed out, retry later')
            else:
                print(e)
            return None
        except socket.error:
            return None         # nothing pending on the non-blocking socket

        data, addr = (self.__data,self.__addr)
        self.host = addr[0]
        self.univ = -1
        try:
            self.head = struct.unpack("!8sHBBBBHBB" , data[:18] )
        except struct.error:
            # Too short for the header: drop it instead of continuing with
            # the header of an earlier packet (or no header at all).
            return None

        # The two address bytes are read big-endian here, so the first
        # (low) address byte sits in the upper half of the field.  The
        # original approximated this shift with a division by 255.
        self.univ = self.head[6] >> 8

        if not options.recive or self.host.startswith(options.recive):
            self.__poll = 1
            return 1
        self.__poll = 0
        return None

    def recive(self):
        """Return the packet accepted by the last poll() as a dict, or None if there is none."""
        if not self.__poll:
            return None
        self.__poll = 0
        data, addr = (self.__data,self.__addr)
        self.dmx = unpack_art_dmx(data)
        return { "host":self.host,"dmx":self.dmx,"univ":self.univ,"head":self.head,"data":data,"addr":addr}
# ============================================================
# miniartnet4.py =============================================
# ============================================================
import time
import socket
import struct
import random
class ArtNetNode():
    """simple Object to generate ArtNet Network packages
    works in Python2 and Python3 2021-12-05
    """
    def __init__(self, to="10.10.10.255",univ=7):
        self.univ=univ
        self.sendto = to
        self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.stamp = time.time()        # last send triggered by next()
        self.test_stamp = time.time()   # last frame produced by _test_frame()
        self.dmx=[33]*512               # frame sent when send() gets no argument
        self.v=0                        # current test-ramp value
        self.d=1                        # test-ramp direction: 1 up, 0 down

    def head(self):
        """Build the ArtDMX packet header for ``self.univ`` into ``self._header``.

        NOTE(review): two lines of this method and the start of send() were
        truncated in the source; they are reconstructed from the remaining
        comments - confirm against the upstream miniartnet4.py.
        """
        net, subnet, universe = (0,0,self.univ) #address
        header = [
            b"Art-Net\x00",              # Name, 7byte + 0x00
            struct.pack('<H', 0x5000),   # OpCode ArtDMX -> 0x5000, Low Byte first
            struct.pack('>H', 14),       # Protocol Version 14, High Byte first
            b"\x00",                     # Order -> nope -> 0x00
            struct.pack('B',1),          # Eternity Port
            # Address, Low Byte first
            struct.pack('<H', net << 8 | subnet << 4 | universe),
        ]
        self._header = b"".join(header)
        return self._header

    def send(self,dmx=None):
        """Send ``dmx`` (default: ``self.dmx``) as one ArtDMX packet to ``self.sendto``.

        Entries that are not ints are sent as 0; ints are clamped to 0..255.
        Returns the bytes that were sent.
        """
        if dmx is None:
            dmx = self.dmx
        self.head()
        c = [self._header]
        c.append(struct.pack('>H', len(dmx) ) )  # data length, High Byte first
        for v in dmx:
            if type(v) is not int:
                v=0
            elif v > 255: # max dmx value 255
                v = 255
            elif v < 0: # min dmx value 0
                v = 0
            c.append(struct.pack("B",v))
        c = b"".join(c)
        self.s.sendto(c, (self.sendto, 6454))
        return c

    def _test_frame(self):
        """At most every 0.1 s, send a frame whose channel index 420 ramps between 0 and 255.

        Returns 0 when called too early, otherwise None.
        """
        if self.test_stamp+0.1 > time.time():
            return 0
        self.test_stamp = time.time()
        dmx = [0]*512
        dmx[420] = self.v
        self.dmx = dmx
        self.next()
        if self.v >= 255:
            self.d=0
        elif self.v <=0:
            self.d=1
        if self.d:
            self.v+=1
        else:
            self.v-=1

    def next(self):
        """Send ``self.dmx`` if at least 1/60 s passed since the last send from here."""
        # 1/60. keeps the interval a float on Python 2 (1/60 is 0 there), and
        # the stamp is advanced so the check actually limits the rate.
        if self.stamp + (1/60.) <= time.time():
            self.stamp = time.time()
            self.send()
def artnet_test():
    """Create an ArtNetNode with its defaults and emit one test frame."""
    artnet = ArtNetNode()
    artnet._test_frame()    # was `_tes_frame`, which does not exist
# ============================================================
# helper =====================================================
# ============================================================
class Pager(): #scroll thru list
    """Cursor over a list with optional wrap-around.

    data      the list being paged
    index     current position
    wrap      true: stepping past either end jumps to the other end
    maxindex  if set and not larger than len(data), next()/prev() treat it
              as the end of the list instead of len(data)
    """

    def __init__(self):
        self.data = []
        self.index = 0
        self.wrap = 0
        self.maxindex = 0

    def append(self, val):
        """Add ``val`` and re-validate the index."""
        self.data.append(val)
        self.check()

    def set(self, nr):
        """Move to position ``nr``, corrected by check() if out of range."""
        self.index = nr
        self.check()

    def get(self):
        """Return the current item, or None when the list is empty."""
        self.check()
        if not self.data:
            return None
        return self.data[self.index]

    def next(self):
        """Step one position forward."""
        self.index += 1
        self.check(flag=1)

    def prev(self):
        """Step one position backward."""
        self.index -= 1
        self.check(flag=1)

    def check(self, flag=0):
        """Bring the index back into range (``flag`` enables the maxindex limit)."""
        limit = len(self.data)
        if flag and self.maxindex and self.maxindex <= limit:
            limit = self.maxindex

        if self.index >= limit:
            # past the end: first item when wrapping, otherwise the last one
            self.index = 0 if self.wrap else limit - 1
        elif self.index < 0:
            # before the start: last item when wrapping, otherwise the first one
            self.index = limit - 1 if self.wrap else 0
# ============================================================
# main =======================================================
# ============================================================
class Main():
    """Program entry: receive Art-Net, merge it, optionally forward it, show the UI."""

    def __init__(self):
        pass

    @staticmethod
    def _forward(ohost, artnet_out):
        """Send the merged frame of every buffered universe through ``artnet_out``."""
        for univ in ohost.info():
            if str(univ) == "54":
                break
            ltp = ohost.get(univ=univ)
            ltp[511] = int(univ)
            artnet_out.univ = int(univ)
            artnet_out.send(ltp)

    def loop(self):
        """Run the receive / forward / redraw cycle until the program exits."""
        ohost = HostBuffer()
        screen = Manager()
        screen.exit()
        screen.ohost = ohost
        artnet_out = ArtNetNode(to=options.sendto)
        xsocket = Socket()
        send_time = time.time()
        try:
            while 1:
                if xsocket.poll():
                    packet = xsocket.recive()
                    ohost.update(packet["host"], packet["univ"], packet["dmx"])
                    # keep the selectors in step with what has been received
                    screen.sel_univ.data = ohost.univs()
                    screen.sel_host.data = ohost.hosts()

                # forward at most every 1/30 s, and only with --sendto given
                if send_time + (1/30.) < time.time() and options.sendto:
                    send_time = time.time()
                    self._forward(ohost, artnet_out)

                screen.loop()
                time.sleep(.001)
        finally:
            screen.exit()


#print(dir(curses))
if __name__ == "__main__":
    main = Main()
    main.loop()