# -*- coding: utf-8 -*-

"""
Valid-License-Identifier: GPL-2.0-only
SPDX-URL: https://spdx.org/licenses/GPL-2.0-only.html

(c) 2012 micha@librelight.de
"""

import time
import json
import socket
import struct
import sys
import os
import _thread as thread
import copy
import random
import traceback
from datetime import datetime

try:
    from cprint import cprint
except ImportError:
    def cprint(*text, **kwargs):
        """Uncoloured stand-in so the module still loads without cprint."""
        print(*text)

if __name__ == "__main__":
    # Set the terminal window title.
    sys.stdout.write("\x1b]2;Nodescan\x07")

_NODE_CMD_PORT = 7600         # UDP port the "CMD ..." node packets are sent to
_MC_INDEX = "index-artpoll"   # memcache key holding the per-node counter index


# --------------------------------------------------------------------------
# small private helpers
# --------------------------------------------------------------------------

def _ip_parts(ip):
    """Return the first four octets of *ip* (dotted string or sequence) as ints."""
    parts = ip.split(".") if isinstance(ip, str) else ip
    return [int(x) for x in parts[:4]]


def _pack4(ip):
    """Pack *ip* into exactly four bytes (struct.error if it has fewer octets)."""
    return struct.pack("<4B", *_ip_parts(ip))


def _ip_text(ip):
    """Return *ip* as a dotted string; strings are passed through untouched."""
    if isinstance(ip, str):
        return ip
    return ".".join(str(x) for x in ip)


def _name_field(name, size):
    """Encode *name* as an ASCII field of exactly *size* bytes.

    The text is cut to ``size - 1`` bytes so the field always ends with at
    least one NUL; characters outside ASCII are replaced by ``?`` so that one
    character is always one byte and the fixed packet layout is preserved.
    """
    if isinstance(name, bytes):
        raw = name
    else:
        raw = str(name).encode("ascii", errors="replace")
    return raw[:size - 1].ljust(size, b"\x00")


def _cstring(raw):
    """Decode a NUL-padded byte field, stopping at the first embedded NUL."""
    return raw.strip(b"\x00").split(b"\x00", 1)[0].decode(errors="ignore")


def _send_udp(data, target):
    """Send one datagram through a fresh socket and always close that socket."""
    sock = UDP_Socket()
    if not sock:
        cprint(" no socket available, nothing sent to", target, color="red")
        return
    try:
        sock.sendto(data, target)
    finally:
        sock.close()


def _ip_json(cmd):
    """Run an ``ip -j ...`` command and return the parsed JSON list.

    Returns an empty list when the command cannot be run or its output is not
    a JSON list, so callers simply see "no entries".
    """
    try:
        with os.popen(cmd) as pipe:
            txt = pipe.read()
        data = json.loads(txt)
    except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
        print(e, e.args)
        return []
    return data if isinstance(data, list) else []


# --------------------------------------------------------------------------
# sockets / ArtPoll
# --------------------------------------------------------------------------

def UDP_Socket(bind=False, ip='', port=6454):
    """Create a UDP socket with SO_BROADCAST enabled.

    bind -- when true, bind the socket to (ip, port).
    Returns the socket, or False when creation or binding failed.
    """
    sock = False
    try:
        print("UDP_Socket", "BIND", (ip, port, bind), "?")
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        if bind:
            sock.bind((ip, port))
        print("UDP_Socket", "BIND", (ip, port), "OK")
    except socket.error as e:
        cprint(" Socket 6454 ", "ERR: {0} ".format(e.args), color="red")
        if sock:
            sock.close()  # do not leak the descriptor when bind() fails
        sock = False
    return sock


