#!/usr/bin/env python # coding: Latin-1 # Creates a web-page interface for DiddyBorg Metal Edition # Import library functions we need import PicoBorgRev import time import sys import threading import SocketServer import picamera import picamera.array import cv2 import UltraBorg import datetime import math import XLoBorg import wiringpi2 as wiringpi #======================================================================= # Tell the library to disable diagnostic printouts XLoBorg.printFunction = XLoBorg.NoPrint # Start the XLoBorg module (sets up devices) XLoBorg.Init() # Settings for the web-page webPort = 80 # Port number for the web-page, 80 is what web-pages normally use imageWidth = 240 # Width of the captured image in pixels imageHeight = 180 # Height of the captured image in pixels frameRate = 10 # Number of images to capture per second displayRate = 2 # Number of images to request per second photoDirectory = '/home/pi' # Directory to save photos to # Movement mode constants MANUAL_MODE = 0 # User controlled movement SEMI_AUTO_MODE = 1 # Semi-automatic movement AUTO_MODE = 2 # Fully automatic movement # Global values global PBR global lastFrame global lockFrame global camera global processor global running global watchdog global movementMode running = True movementMode = MANUAL_MODE # Setup the UltraBorg global UB UB = UltraBorg.UltraBorg() # Create a new UltraBorg object UB.Init() # Set the board up (checks the board is connected) #======================================================================== wiringpi.wiringPiSetup() # Setup software PWMs on the GPIO pins PIN_RED = 0 PIN_GREEN = 2 PIN_BLUE = 3 wiringpi.pinMode(PIN_RED, wiringpi.GPIO.OUTPUT) wiringpi.pinMode(PIN_GREEN, wiringpi.GPIO.OUTPUT) wiringpi.pinMode(PIN_BLUE, wiringpi.GPIO.OUTPUT) # A function to set the LedBorg colours def SetLedBorg(red, green, blue): wiringpi.digitalWrite(PIN_RED, red) wiringpi.digitalWrite(PIN_GREEN, green) wiringpi.digitalWrite(PIN_BLUE, blue) # A function to turn the LedBorg off def LedBorgOff(): SetLedBorg(0, 0, 0) # Settings for the sequence delay = 1 print 'MetalWeb start up' # Set the LedBorg to red SetLedBorg(1, 0, 0) time.sleep(delay) # Turn the LedBorg off LedBorgOff() print 'LedBorg off' print 'Ready to Play, Punk?' #======================================================================== # Setup the PicoBorg Reverse PBR = PicoBorgRev.PicoBorgRev() #PBR.i2cAddress = 0x44 # Uncomment and change the value if you have changed the board address PBR.Init() if not PBR.foundChip: boards = PicoBorgRev.ScanForPicoBorgReverse() if len(boards) == 0: print 'No PicoBorg Reverse found, check you are attached :)' else: print 'No PicoBorg Reverse at address %02X, but we did find boards:' % (PBR.i2cAddress) for board in boards: print ' %02X (%d)' % (board, board) print 'If you need to change the I²C address change the setup line so it is correct, e.g.' print 'PBR.i2cAddress = 0x%02X' % (boards[0]) sys.exit() #PBR.SetEpoIgnore(True) # Uncomment to disable EPO latch, needed if you do not have a switch / jumper PBR.SetCommsFailsafe(False) # Disable the communications failsafe PBR.ResetEpo() # Power settings voltageIn = 1.2 * 12 # Total battery voltage to the PicoBorg Reverse voltageOut = 12.0 # Maximum motor voltage # Setup the power limits if voltageOut > voltageIn: maxPower = 1.0 else: maxPower = voltageOut / float(voltageIn) # Timeout thread class Watchdog(threading.Thread): def __init__(self): super(Watchdog, self).__init__() self.event = threading.Event() self.terminated = False self.start() self.timestamp = time.time() def run(self): timedOut = True # This method runs in a separate thread while not self.terminated: # Wait for a network event to be flagged for up to one second if timedOut: if self.event.wait(1): # Connection print 'Reconnected...' timedOut = False self.event.clear() else: if self.event.wait(1): self.event.clear() else: # Timed out print 'Timed out...' timedOut = True PBR.MotorsOff() # Image stream processing thread class StreamProcessor(threading.Thread): def __init__(self): super(StreamProcessor, self).__init__() self.stream = picamera.array.PiRGBArray(camera) self.event = threading.Event() self.terminated = False self.start() self.begin = 0 def run(self): global lastFrame global lockFrame # This method runs in a separate thread while not self.terminated: # Wait for an image to be written to the stream if self.event.wait(1): try: # Read the image and save globally self.stream.seek(0) flippedArray = cv2.flip(self.stream.array, -1) # Flips X and Y retval, thisFrame = cv2.imencode('.jpg', flippedArray) del flippedArray lockFrame.acquire() lastFrame = thisFrame lockFrame.release() finally: # Reset the stream and event self.stream.seek(0) self.stream.truncate() self.event.clear() # Image capture thread class ImageCapture(threading.Thread): def __init__(self): super(ImageCapture, self).__init__() self.start() def run(self): global camera global processor print 'Start the stream using the video port' camera.capture_sequence(self.TriggerStream(), format='bgr', use_video_port=True) print 'Terminating camera processing...' processor.terminated = True processor.join() print 'Processing terminated.' # Stream delegation loop def TriggerStream(self): global running while running: if processor.event.is_set(): time.sleep(0.01) else: yield processor.stream processor.event.set() # Automatic movement thread class AutoMovement(threading.Thread): def __init__(self): super(AutoMovement, self).__init__() self.terminated = False self.start() def run(self): global movementMode # This method runs in a separate thread while not self.terminated: # See which mode we are in if movementMode == MANUAL_MODE: # User movement, wait a second before checking again time.sleep(1.0) elif movementMode == SEMI_AUTO_MODE: # A function to turn the LedBorg off def LedBorgOff(): SetLedBorg(0, 0, 0) # Settings for the sequence delay = 0.01 print 'MetalWeb Manual Mode' # Set the LedBorg to green SetLedBorg(0, 1, 0) time.sleep(delay) # Turn the LedBorg off LedBorgOff() # Semi-automatic movement mode, checks twice per sec # Ultrasonic distance readings usm1 = UB.GetRawDistance1() usm2 = UB.GetRawDistance2() usm3 = UB.GetRawDistance3() usm4 = UB.GetRawDistance4() # Convert to the nearest millimeter usm1 = int(usm1) usm2 = int(usm2) usm3 = int(usm3) usm4 = int(usm4) #========================================================================= #========================================================================= # Read and display the raw magnetometer readings mx,my,mz = XLoBorg.ReadCompassRaw() print 'mX = %+06d, mY = %+06d, mZ = %+06d' % XLoBorg.ReadCompassRaw() # get the heading in radians heading = math.atan2 (my,mx) if (heading < 0): heading = heading + (2 * math.pi) # convert to degrees heading = heading * 180/math.pi; print 'Compass', heading # Get the accelerometer readings ax,ay,az = XLoBorg.ReadAccelerometer() # Calculate pitch and roll in radians roll = -math.atan2(ax, math.sqrt(pow(ay,2) + pow(az, 2))) pitch = math.atan2(ay, az) # Convert to degrees pitch = pitch * 180.0 / math.pi; roll = roll * 180.0 / math.pi; print 'pitch', pitch print 'roll', roll def get_cpu_temp_c(): tempFile = open( "/sys/class/thermal/thermal_zone0/temp" ) cpu_temp = tempFile.read() tempFile.close() return float(cpu_temp)/1000 print("CPU Temp: %.2f°C" % get_cpu_temp_c()) # Read and display the temperature XLoBorg.tempOffest = 34 print 'XLo Temp: %02d°C' % (XLoBorg.ReadTemperature()) # Read and display time localtime = time.asctime( time.localtime(time.time()) ) print 'time :', localtime print'' # Wait for 1/2 of a second before reading again #Thread.sleep(0.5) time.sleep(0.5) #================================================================================ elif movementMode == AUTO_MODE: # Automatic movement mode, updates five times per second # A function to turn the LedBorg off def LedBorgOff(): SetLedBorg(0, 0, 0) # Settings for the sequence delay = 0.01 print 'MetalWeb Manual Mode' # Set the LedBorg to blue SetLedBorg(0, 0, 1) time.sleep(delay) # Turn the LedBorg off LedBorgOff() # Ultrasonic distance readings usm1 = UB.GetRawDistance1() usm2 = UB.GetRawDistance2() usm3 = UB.GetRawDistance3() usm4 = UB.GetRawDistance4() # Convert to the nearest millimeter usm1 = int(usm1) usm2 = int(usm2) usm3 = int(usm3) usm4 = int(usm4) # Read and display the raw magnetometer readings mx,my,mz = XLoBorg.ReadCompassRaw() print 'mX = %+06d, mY = %+06d, mZ = %+06d' % XLoBorg.ReadCompassRaw() # get the heading in radians heading = math.atan2 (my,mx) if (heading < 0): heading = heading + (2 * math.pi) # convert to degrees heading = heading * 180/math.pi; print 'Compass', heading # Get the accelerometer readings ax,ay,az = XLoBorg.ReadAccelerometer() # Calculate pitch and roll in radians pitch = -math.atan2(ax, math.sqrt(pow(ay,2) + pow(az, 2))) roll = math.atan2(ay, az) # Convert to degrees pitch = pitch * 180.0 / math.pi; roll = roll * 180.0 / math.pi; print 'pitch', pitch print 'roll', roll def get_cpu_temp_c(): tempFile = open( "/sys/class/thermal/thermal_zone0/temp" ) cpu_temp = tempFile.read() tempFile.close() return float(cpu_temp)/1000 print("CPU Temp: %.2f°C" % get_cpu_temp_c()) # Read and display the temperature XLoBorg.tempOffest = 34 print 'XLo Temp: %02d°C' % (XLoBorg.ReadTemperature()) # Read and display time localtime = time.asctime( time.localtime(time.time()) ) print 'time :', localtime print'' # set motors drive fward fast speed 100%---->40% usm1 to controll speed # need some help here :) if usm1 >= 361: driveLeft = 0.5 driveRight = 0.5 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) # set motors drive fward slow if usm1 <= 360: driveLeft = 0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) if usm1 <= 200: driveLeft = -0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) if usm4 <= 360: driveLeft = 0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) if usm4 <= 200: driveLeft = 0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) #--------------------------------------------- # set motors1 drive fward if usm2 <= 200: driveLeft = -0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) #--------------------------------------------- # set motors2 drive fward if usm3 <= 200: driveLeft = 0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) #--------------------------------------------- # set motors spinn if usm1 <= 200: if usm2 <= 200: if usm3 <= 200: if usm4 <= 200: driveLeft = 0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(2.0) #--------------------------------------------- # set motors drive rew if usm1 <= 100: driveLeft = -0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(1.0) driveLeft = -0.4 driveRight = 0.0 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(0.5) if usm4 <= 100: driveLeft = -0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(1.0) driveLeft = 0.0 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(0.5) # Wait for 1/5 of a second before reading again time.sleep(.2) #Thread.sleep(0.2) else: # Unexpected, print an error and wait a second before trying again print 'Unexpected movement mode %d' % (movementMode) time.sleep(1.0) # Class used to implement the web server class WebServer(SocketServer.BaseRequestHandler): def handle(self): global PBR global lastFrame global watchdog global movementMode # Get the HTTP request data reqData = self.request.recv(1024).strip() reqData = reqData.split('\n') # Get the URL requested getPath = '' for line in reqData: if line.startswith('GET'): parts = line.split(' ') getPath = parts[1] break watchdog.event.set() if getPath.startswith('/distances-once'): # Ultrasonic distance readings # Get the readings distance1 = int(UB.GetDistance1()) distance2 = int(UB.GetDistance2()) distance3 = int(UB.GetDistance3()) distance4 = int(UB.GetDistance4()) # Read and display the raw magnetometer readings mx,my,mz = XLoBorg.ReadCompassRaw() print 'mX = %+06d, mY = %+06d, mZ = %+06d' % XLoBorg.ReadCompassRaw() # get the heading in radians heading = math.atan2 (my,mx) if (heading < 0): heading = heading + (2 * math.pi) # convert to degrees heading = heading * 180/math.pi; print 'Compass', heading # Get the accelerometer readings ax,ay,az = XLoBorg.ReadAccelerometer() # Calculate pitch and roll in radians roll = -math.atan2(ax, math.sqrt(pow(ay,2) + pow(az, 2))) pitch = math.atan2(ay, az) # Convert to degrees pitch = pitch * 180.0 / math.pi; roll = roll * 180.0 / math.pi; print 'pitch', pitch print 'roll', roll def get_cpu_temp_c(): tempFile = open( "/sys/class/thermal/thermal_zone0/temp" ) cpu_temp = tempFile.read() tempFile.close() return float(cpu_temp)/1000 print("CPU Temp: %.2f°C" % get_cpu_temp_c()) # Read and display the temperature XLoBorg.tempOffest = 34 print 'XLo Temp: %02d°C' % (XLoBorg.ReadTemperature()) # Read and display time localtime = time.asctime( time.localtime(time.time()) ) print 'time :', localtime print '' # Build a table for the values httpText = '
' httpText += '' if distance1 == 0: httpText += '' else: httpText += '' % (distance1) if distance2 == 0: httpText += '' else: httpText += '' % (distance2) if distance3 == 0: httpText += '' else: httpText += '' % (distance3) if distance4 == 0: httpText += '' else: httpText += '' % (distance4) httpText += '
None
%04d
None
%04d
None
%04d
None
%04d
' httpText += 'Time: %s
' % (localtime) httpText += 'Compass: %.0f°' % (heading) httpText += ' Pitch: %.0f°' % (pitch) httpText += ' Roll: %.0f°
' % (roll) httpText += 'XLo Temp: %02d°C' % (XLoBorg.ReadTemperature()) httpText += ' CPU Temp: %+02d°C' % (get_cpu_temp_c()) httpText += '
' self.send(httpText) elif getPath.startswith('/semiAuto'): # Toggle Auto mode if movementMode == SEMI_AUTO_MODE: # We are in semi-auto mode, turn it off movementMode = MANUAL_MODE httpText = '
' httpText += 'Speeds: 0 %, 0 %' httpText += '
' self.send(httpText) PBR.MotorsOff() else: # We are not in semi-auto mode, turn it on movementMode = SEMI_AUTO_MODE httpText = '
' httpText += 'Semi Mode' httpText += '
' self.send(httpText) elif getPath.startswith('/Auto'): # Toggle Auto mode if movementMode == AUTO_MODE: # We are in auto mode, turn it off movementMode = MANUAL_MODE httpText = '
' httpText += 'Speeds: 0 %, 0 %' httpText += '
' self.send(httpText) PBR.MotorsOff() else: # We are not in auto mode, turn it on movementMode = AUTO_MODE httpText = '
' httpText += 'Auto Mode' httpText += '
' self.send(httpText) elif getPath.startswith('/cam.jpg'): # Camera snapshot lockFrame.acquire() sendFrame = lastFrame lockFrame.release() if sendFrame != None: self.send(sendFrame.tostring()) elif getPath.startswith('/off'): # Turn the drives off and switch to manual mode movementMode = MANUAL_MODE httpText = '
' httpText += 'Speeds: 0 %, 0 %' httpText += '
' self.send(httpText) PBR.MotorsOff() elif getPath.startswith('/set/'): # Motor power setting: /set/driveLeft/driveRight parts = getPath.split('/') # A function to turn the LedBorg off def LedBorgOff(): SetLedBorg(0, 0, 0) # Settings for the sequence delay = 0.01 print 'MetalWeb Manual Mode' # Set the LedBorg to yellow SetLedBorg(1, 1, 0) time.sleep(delay) # Turn the LedBorg off LedBorgOff() # Get the power levels if len(parts) >= 4: try: driveLeft = float(parts[2]) driveRight = float(parts[3]) except: # Bad values driveRight = 0.0 driveLeft = 0.0 else: # Bad request driveRight = 0.0 driveLeft = 0.0 # Ensure settings are within limits if driveRight < -1: driveRight = -1 elif driveRight > 1: driveRight = 1 if driveLeft < -1: driveLeft = -1 elif driveLeft > 1: driveLeft = 1 # Report the current settings percentLeft = driveLeft * 100.0; percentRight = driveRight * 100.0; httpText = '
' if movementMode == MANUAL_MODE: httpText += 'Speeds: %.0f %%, %.0f %%' % (percentLeft, percentRight) elif movementMode == SEMI_AUTO_MODE: httpText += 'Semi: %.0f %%, %.0f %%' % (percentLeft, percentRight) elif movementMode == AUTO_MODE: percentLeft = PBR.GetMotor2() * 100.0; percentRight = PBR.GetMotor1() * 100.0; httpText += 'Auto: %.0f %%, %.0f %%' % (percentLeft, percentRight) httpText += '
' self.send(httpText) # Set the outputs as long as we are not in auto mode if movementMode != AUTO_MODE: driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) elif getPath.startswith('/photo'): # Save camera photo lockFrame.acquire() captureFrame = lastFrame lockFrame.release() httpText = '
' if captureFrame != None: photoName = '%s/Photo %s.jpg' % (photoDirectory, datetime.datetime.utcnow()) try: photoFile = open(photoName, 'wb') photoFile.write(captureFrame) photoFile.close() httpText += 'Photo saved to %s' % (photoName) except: httpText += 'Failed to take photo!' else: httpText += 'Failed to take photo!' httpText += '
' self.send(httpText) elif getPath == '/': # Main page, click buttons to move and to stop httpText = '\n' httpText += '\n' httpText += '\n' httpText += '\n' httpText += '\n' httpText += '\n' httpText += '\n' httpText += '
\n' httpText += '\n' httpText += '\n' httpText += '\n' httpText += '

