#!/usr/bin/env python
# coding: Latin-1
# Modified for Python3

# Creates a web-page interface for MonsterBorg

# Import library functions we need
import ThunderBorg3 as ThunderBorg
import time
import sys
import threading
import socketserver as SocketServer
import datetime

# Settings for the web-page
webPort = 8000      # Port number for the web-page, 80 is what web-pages normally use
displayRate = 10    # Status requests per second, needed to keep the watchdog happy

# Global values (module level, so no 'global' statements are needed here)
percentLeft = 0
percentRight = 0
running = True

TB = ThunderBorg.ThunderBorg()
#TB.i2cAddress = 0x15  # Uncomment and change the value if you have changed the board address
TB.Init()
if not TB.foundChip:
    boards = ThunderBorg.ScanForThunderBorg()
    if len(boards) == 0:
        print('No ThunderBorg found, check you are attached :)')
    else:
        print('No ThunderBorg at address %02X, but we did find boards:' % (TB.i2cAddress))
        for board in boards:
            print(' %02X (%d)' % (board, board))
        # '\xb2' is the superscript two; the escape keeps the message correct
        # regardless of which encoding this file is saved in.
        print('If you need to change the I\xb2C address change the setup line so it is correct, e.g.')
        print('TB.i2cAddress = 0x%02X' % (boards[0]))
    sys.exit()
TB.SetCommsFailsafe(False)
TB.SetLedShowBattery(False)
TB.SetLeds(0, 0, 1)

# Power settings
voltageIn = 1.2 * 10        # Total battery voltage to the ThunderBorg
voltageOut = 12.0 * 0.95    # Maximum motor voltage, we limit it to 95% to allow the RPi to get uninterrupted power

# Setup the power limits
if voltageOut > voltageIn:
    maxPower = 0.95
else:
    maxPower = voltageOut / float(voltageIn)


# Timeout thread
class Watchdog(threading.Thread):
    """Background thread that stops the motors when web requests stop arriving.

    The request handler sets ``event`` on every request. If no request is
    flagged for one second the motors are switched off and the LEDs are
    changed; the next request switches the LEDs back to battery display.
    Set ``terminated`` to True to make the thread exit (within about a second).
    """

    def __init__(self):
        super(Watchdog, self).__init__()
        self.event = threading.Event()
        self.terminated = False
        self.timestamp = time.time()
        # Start last, so the thread never runs against a half-built object
        self.start()

    def run(self):
        """Thread body: wait for request flags and track the timed-out state."""
        timedOut = True
        while not self.terminated:
            # Wait for a network event to be flagged for up to one second
            if self.event.wait(1):
                self.event.clear()
                if timedOut:
                    # Connection
                    print('Reconnected...')
                    TB.SetLedShowBattery(True)
                    timedOut = False
            elif not timedOut:
                # Timed out
                print('Timed out...')
                TB.SetLedShowBattery(False)
                TB.SetLeds(0, 0, 1)
                timedOut = True
                TB.MotorsOff()


def _request_path(request):
    """Return the path of the first GET line in raw request bytes, else b''."""
    for line in request.strip().split(b'\n'):
        if line.startswith(b'GET'):
            fields = line.split()
            if len(fields) > 1:
                return fields[1]
            break
    # Always bytes, so the startswith(b'...') tests in handle() cannot raise
    return b''


def _parse_drive(path):
    """Return (left, right) drive levels in -1..1 from b'/set/<left>/<right>'.

    Any missing, unparsable or NaN value makes BOTH levels 0.0.
    """
    fields = path.split(b'/')
    if len(fields) < 4:
        # Bad request
        return 0.0, 0.0
    try:
        left = float(fields[2])
        right = float(fields[3])
    except ValueError:
        # Bad values
        return 0.0, 0.0
    if left != left or right != right:
        # NaN compares unequal to itself and would slip past the range clamp
        return 0.0, 0.0
    # Ensure settings are within limits
    return max(-1.0, min(1.0, left)), max(-1.0, min(1.0, right))


def _speed_report(left, right):
    """Return the text reported for the given left/right percentages."""
    return '\n' + 'Speeds: %.0f %%, %.0f %%' % (left, right) + '\n'


# Label, left drive, right drive for each movement button
_DRIVE_BUTTONS = (
    ('Forward', 1, 1),
    ('Spin Left', -1, 1),
    ('Spin Right', 1, -1),
    ('Reverse', -1, -1),
)