def ArtPoll(sock=None, ip="2.255.255.255", port=6454):
    """Send one ArtPoll packet to (ip, port), close the socket, wait 1 second.

    sock -- socket to send on; a new one is created when omitted.  The socket
            that was used is closed afterwards in either case.
    """
    print("ArtPoll", ip, port)
    if not sock:
        sock = UDP_Socket()
    if not sock:
        cprint(" ArtPoll: no socket available", color="red")
        return

    # "Art-Net\0" + opcode 0x2000 (low byte first) + version 14 + 2 more bytes
    PKG = b'Art-Net\x00\x00 \x00\x0e\x06\x00'
    print(" -> SEND:", [PKG])
    try:
        sock.sendto(PKG, (ip, port))  # ArtPoll / ping
    finally:
        sock.close()
    time.sleep(1)


# --------------------------------------------------------------------------
# value converters
# --------------------------------------------------------------------------

def convert_mac(MAC):
    """Format an iterable of ints (e.g. 6 bytes) as 'aa:bb:cc:dd:ee:ff'."""
    return ":".join(hex(x)[2:].rjust(2, "0") for x in MAC)


def convert_bin(d):
    """Return integer *d* as a binary string, zero padded to 8 digits."""
    return bin(d)[2:].rjust(8, "0")


def convert_ip(d):
    """Return the first four items of *d* as the text of a list.

    Falls back to the literal "[0,0,0,0]" when *d* has fewer than four items
    or cannot be indexed.
    """
    try:
        return str([d[0], d[1], d[2], d[3]])
    except (IndexError, KeyError, TypeError):
        return "[0,0,0,0]"


def convert_to_hex(x, d):
    """Unpack *d* with struct format *x* and return it as '0x....' text.

    The result is zero padded to 4 hex digits.  Returns b"" (as the original
    did) when *d* cannot be unpacked with *x*.
    """
    try:
        a = struct.unpack(x, d)[0]
    except (struct.error, TypeError):
        return b""
    return "{0:#0{1}x}".format(a, 6)


# --------------------------------------------------------------------------
# memcache store
# --------------------------------------------------------------------------

mc = None  # memcache client, set by connect_memcache()


def clean_mc_artpoll():
    """Reset the ArtPoll index in memcache to an empty dict (no-op without client)."""
    if mc:
        mc.set(_MC_INDEX, {})


def connect_memcache():
    """Create the global memcache client and reset the ArtPoll index.

    Returns 1 on success and 0 when the memcache module is missing or the
    client could not be created.
    """
    global mc
    try:
        import memcache
        mc = memcache.Client(['127.0.0.1:11211'], debug=0)
        clean_mc_artpoll()
        return 1
    except Exception as e:
        cprint("Err connect_memcache", e, color="red")
    return 0


def update_mc_artpoll_index(key, val=""):
    """Increment the packet counter of *key* in the index and stamp the local time.

    The index maps key -> [count, 'YYYY-mm-dd HH:MM:SS'].  *val* is accepted
    for compatibility but not stored.  Errors are reported, never raised.
    """
    try:
        _index = mc.get(_MC_INDEX)
        if _index is None:
            _index = {}
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        if key not in _index:
            _index[key] = [0, ""]
        _index[key][0] += 1
        _index[key][1] = now
        mc.set(_MC_INDEX, _index)
    except Exception as e:
        # Report the key that failed; the handler itself must not raise.
        cprint("update_mc_artpoll_index", key, color="red")
        cprint(" memcach exception", e, color="red")


def ArtPollRawStore(data, addr):
    """Store a raw ArtPoll / ArtPollReplay packet in memcache.

    The packet is stored under '<sender ip>:<opcode name>' and the index is
    updated.  Other opcodes are ignored.  A failed mc.set triggers a
    reconnect attempt; all errors are reported, never raised.
    """
    opcode = artnet_get_opcode(data)
    cprint(" ", "ArtPollRawStore", opcode, color="green")
    if opcode[0] not in ("ArtPoll", "ArtPollReplay"):
        return
    try:
        k = "{}:{}".format(addr[0], opcode[0])
        x = mc.set(k, data)
        if not x:
            cprint(" ArtPollRawStore memcache.not Connected mc.set", k, x, color="red")
            connect_memcache()
        update_mc_artpoll_index(k, val="")
    except Exception as e:
        cprint(" ArtPollRawStore err:", e, color="red")