\n' httpText += '\n' httpText += '\n' httpText += '\n' httpText += '

\n' httpText += '\n' httpText += '\n' httpText += '\n' httpText += '

\n' httpText += '\n' httpText += '

\n' httpText += '\n' httpText += '
\n' httpText += '
Distances (mm)
\n' httpText += '\n' httpText += '\n' httpText += '\n' self.send(httpText) elif getPath == '/stream': # Streaming frame, set a delayed refresh displayDelay = int(1000 / displayRate) httpText = '\n' httpText += '\n' httpText += '\n' httpText += '\n' httpText += '\n' % (displayDelay) httpText += '
\n' httpText += '\n' httpText += '\n' self.send(httpText) elif getPath == '/distances': # Repeated reading of the ultrasonic distances, set a delayed refresh # We use AJAX to avoid screen refreshes caused by refreshing a frame displayDelay = int(1000 / displayRate) httpText = '\n' httpText += '\n' httpText += '\n' httpText += '\n' httpText += '\n' httpText += '\n' % (displayDelay) httpText += '
Waiting for first distance reading...
\n' httpText += '\n' httpText += '\n' self.send(httpText) else: # Unexpected page self.send('Path : "%s"' % (getPath)) def send(self, content): self.request.sendall('HTTP/1.0 200 OK\n\n%s' % (content)) # Create the image buffer frame lastFrame = None lockFrame = threading.Lock() # Startup sequence print 'Setup camera' camera = picamera.PiCamera() camera.resolution = (imageWidth, imageHeight) camera.framerate = frameRate print 'Setup the stream processing thread' processor = StreamProcessor() print 'Wait ...' time.sleep(2) captureThread = ImageCapture() print 'Setup the watchdog' watchdog = Watchdog() print 'Setup the automatic movement' autoMovement = AutoMovement() # Run the web server until we are told to close httpServer = SocketServer.TCPServer(("0.0.0.0", webPort), WebServer) try: print 'Press CTRL+C to terminate the web-server' while running: httpServer.handle_request() except KeyboardInterrupt: # CTRL+C exit print '\nUser shutdown' finally: # Turn the motors off under all scenarios PBR.MotorsOff() print 'Motors off' # Tell each thread to stop, and wait for them to end running = False captureThread.join() processor.terminated = True watchdog.terminated = True autoMovement.terminated = True processor.join() watchdog.join() autoMovement.join() del camera PBR.SetLed(True) print 'Web-server terminated.'