mirror of
https://github.com/OpenSolo/OpenSolo.git
synced 2025-04-29 22:24:32 +02:00
303 lines
10 KiB
Python
Executable File
303 lines
10 KiB
Python
Executable File
#!/usr/bin/env python
|
|
"""
|
|
Test script to test sending RC input to shotManager
|
|
(Used for testing shotManager in SITL)
|
|
Also emulates the button server in stm32 running on Artoolink
|
|
This takes user input to change rc values (use the arrow keys)
|
|
|
|
Only meant for debugging purposes, so it's not production code
|
|
"""
|
|
|
|
#Imports
|
|
import sys
|
|
import os
|
|
import curses
|
|
import Queue
|
|
import select
|
|
import socket
|
|
import struct
|
|
import threading
|
|
import time
|
|
sys.path.append(os.path.realpath('..'))
|
|
import app_packet
|
|
sys.path.append(os.path.realpath('../../../net/usr/bin'))
|
|
from sololink import rc_pkt
|
|
sys.path.append(os.path.realpath('../../../flightcode/stm32'))
|
|
from sololink import btn_msg
|
|
|
|
#Globals
|
|
state = "starting" #state of artoo emulator
|
|
stringsUpdated = False
|
|
Amapping = "unset"
|
|
Bmapping = "unset"
|
|
shotName = "unset"
|
|
|
|
NUM_CHANNELS = 8
|
|
channelValues = [1500, 1500, 1500, 1500, 1500, 1500, 1500, 1500]
|
|
PORT = 5016
|
|
|
|
outputLock = threading.Lock()
|
|
|
|
def main(stdscr):
|
|
'''Main function that contains while loop, wrapped by curses'''
|
|
|
|
#import globals
|
|
global state
|
|
global stringsUpdated
|
|
global Amapping
|
|
global Bmapping
|
|
global shotName
|
|
global clientQueue
|
|
|
|
stringsUpdated = True
|
|
activeChannel = 0
|
|
stdscr.nodelay(1)
|
|
stdscr.addstr(0,10,"Hit 'q' to quit")
|
|
|
|
for i in range(NUM_CHANNELS):
|
|
stdscr.addstr(i+2, 20, "Channel %d : %d" % (i+1, channelValues[i]))
|
|
|
|
|
|
stdscr.addstr(activeChannel+2, 18, ">>")
|
|
stdscr.addstr(12, 10, "Hit up/down to change active channel")
|
|
stdscr.addstr(13, 10, "Hit left/right to change stick value")
|
|
stdscr.addstr(14, 10, "Hit 'c' to release sticks")
|
|
stdscr.refresh()
|
|
|
|
#start RC thread
|
|
rc = threading.Thread(name = "sendRC", target = sendRC, args = ())
|
|
rc.daemon = True
|
|
rc.start()
|
|
|
|
#start artoo thread
|
|
artooThread = threading.Thread(name = "ArtooButtonThread", target = ArtooButtonThread, args = ())
|
|
artooThread.daemon = True
|
|
artooThread.start()
|
|
|
|
key = ''
|
|
|
|
while key != ord('q'):
|
|
key = stdscr.getch()
|
|
stdscr.refresh()
|
|
if key == curses.KEY_UP:
|
|
stdscr.addstr(activeChannel+2, 18, " Channel %d : %d"%(activeChannel + 1, channelValues[activeChannel]))
|
|
activeChannel -= 1
|
|
if activeChannel < 0:
|
|
activeChannel = 0
|
|
stdscr.addstr(activeChannel+2, 18, ">>")
|
|
if key == curses.KEY_DOWN:
|
|
stdscr.addstr(activeChannel+2, 18, " Channel %d : %d"%(activeChannel + 1, channelValues[activeChannel]))
|
|
activeChannel += 1
|
|
if activeChannel >= NUM_CHANNELS:
|
|
activeChannel = NUM_CHANNELS - 1
|
|
stdscr.addstr(activeChannel+2, 18, ">>")
|
|
if key == curses.KEY_RIGHT:
|
|
channelValues[activeChannel] += 10
|
|
if channelValues[activeChannel] > 2000:
|
|
channelValues[activeChannel] = 2000
|
|
stdscr.addstr(activeChannel+2, 18, " Channel %d : %d"%(activeChannel + 1, channelValues[activeChannel]))
|
|
if key == curses.KEY_LEFT:
|
|
channelValues[activeChannel] -= 10
|
|
if channelValues[activeChannel] < 1000:
|
|
channelValues[activeChannel] = 1000
|
|
stdscr.addstr(activeChannel+2, 18, " Channel %d : %d"%(activeChannel + 1, channelValues[activeChannel]))
|
|
if key == ord('c'):
|
|
for i in range(NUM_CHANNELS):
|
|
channelValues[i] = 1500
|
|
stdscr.addstr(i+2, 20, "Channel %d : %d" % (i+1, channelValues[i]))
|
|
# hit the A button
|
|
if key == ord('a'):
|
|
# (timestamp_us, button_id, button_event, pressed_mask)
|
|
pkt = struct.pack("<QBBH", 0, btn_msg.ButtonA, btn_msg.Press, 0)
|
|
sendPacket(pkt)
|
|
if key == ord('b'):
|
|
# (timestamp_us, button_id, button_event, pressed_mask)
|
|
pkt = struct.pack("<QBBH", 0, btn_msg.ButtonB, btn_msg.Press, 0)
|
|
sendPacket(pkt)
|
|
if key == ord('p'):
|
|
# (timestamp_us, button_id, button_event, pressed_mask)
|
|
pkt = struct.pack("<QBBH", 0, btn_msg.ButtonLoiter, btn_msg.Press, 0)
|
|
sendPacket(pkt)
|
|
if key == ord('f'):
|
|
# (timestamp_us, button_id, button_event, pressed_mask)
|
|
pkt = struct.pack("<QBBH", 0, btn_msg.ButtonFly, btn_msg.Press, 0)
|
|
sendPacket(pkt)
|
|
|
|
if stringsUpdated:
|
|
stdscr.addstr(16, 10, 'Current shot: ' + shotName + ' ')
|
|
stdscr.addstr(17, 10, "Hit 'a' to do " + Amapping + ' ')
|
|
stdscr.addstr(18, 10, "Hit 'b' to do " + Bmapping + ' ')
|
|
stdscr.addstr(19, 10, "Hit 'p' to hit pause button")
|
|
stdscr.addstr(20, 10, "Hit 'f' to hit fly button")
|
|
stdscr.addstr(21, 10, state + ' ')
|
|
stdscr.refresh()
|
|
|
|
stringsUpdated = False
|
|
time.sleep(0.01)
|
|
|
|
def sendRC():
|
|
'''Sends RC to Sololink'''
|
|
|
|
#import globals
|
|
global channelValues
|
|
rcSock = None
|
|
|
|
#Main RC loop
|
|
while True:
|
|
if not rcSock:
|
|
#print "trying to connect"
|
|
if os.path.exists( "/tmp/shotManager_RCInputs_socket" ):
|
|
rcSock = socket.socket( socket.AF_UNIX, socket.SOCK_DGRAM )
|
|
try:
|
|
rcSock.connect( "/tmp/shotManager_RCInputs_socket" )
|
|
except:
|
|
rcSock = None
|
|
else:
|
|
#print "connected, sending rc info"
|
|
try:
|
|
pkt = rc_pkt.pack((0, 0, [channelValues[2], channelValues[0],
|
|
channelValues[1], channelValues[3], channelValues[4],
|
|
channelValues[5], channelValues[6], channelValues[7]]))
|
|
rcSock.send(pkt)
|
|
#print "sending rc data"
|
|
except socket.error:
|
|
#print "failed to send"
|
|
rcSock = None
|
|
time.sleep(1.0 / 50.0)
|
|
rcSock.close()
|
|
|
|
def sendPacket(pkt):
|
|
'''Sends packet to client (Sololink)'''
|
|
if clientQueue:
|
|
global outputPacket
|
|
outputLock.acquire()
|
|
clientQueue.put(pkt)
|
|
outputLock.release()
|
|
|
|
|
|
def ArtooButtonThread():
|
|
'''this thread simulates the server running in the stm32 process in Artoolink'''
|
|
|
|
#import globals
|
|
global state
|
|
global stringsUpdated
|
|
global Amapping
|
|
global Bmapping
|
|
global shotName
|
|
global clientQueue
|
|
|
|
#establish server
|
|
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
server.setblocking(0)
|
|
|
|
# Bind the socket to the port
|
|
server_address = ('', PORT)
|
|
|
|
stringsUpdated = True
|
|
currentPacket = ""
|
|
currentPacketLength = 9999
|
|
currentPacketType = None
|
|
|
|
#blocking connect loop
|
|
while True:
|
|
try:
|
|
server.bind(server_address)
|
|
except:
|
|
time.sleep(1.0)
|
|
pass
|
|
else:
|
|
break
|
|
|
|
#update emulator state
|
|
state = "bound"
|
|
stringsUpdated = True
|
|
|
|
server.listen(2)
|
|
inputs = [server,]
|
|
outputs = []
|
|
|
|
#main emulator loop
|
|
while True:
|
|
#grab data on pipe
|
|
rl, wl, el = select.select( inputs, outputs, [] )
|
|
|
|
for s in rl:
|
|
if s is server:
|
|
client, address = server.accept()
|
|
client.setblocking(0)
|
|
inputs.append(client)
|
|
outputs.append(client)
|
|
clientQueue = Queue.Queue()
|
|
state = "accepted"
|
|
stringsUpdated = True
|
|
else:
|
|
data = s.recv( 1 )
|
|
|
|
if data:
|
|
currentPacket += data
|
|
|
|
state = "got a packet"
|
|
|
|
# parse the packet
|
|
if len(currentPacket) == app_packet.SOLO_MESSAGE_HEADER_LENGTH:
|
|
(currentPacketType, currentPacketLength) = struct.unpack('<II', currentPacket)
|
|
if len(currentPacket) == app_packet.SOLO_MESSAGE_HEADER_LENGTH + currentPacketLength:
|
|
value = currentPacket[app_packet.SOLO_MESSAGE_HEADER_LENGTH:]
|
|
if currentPacketType == btn_msg.ARTOO_MSG_SET_BUTTON_STRING:
|
|
state = "Button String"
|
|
(button_id, event, shot, mask) = struct.unpack('<BBbb', value[:4])
|
|
artooString = value[4:]
|
|
|
|
if button_id == btn_msg.ButtonA:
|
|
# remove null terminator
|
|
Amapping = artooString[:-1]
|
|
|
|
if mask & btn_msg.ARTOO_BITMASK_ENABLED:
|
|
Amapping += "(enabled)"
|
|
else:
|
|
Amapping += "(disabled)"
|
|
elif button_id == btn_msg.ButtonB:
|
|
# remove null terminator
|
|
Bmapping = artooString[:-1]
|
|
|
|
if mask & btn_msg.ARTOO_BITMASK_ENABLED:
|
|
Bmapping += "(enabled)"
|
|
else:
|
|
Bmapping += "(disabled)"
|
|
elif currentPacketType == btn_msg.ARTOO_MSG_SET_SHOT_STRING:
|
|
# remove null terminator
|
|
shotName = value[:-1]
|
|
|
|
currentPacket = ""
|
|
else:
|
|
inputs.remove(s)
|
|
outputs.remove(s)
|
|
s.close()
|
|
client = None
|
|
clientQueue = None
|
|
stringsUpdated = True
|
|
|
|
# now handle writes
|
|
for s in wl:
|
|
outputLock.acquire()
|
|
try:
|
|
if clientQueue:
|
|
msg = clientQueue.get_nowait()
|
|
except Queue.Empty:
|
|
pass
|
|
else:
|
|
try:
|
|
s.send(msg)
|
|
except Exception as e:
|
|
state = 'Shotmanager disconnected.'
|
|
shotName = 'unset'
|
|
Amapping = 'unset'
|
|
Bmapping = 'unset'
|
|
|
|
outputLock.release()
|
|
|
|
#Calls main function inside a curses wrapper for graceful exit
|
|
if __name__ == '__main__':
|
|
curses.wrapper(main)
|