# --------------------------------------------------------------------------
# Art-Net decoding
# --------------------------------------------------------------------------

def artnet_get_opcode(head):
    """Return (name, opcode) for the packet *head*.

    opcode is the hex text of bytes 8..9 read low byte first, or the int 0
    when *head* is too short.  name is "ArtDMX", "ArtPoll", "ArtPollReplay"
    or "unkown".
    """
    opcode = 0x0000
    try:
        # Unsigned, so opcodes >= 0x8000 do not turn into negative hex text.
        opcode = hex(struct.unpack('<H', head[8:10])[0])
    except (struct.error, TypeError):
        print("opcode error", [head])

    names = {'0x5000': "ArtDMX", '0x2000': "ArtPoll", '0x2100': "ArtPollReplay"}
    return (names.get(opcode, "unkown"), opcode)


# (key, start, end) byte ranges of the decoded fields; end=None -> single byte.
# The 18/64/64 byte text field widths match what ArtAddress()/ArtPollReply()
# in this file write for short name, long name and report.
_POLLREPLY_FIELDS = (
    ("IP", 10, 14),
    ("port", 14, 16),
    ("version", 16, 18),
    ("NetSwitch", 18, None),
    ("SubSwitch", 19, None),
    ("oem", 20, 22),
    ("ubea", 22, None),
    ("status", 23, None),
    ("esta", 24, 26),
    ("sname", 26, 26 + 18),
    ("lname", 44, 44 + 64),
    ("NodeReport", 108, 108 + 64),
    ("NumPort", 173, None),
    ("PortTypes", 174, 174 + 4),
    ("GoodInput", 178, 178 + 4),
    ("GoodOutput", 182, 182 + 4),
    ("SwIn", 186, 186 + 4),
    ("SwOut", 190, 190 + 4),
    ("MAC", 201, 201 + 6),
)
_POLLREPLY_TEXT = ("sname", "lname", "NodeReport")


def ArtNet_decode_pollreplay(data):
    """Decode an ArtPollReply packet into a dict.

    Returns {} when *data* is shorter than 207 bytes or its opcode is not
    0x2100.  Text fields are str, 'port' is an int, 'version'/'oem'/'esta'
    are '0x....' text, 'status' is an 8 digit binary string, 'IP' is the text
    of a 4 item list, 'MAC' is colon separated hex and the remaining
    multi-byte fields stay bytes.
    """
    node = {}
    if len(data) < 10:  # too short to carry an opcode
        return node

    opcode = convert_to_hex("<H", data[8:10])
    if opcode != '0x2100':  # OpPollReplay
        return node

    if len(data) < 207:  # must reach the end of the MAC field
        return node

    for key, start, end in _POLLREPLY_FIELDS:
        if end is None:
            node[key] = data[start]
        elif key in _POLLREPLY_TEXT:
            node[key] = _cstring(data[start:end])
        else:
            node[key] = data[start:end]

    node["MAC"] = convert_mac(node["MAC"])
    node["IP"] = convert_ip(node["IP"])
    node["status"] = convert_bin(node["status"])
    node["opcode"] = opcode
    node["port"] = struct.unpack("<H", node["port"])[0]

    for key, fmt in (("version", '>H'), ("oem", '>H'), ("esta", "<H")):
        node[key] = convert_to_hex(fmt, node[key])
    return node