def _control_page(refreshMs, holdToMove):
    """Return the HTML for a control page.

    refreshMs is the delay in milliseconds between /status/ requests made by
    the page; holdToMove selects hold-to-move buttons instead of click ones.

    NOTE(review): the markup in the reviewed copy of this file was empty (the
    literals held only newlines and the '%' format had no placeholder, which
    raised TypeError on every page request). This is a minimal replacement
    built on the /set/, /off and /status/ routes served below; restore the
    original markup here if it is available.
    """
    buttons = []
    for label, left, right in _DRIVE_BUTTONS:
        action = 'Drive(%d,%d)' % (left, right)
        if holdToMove:
            events = ('onmousedown="%s" onmouseup="Off()" '
                      'ontouchstart="%s" ontouchend="Off()"' % (action, action))
        else:
            events = 'onclick="%s"' % (action)
        buttons.append('<button %s>%s</button>\n' % (events, label))
    buttons.append('<button onclick="Off()">Stop</button>\n')
    lines = [
        '<html>\n',
        '<head>\n',
        '<script>\n',
        'function Send(path) {\n',
        ' document.getElementById("command").src = path;\n',
        '}\n',
        'function Drive(left, right) {\n',
        ' Send("/set/" + left + "/" + right);\n',
        '}\n',
        'function Off() {\n',
        ' Send("/off");\n',
        '}\n',
        'function Refresh() {\n',
        ' document.getElementById("status").src = "/status/" + Date.now();\n',
        '}\n',
        # The regular status request is what keeps the watchdog from timing out
        'setInterval(Refresh, %d);\n' % (refreshMs),
        '</script>\n',
        '</head>\n',
        '<body>\n',
        '<center>\n',
        '<iframe id="status" src="/status/" width="100%" height="50" frameborder="0"></iframe>\n',
        '<iframe id="command" src="/off" style="display:none"></iframe>\n',
    ]
    lines.extend(buttons)
    lines.extend([
        '</center>\n',
        '</body>\n',
        '</html>\n',
    ])
    return ''.join(lines)


# Class used to implement the web server
class WebServer(SocketServer.BaseRequestHandler):
    """Handles one HTTP request: status, motor commands or a control page."""

    def handle(self):
        """Read the request, flag the watchdog and act on the requested path."""
        global percentLeft
        global percentRight

        # Get the URL requested from the HTTP request data
        getPath = _request_path(self.request.recv(1024))
        watchdog.event.set()

        if getPath.startswith(b'/status/'):
            # Status update - no changes
            self.send(_speed_report(percentLeft, percentRight))
        elif getPath.startswith(b'/off'):
            # Turn the drives off before replying, so a dropped client
            # cannot prevent the motors from stopping
            percentLeft = 0
            percentRight = 0
            TB.MotorsOff()
            self.send(_speed_report(percentLeft, percentRight))
        elif getPath.startswith(b'/set/'):
            # Motor power setting: /set/driveLeft/driveRight
            driveLeft, driveRight = _parse_drive(getPath)
            percentLeft = driveLeft * 100.0
            percentRight = driveRight * 100.0
            # Set the outputs first, then report the current settings
            TB.SetMotor1(driveRight * maxPower)
            TB.SetMotor2(driveLeft * maxPower)
            self.send(_speed_report(percentLeft, percentRight))
        elif getPath == b'/':
            # Main page, click buttons to move and to stop
            self.send(_control_page(int(1000 / displayRate), False))
        elif getPath == b'/hold':
            # Alternate page, hold buttons to move (does not work with all devices)
            self.send(_control_page(int(1000 / displayRate), True))
        else:
            # Unexpected page
            self.send(b'Path : "%s"' % (getPath))

    def send(self, content):
        """Send content (str or bytes) as the body of a 200 OK response."""
        if not isinstance(content, bytes):
            content = content.encode()
        # HTTP header lines end in CRLF; a blank line separates the body
        self.request.sendall(b'HTTP/1.0 200 OK\r\n\r\n' + content)


print('Setup the watchdog')
watchdog = Watchdog()

# Run the web server until we are told to close
httpServer = None
try:
    httpServer = SocketServer.TCPServer(("0.0.0.0", webPort), WebServer)
except OSError:
    # Failed to open the port, report common issues
    print()
    print('Failed to open port %d' % (webPort))
    print('Make sure you are running the script with sudo permissions')
    print('Other problems include running another script with the same port')
    print('If the script was just working recently try waiting a minute first')
    print()
    # Flag the script to exit
    running = False
try:
    print('Press CTRL+C to terminate the web-server')
    while running:
        httpServer.handle_request()
except KeyboardInterrupt:
    # CTRL+C exit
    print('\nUser shutdown')
finally:
    # Turn the motors off under all scenarios
    TB.MotorsOff()
    print('Motors off')
    # Tell each thread to stop, and wait for them to end. This lives in the
    # finally block so the (non-daemon) watchdog thread cannot keep the
    # process alive after an unexpected error.
    if httpServer is not None:
        httpServer.server_close()
    running = False
    watchdog.terminated = True
    watchdog.join()
    TB.SetLedShowBattery(False)
    TB.SetLeds(0, 0, 0)
    TB.MotorsOff()
    print('Web-server terminated.')