#!/usr/bin/env python # coding: Latin-1 # Creates a web-page interface for DiddyBorg Metal Edition # Import library functions we need import PicoBorgRev import time import sys import threading import SocketServer import picamera import picamera.array import cv2 import UltraBorg import datetime import math import XLoBorg import wiringpi2 as wiringpi #======================================================================= # Tell the library to disable diagnostic printouts XLoBorg.printFunction = XLoBorg.NoPrint # Start the XLoBorg module (sets up devices) XLoBorg.Init() # Settings for the web-page webPort = 80 # Port number for the web-page, 80 is what web-pages normally use imageWidth = 240 # Width of the captured image in pixels imageHeight = 180 # Height of the captured image in pixels frameRate = 10 # Number of images to capture per second displayRate = 2 # Number of images to request per second photoDirectory = '/home/pi' # Directory to save photos to # Movement mode constants MANUAL_MODE = 0 # User controlled movement SEMI_AUTO_MODE = 1 # Semi-automatic movement AUTO_MODE = 2 # Fully automatic movement # Global values global PBR global lastFrame global lockFrame global camera global processor global running global watchdog global movementMode running = True movementMode = MANUAL_MODE # Setup the UltraBorg global UB UB = UltraBorg.UltraBorg() # Create a new UltraBorg object UB.Init() # Set the board up (checks the board is connected) #======================================================================== wiringpi.wiringPiSetup() # Setup software PWMs on the GPIO pins PIN_RED = 0 PIN_GREEN = 2 PIN_BLUE = 3 wiringpi.pinMode(PIN_RED, wiringpi.GPIO.OUTPUT) wiringpi.pinMode(PIN_GREEN, wiringpi.GPIO.OUTPUT) wiringpi.pinMode(PIN_BLUE, wiringpi.GPIO.OUTPUT) # A function to set the LedBorg colours def SetLedBorg(red, green, blue): wiringpi.digitalWrite(PIN_RED, red) wiringpi.digitalWrite(PIN_GREEN, green) wiringpi.digitalWrite(PIN_BLUE, blue) # A function to turn the LedBorg off def LedBorgOff(): SetLedBorg(0, 0, 0) # Settings for the sequence delay = 1 print 'MetalWeb start up' # Set the LedBorg to red SetLedBorg(1, 0, 0) time.sleep(delay) # Turn the LedBorg off LedBorgOff() print 'LedBorg off' print 'Ready to Play, Punk?' #======================================================================== # Setup the PicoBorg Reverse PBR = PicoBorgRev.PicoBorgRev() #PBR.i2cAddress = 0x44 # Uncomment and change the value if you have changed the board address PBR.Init() if not PBR.foundChip: boards = PicoBorgRev.ScanForPicoBorgReverse() if len(boards) == 0: print 'No PicoBorg Reverse found, check you are attached :)' else: print 'No PicoBorg Reverse at address %02X, but we did find boards:' % (PBR.i2cAddress) for board in boards: print ' %02X (%d)' % (board, board) print 'If you need to change the I²C address change the setup line so it is correct, e.g.' print 'PBR.i2cAddress = 0x%02X' % (boards[0]) sys.exit() #PBR.SetEpoIgnore(True) # Uncomment to disable EPO latch, needed if you do not have a switch / jumper PBR.SetCommsFailsafe(False) # Disable the communications failsafe PBR.ResetEpo() # Power settings voltageIn = 1.2 * 12 # Total battery voltage to the PicoBorg Reverse voltageOut = 12.0 # Maximum motor voltage # Setup the power limits if voltageOut > voltageIn: maxPower = 1.0 else: maxPower = voltageOut / float(voltageIn) # Timeout thread class Watchdog(threading.Thread): def __init__(self): super(Watchdog, self).__init__() self.event = threading.Event() self.terminated = False self.start() self.timestamp = time.time() def run(self): timedOut = True # This method runs in a separate thread while not self.terminated: # Wait for a network event to be flagged for up to one second if timedOut: if self.event.wait(1): # Connection print 'Reconnected...' timedOut = False self.event.clear() else: if self.event.wait(1): self.event.clear() else: # Timed out print 'Timed out...' timedOut = True PBR.MotorsOff() # Image stream processing thread class StreamProcessor(threading.Thread): def __init__(self): super(StreamProcessor, self).__init__() self.stream = picamera.array.PiRGBArray(camera) self.event = threading.Event() self.terminated = False self.start() self.begin = 0 def run(self): global lastFrame global lockFrame # This method runs in a separate thread while not self.terminated: # Wait for an image to be written to the stream if self.event.wait(1): try: # Read the image and save globally self.stream.seek(0) flippedArray = cv2.flip(self.stream.array, -1) # Flips X and Y retval, thisFrame = cv2.imencode('.jpg', flippedArray) del flippedArray lockFrame.acquire() lastFrame = thisFrame lockFrame.release() finally: # Reset the stream and event self.stream.seek(0) self.stream.truncate() self.event.clear() # Image capture thread class ImageCapture(threading.Thread): def __init__(self): super(ImageCapture, self).__init__() self.start() def run(self): global camera global processor print 'Start the stream using the video port' camera.capture_sequence(self.TriggerStream(), format='bgr', use_video_port=True) print 'Terminating camera processing...' processor.terminated = True processor.join() print 'Processing terminated.' # Stream delegation loop def TriggerStream(self): global running while running: if processor.event.is_set(): time.sleep(0.01) else: yield processor.stream processor.event.set() # Automatic movement thread class AutoMovement(threading.Thread): def __init__(self): super(AutoMovement, self).__init__() self.terminated = False self.start() def run(self): global movementMode # This method runs in a separate thread while not self.terminated: # See which mode we are in if movementMode == MANUAL_MODE: # User movement, wait a second before checking again time.sleep(1.0) elif movementMode == SEMI_AUTO_MODE: # A function to turn the LedBorg off def LedBorgOff(): SetLedBorg(0, 0, 0) # Settings for the sequence delay = 0.01 print 'MetalWeb Manual Mode' # Set the LedBorg to green SetLedBorg(0, 1, 0) time.sleep(delay) # Turn the LedBorg off LedBorgOff() # Semi-automatic movement mode, checks twice per sec # Ultrasonic distance readings usm1 = UB.GetRawDistance1() usm2 = UB.GetRawDistance2() usm3 = UB.GetRawDistance3() usm4 = UB.GetRawDistance4() # Convert to the nearest millimeter usm1 = int(usm1) usm2 = int(usm2) usm3 = int(usm3) usm4 = int(usm4) #========================================================================= #========================================================================= # Read and display the raw magnetometer readings mx,my,mz = XLoBorg.ReadCompassRaw() print 'mX = %+06d, mY = %+06d, mZ = %+06d' % XLoBorg.ReadCompassRaw() # get the heading in radians heading = math.atan2 (my,mx) if (heading < 0): heading = heading + (2 * math.pi) # convert to degrees heading = heading * 180/math.pi; print 'Compass', heading # Get the accelerometer readings ax,ay,az = XLoBorg.ReadAccelerometer() # Calculate pitch and roll in radians roll = -math.atan2(ax, math.sqrt(pow(ay,2) + pow(az, 2))) pitch = math.atan2(ay, az) # Convert to degrees pitch = pitch * 180.0 / math.pi; roll = roll * 180.0 / math.pi; print 'pitch', pitch print 'roll', roll def get_cpu_temp_c(): tempFile = open( "/sys/class/thermal/thermal_zone0/temp" ) cpu_temp = tempFile.read() tempFile.close() return float(cpu_temp)/1000 print("CPU Temp: %.2f°C" % get_cpu_temp_c()) # Read and display the temperature XLoBorg.tempOffest = 34 print 'XLo Temp: %02d°C' % (XLoBorg.ReadTemperature()) # Read and display time localtime = time.asctime( time.localtime(time.time()) ) print 'time :', localtime print'' # Wait for 1/2 of a second before reading again #Thread.sleep(0.5) time.sleep(0.5) #================================================================================ elif movementMode == AUTO_MODE: # Automatic movement mode, updates five times per second # A function to turn the LedBorg off def LedBorgOff(): SetLedBorg(0, 0, 0) # Settings for the sequence delay = 0.01 print 'MetalWeb Manual Mode' # Set the LedBorg to blue SetLedBorg(0, 0, 1) time.sleep(delay) # Turn the LedBorg off LedBorgOff() # Ultrasonic distance readings usm1 = UB.GetRawDistance1() usm2 = UB.GetRawDistance2() usm3 = UB.GetRawDistance3() usm4 = UB.GetRawDistance4() # Convert to the nearest millimeter usm1 = int(usm1) usm2 = int(usm2) usm3 = int(usm3) usm4 = int(usm4) # Read and display the raw magnetometer readings mx,my,mz = XLoBorg.ReadCompassRaw() print 'mX = %+06d, mY = %+06d, mZ = %+06d' % XLoBorg.ReadCompassRaw() # get the heading in radians heading = math.atan2 (my,mx) if (heading < 0): heading = heading + (2 * math.pi) # convert to degrees heading = heading * 180/math.pi; print 'Compass', heading # Get the accelerometer readings ax,ay,az = XLoBorg.ReadAccelerometer() # Calculate pitch and roll in radians pitch = -math.atan2(ax, math.sqrt(pow(ay,2) + pow(az, 2))) roll = math.atan2(ay, az) # Convert to degrees pitch = pitch * 180.0 / math.pi; roll = roll * 180.0 / math.pi; print 'pitch', pitch print 'roll', roll def get_cpu_temp_c(): tempFile = open( "/sys/class/thermal/thermal_zone0/temp" ) cpu_temp = tempFile.read() tempFile.close() return float(cpu_temp)/1000 print("CPU Temp: %.2f°C" % get_cpu_temp_c()) # Read and display the temperature XLoBorg.tempOffest = 34 print 'XLo Temp: %02d°C' % (XLoBorg.ReadTemperature()) # Read and display time localtime = time.asctime( time.localtime(time.time()) ) print 'time :', localtime print'' # set motors drive fward fast speed 100%---->40% usm1 to controll speed # need some help here :) if usm1 >= 361: driveLeft = 0.5 driveRight = 0.5 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) # set motors drive fward slow if usm1 <= 360: driveLeft = 0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) if usm1 <= 200: driveLeft = -0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) if usm4 <= 360: driveLeft = 0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) if usm4 <= 200: driveLeft = 0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) #--------------------------------------------- # set motors1 drive fward if usm2 <= 200: driveLeft = -0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) #--------------------------------------------- # set motors2 drive fward if usm3 <= 200: driveLeft = 0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) #--------------------------------------------- # set motors spinn if usm1 <= 200: if usm2 <= 200: if usm3 <= 200: if usm4 <= 200: driveLeft = 0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(2.0) #--------------------------------------------- # set motors drive rew if usm1 <= 100: driveLeft = -0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(1.0) driveLeft = -0.4 driveRight = 0.0 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(0.5) if usm4 <= 100: driveLeft = -0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(1.0) driveLeft = 0.0 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(0.5) # Wait for 1/5 of a second before reading again time.sleep(.2) #Thread.sleep(0.2) else: # Unexpected, print an error and wait a second before trying again print 'Unexpected movement mode %d' % (movementMode) time.sleep(1.0) # Class used to implement the web server class WebServer(SocketServer.BaseRequestHandler): def handle(self): global PBR global lastFrame global watchdog global movementMode # Get the HTTP request data reqData = self.request.recv(1024).strip() reqData = reqData.split('\n') # Get the URL requested getPath = '' for line in reqData: if line.startswith('GET'): parts = line.split(' ') getPath = parts[1] break watchdog.event.set() if getPath.startswith('/distances-once'): # Ultrasonic distance readings # Get the readings distance1 = int(UB.GetDistance1()) distance2 = int(UB.GetDistance2()) distance3 = int(UB.GetDistance3()) distance4 = int(UB.GetDistance4()) # Read and display the raw magnetometer readings mx,my,mz = XLoBorg.ReadCompassRaw() print 'mX = %+06d, mY = %+06d, mZ = %+06d' % XLoBorg.ReadCompassRaw() # get the heading in radians heading = math.atan2 (my,mx) if (heading < 0): heading = heading + (2 * math.pi) # convert to degrees heading = heading * 180/math.pi; print 'Compass', heading # Get the accelerometer readings ax,ay,az = XLoBorg.ReadAccelerometer() # Calculate pitch and roll in radians roll = -math.atan2(ax, math.sqrt(pow(ay,2) + pow(az, 2))) pitch = math.atan2(ay, az) # Convert to degrees pitch = pitch * 180.0 / math.pi; roll = roll * 180.0 / math.pi; print 'pitch', pitch print 'roll', roll def get_cpu_temp_c(): tempFile = open( "/sys/class/thermal/thermal_zone0/temp" ) cpu_temp = tempFile.read() tempFile.close() return float(cpu_temp)/1000 print("CPU Temp: %.2f°C" % get_cpu_temp_c()) # Read and display the temperature XLoBorg.tempOffest = 34 print 'XLo Temp: %02d°C' % (XLoBorg.ReadTemperature()) # Read and display time localtime = time.asctime( time.localtime(time.time()) ) print 'time :', localtime print '' # Build a table for the values httpText = '