def test():
    """Self-test: decode a recorded ArtPollReply and compare with the expected dict."""
    UDP_ArtPollReplay = (
        b'Art-Net\x00'                      # 0   id
        b'\x00!'                            # 8   opcode 0x2100
        b'\x02\x00\x00T'                    # 10  ip 2.0.0.84
        b'6\x19'                            # 14  port 6454
        b'\x03P'                            # 16  version
        b'\x00\x00'                         # 18  NetSwitch, SubSwitch
        b'\x11\x10'                         # 20  oem
        b'\x00\x00'                         # 22  ubea, status
        b'RU'                               # 24  esta
        + b'AADN-01'.ljust(18, b'\x00')               # 26  short name
        + b'AVR-ArtNet DMX NODE'.ljust(64, b'\x00')   # 44  long name
        + b'Node is ready'.ljust(64, b'\x00')         # 108 node report
        + b'\x00\x01'                       # 172 number of ports
        + b'\x80\x00\x00\x00'               # 174 PortTypes
        + b'\x08\x00\x00\x00'               # 178 GoodInput
        + b'\x82\x00\x00\x00'               # 182 GoodOutput
        + b'\x07\x00\x00\x00'               # 186 SwIn
        + b'\x00\x00\x00\x00'               # 190 SwOut
        + b'\x00' * 7                       # 194 .. 200
        + b'p\xb3\xd5\xfa\xff\xfa'          # 201 MAC
        + b'\x00' * 32                      # 207 .. 238
    )

    NODE = {'IP': '[2, 0, 0, 84]', 'port': 6454, 'version': '0x0350',
            'NetSwitch': 0, 'SubSwitch': 0, 'oem': '0x1110', 'ubea': 0,
            'status': '00000000', 'esta': '0x5552', 'sname': 'AADN-01',
            'lname': 'AVR-ArtNet DMX NODE', 'NodeReport': 'Node is ready',
            'NumPort': 1, 'PortTypes': b'\x80\x00\x00\x00',
            'GoodInput': b'\x08\x00\x00\x00', 'GoodOutput': b'\x82\x00\x00\x00',
            'SwIn': b'\x07\x00\x00\x00', 'SwOut': b'\x00\x00\x00\x00',
            'MAC': '70:b3:d5:fa:ff:fa', 'opcode': '0x2100'}

    node = ArtNet_decode_pollreplay(UDP_ArtPollReplay)

    if NODE != node:
        k_miss = []
        for k, v in node.items():
            if k in NODE:
                print(v == NODE[k], "- diff -", k, v, NODE[k])
            else:
                k_miss.append(k)
        print("MISSING KEY:", k_miss)
        print(node)
        print(NODE)
    assert NODE == node


# --------------------------------------------------------------------------
# local network information (uses the "ip" command with JSON output)
# --------------------------------------------------------------------------

def os_get_MAC(_filter=("vmbr0", "br0")):
    """Return {interface name: [address, ...]} from ``ip -j -o l l``.

    _filter is accepted for compatibility but not used.  Returns {} when the
    command output cannot be parsed.
    """
    print("os_get_MAC")
    out = {}
    for link in _ip_json("ip -j -o l l"):
        if not isinstance(link, dict) or "ifname" not in link:
            continue
        addresses = out.setdefault(link["ifname"], [])
        if "address" in link:
            addresses.append(link["address"])
    return out


def os_list_routing():
    """Return {device: [destination, ...]} from ``ip -j -o r l``.

    Routes without a "dev" entry are collected under "none".  Returns {} when
    the command output cannot be parsed.
    """
    print("os_list_routing")
    out = {}
    for route in _ip_json("ip -j -o r l"):
        if not isinstance(route, dict):
            continue
        destinations = out.setdefault(route.get("dev", "none"), [])
        if "dst" in route:
            destinations.append(route["dst"])
    return out


def os_list_ip():
    """Return {device: ['a.b.c.d/prefix', ...]} from ``ip -o -j -4 a l``.

    Entries without a "local" address are skipped; entries without a "dev"
    are collected under "None".  Returns {} when the output cannot be parsed.
    """
    print("os_list_ip")
    ips = {}
    for entry in _ip_json("ip -o -j -4 a l "):
        if not isinstance(entry, dict):
            continue
        for info in entry.get("addr_info", []):
            if "local" not in info:
                continue
            ip = info["local"]
            if "prefixlen" in info:
                ip += "/" + str(info["prefixlen"])
            ips.setdefault(info.get("dev", "None"), []).append(ip)
    return ips

#ips = os_list_ip()

#example
#get_mask(ips)


# --------------------------------------------------------------------------
# node configuration packets
# --------------------------------------------------------------------------

def ArtAddress(ip="192.168.0.99", ShortName="ShortName", LongName="LongName",
               Port="", Universes=0, raw=0):
    """Build an ArtAddress packet (opcode 0x6000) and send it to *ip*:6454.

    ShortName -- stored as an 18 byte NUL terminated ASCII field.
    LongName  -- stored as a 64 byte NUL terminated ASCII field.
    Port      -- accepted for compatibility, not used.
    Universes -- every SwIn/SwOut byte is set to 128 + Universes.
    raw       -- when true nothing is sent and (packet, (ip, port)) is
                 returned; otherwise the packet is sent and None is returned.
    """
    port = 6454
    print(ip)

    sname = _name_field(ShortName, 18)
    lname = _name_field(LongName, 64)
    print("len sname:lname", len(sname), len(lname))

    # universe 0-f == \x80 - \x8f
    sw = struct.pack('<B', 127 + int(Universes) + 1)

    data2 = b"".join((
        b"Art-Net\x00",               # id, 7 bytes + 0x00
        struct.pack('<H', 0x6000),    # opcode 0x6000, low byte first
        struct.pack('>H', 14),        # protocol version 14, high byte first
        struct.pack('<B', 128),       # NetSwitch
        struct.pack('<B', 0),         # filler
        sname,
        lname,
        sw * 4,                       # SwIn 4; port address
        sw * 4,                       # SwOut 4; port address
        struct.pack('<B', 0),         # SubSwitch
        struct.pack('<B', 255),
        struct.pack('<B', 0),
        struct.pack('<B', 0),
    ))
    print(data2)
    if raw:
        return data2, (ip, port)
    _send_udp(data2, (ip, port))


def set_ip4(cur_ip=(2, 0, 0, 91), new_ip=(2, 0, 0, 201),
            new_netmask=(255, 0, 0, 0), raw=0):
    """Send a 'CMD IP ' packet to the node at *cur_ip* on port 7600.

    Payload: new ip (4) + current ip (4) + new netmask (4) + eleven 0xff.
    Addresses may be 4-sequences of ints or dotted strings.
    raw -- when true nothing is sent and (packet, (ip, port)) is returned.
    """
    port = _NODE_CMD_PORT

    print("NEW NODE _ip:", new_ip)
    print("OLD NODE _ip:", cur_ip)  # target node to change
    ip = _ip_text(cur_ip)
    print("NEW NODE net:", new_netmask)

    data = (b'CMD IP ' + _pack4(new_ip) + _pack4(cur_ip)
            + _pack4(new_netmask) + struct.pack('<B', 255) * 11)
    print("------------------------------")
    print("SENDING TO ", (ip, port))
    print([data])
    if raw:
        return data, (ip, port)
    _send_udp(data, (ip, port))


def send_cmd(ip=(2, 0, 0, 91), cmd="", raw=0):
    """Send 'CMD <cmd> ' plus a fixed address payload to *ip* on port 7600.

    The payload always holds 2.0.0.91, 2.0.0.255, netmask 255.0.0.0 and
    eleven 0xff bytes; only the destination and *cmd* (str or bytes) vary.
    raw -- when true nothing is sent and (packet, (ip, port)) is returned.
    """
    port = _NODE_CMD_PORT
    print(ip)

    _ip = [2, 0, 0, 91]
    print("NEW NODE _ip:", _ip)
    payload = _pack4(_ip)

    _ip = [2, 0, 0, 255]
    print("OLD NODE _ip:", _ip)
    payload += _pack4(_ip)

    ip = _ip_text(ip)
    print("send to ip:", ip)

    netmask = [255, 0, 0, 0]
    print("NEW NODE net:", netmask)
    payload += _pack4(netmask)
    payload += struct.pack('<B', 255) * 11

    print("------------------------------")
    if isinstance(cmd, bytes):
        cmd_bytes = cmd
    else:
        cmd_bytes = str(cmd).encode("ascii", errors="ignore")
    # Build the packet as bytes; mixing str and bytes raises TypeError.
    data = b'CMD ' + cmd_bytes + b' ' + payload
    print("SENDING TO ", (ip, port))
    print([data])
    if raw:
        return data, (ip, port)
    _send_udp(data, (ip, port))


def pack_ip(_ip):
    """Return the four octets of *_ip* as a list of single-byte bytes objects.

    *_ip* may be a dotted string or a sequence of four values; an empty value
    yields four zero bytes.
    """
    data = [b"\x00", b"\x00", b"\x00", b"\x00"]
    if "." in _ip:
        _ip = _ip.split(".")
    if _ip:
        data = [struct.pack('<B', int(_ip[i])) for i in range(4)]
    return data


def send_node_cmd(ip="", ip2="", cmd="", raw=0):
    """Send *cmd* (padded with spaces to 20 bytes) followed by the 4 bytes of *ip*.

    ip  -- address written into the packet and, by default, the destination.
    ip2 -- when it has exactly four items it replaces the destination.
    cmd -- str or bytes; an empty command leaves the command field blank.
           Command strings noted in the original source: 'CMD GT', 'CMD ST',
           'DMX OUT STORE', 'CMD DMX=IN ', 'CMD DMX=OUT ', 'CMD DMX=PIN '.
    raw -- when true nothing is sent and (packet, (ip, port)) is returned.
    """
    print()
    port = _NODE_CMD_PORT
    print("send_node_cmd", ip, ip2, cmd, port)
    data = pack_ip(ip[:])
    print("ip", ip, ip2)

    if len(ip2) == 4:
        ip = ip2
    # Only join real 4-sequences; a 4 character string must not be split up.
    if not isinstance(ip, str) and len(ip) == 4:
        ip = ".".join(map(str, ip))
    print("send to ip:", ip)

    if isinstance(cmd, bytes):
        data2 = cmd
    else:
        data2 = bytes(str(cmd), "ascii", errors="ignore")
    print([data2], type(data2))

    data2 = data2.ljust(20, b" ") + b"".join(data)
    print("SENDING COMMAND TO ", [data2], (ip, port))
    if raw:
        return data2, (ip, port)
    _send_udp(data2, (ip, port))


def get_mask(IP):
    """Return the netmask of *IP* ('a.b.c.d/prefix') as ipaddress.IPv4Address."""
    print("get_mask")
    import ipaddress
    mask = ipaddress.IPv4Network(IP, False)  # strict=False: host bits allowed
    return mask.netmask


def ArtPollReply(sock=None):
    """Send an ArtPollReply (opcode 0x2100) describing this host.

    The 240 byte packet carries the hostname as short name, "LibreLight
    <hostname>" as long name, a fixed report text and, when present, the MAC
    of interface "vmbr0".  It is sent to 192.168.2.255, 2.0.0.255 and
    10.10.10.255 on port 6454.  A socket created here is closed afterwards;
    a socket passed in by the caller is left open.
    """
    print("ArtPollReply")
    own_sock = not sock
    if own_sock:
        sock = UDP_Socket()
    if not sock:
        cprint(" ArtPollReply: no socket available", color="red")
        return
    port = 6454

    def patch(content, index, patch):
        # Overwrite content in place, starting at index.
        _patch = patch if isinstance(patch, bytes) else bytes(patch, "ascii")
        content[index:index + len(_patch)] = _patch

    try:
        content = bytearray(282 - 42)  # new empty packet, all zero
        patch(content, 0, b"Art-Net\x00")
        patch(content, 8, b"\x00\x21")  # opcode 0x2100, low byte first

        with os.popen("hostname") as pipe:
            hostname = pipe.read().strip()
        patch(content, 26, _name_field(hostname, 18))                      # short name
        patch(content, 26 + 18, _name_field("LibreLight " + hostname, 64))  # long name
        patch(content, 26 + 28 + 54, _name_field("LibreLight is ready CPU:40%", 64))

        macs = os_get_MAC().get("vmbr0")
        if macs:
            mac = macs[0]
            print()
            print([mac])
            print()
            mac = mac.replace(":", "")
            print([mac])
            if len(mac) == 12:
                patch(content, 201, bytes.fromhex(mac))

        print("send", [bytes(content)])
        for IP in ["192.168.2.255", "2.0.0.255", "10.10.10.255"]:
            content2 = bytearray(content)
            # NOTE(review): the destination address is written into the
            # packet's IP field; confirm whether the host's own address
            # is what was intended here.
            patch(content2, 10, bytes(int(i) for i in IP.split(".")))
            try:
                sock.sendto(bytes(content2), (IP, port))
            except OSError as e:
                # one unreachable network must not stop the other targets
                cprint(" ArtPollReply send err:", IP, e, color="red")
        print("send", [bytes(content)])
    finally:
        if own_sock:
            sock.close()


# --------------------------------------------------------------------------
# server loop / command line
# --------------------------------------------------------------------------

def ArtNet_Server(verbose=0):
    """Receive packets on UDP port 6454 forever.

    Every packet is handed to ArtPollRawStore(); ArtPollReply packets are
    additionally printed and, when *verbose* is set, decoded and listed.
    Returns only when the socket could not be created.
    """
    print("start main()")
    sock = UDP_Socket(bind=True, ip='', port=6454)
    print(" -- loop --")
    try:
        while sock:
            data, addr = sock.recvfrom(300)
            cprint(" <- ArtNet_Server rcv:", [addr], color="cyan")

            ArtPollRawStore(data, addr)

            opcode = ""
            if len(data) >= 10:
                opcode = convert_to_hex("<H", data[8:10])

            if opcode != '0x2100':  # OpPollReplay
                continue

            print(" <-", addr, len(data), opcode)
            if verbose:
                nodes = ArtNet_decode_pollreplay(data)
                for k, v in nodes.items():
                    print("-", [k, v])
    finally:
        if sock:
            sock.close()
    print("end main()")


def print_help():
    """Print the supported command line switches."""
    print("help -h --help ")
    print(" --main")
    print(" --ArtPoll")
    print(" --ArtPollReplay")
    print(" -MAC")


connect_memcache()


if __name__ == "__main__":
    test()
    if "-h" in sys.argv or "--help" in sys.argv:
        print_help()
    elif "--ArtPoll" in sys.argv:
        ArtPoll()
    elif "--ArtPollReplay" in sys.argv:
        ArtPollReply()
    elif "-MAC" in sys.argv:
        for k, v in os_get_MAC().items():
            print("-", k, v)
        print()
        for k, v in os_list_ip().items():
            print("-", k, v)
        print()
        for k, v in os_list_routing().items():
            print("-", k, v)
        print()
        print(get_mask("192.168.2.2/24"))
        print(get_mask("192.168.2.2/25"))
        print(get_mask("192.168.2.2/8"))
    elif "--main" in sys.argv:
        def loop():
            # Poll every few seconds; one failed poll must not end the thread.
            while 1:
                try:
                    ArtPoll()
                except Exception:
                    traceback.print_exc()
                time.sleep(5)
        thread.start_new_thread(loop, ())
        ArtNet_Server(verbose=1)
    else:
        print_help()