#!/usr/bin/env python # coding: Latin-1 # Creates a web-page interface for DiddyBorg Metal Edition # Import library functions we need import PicoBorgRev import time import sys import threading import SocketServer import picamera import picamera.array import cv2 import UltraBorg import datetime import math import XLoBorg import wiringpi2 as wiringpi #======================================================================= # Tell the library to disable diagnostic printouts XLoBorg.printFunction = XLoBorg.NoPrint # Start the XLoBorg module (sets up devices) XLoBorg.Init() #======================================================================== wiringpi.wiringPiSetup() #======================================================================== # Settings for the web-page webPort = 80 # Port number for the web-page, 80 is what web-pages normally use imageWidth = 240 # Width of the captured image in pixels imageHeight = 180 # Height of the captured image in pixels frameRate = 10 # Number of images to capture per second displayRate = 2 # Number of images to request per second photoDirectory = '/home/pi' # Directory to save photos to # Movement mode constants MANUAL_MODE = 0 # User controlled movement SEMI_AUTO_MODE = 1 # Semi-automatic movement AUTO_MODE = 2 # Fully automatic movement # Global values global PBR global lastFrame global lockFrame global camera global processor global running global watchdog global movementMode running = True movementMode = MANUAL_MODE # Setup the UltraBorg global UB UB = UltraBorg.UltraBorg() # Create a new UltraBorg object UB.Init() # Set the board up (checks the board is connected) # Setup the PicoBorg Reverse PBR = PicoBorgRev.PicoBorgRev() #PBR.i2cAddress = 0x44 # Uncomment and change the value if you have changed the board address PBR.Init() if not PBR.foundChip: boards = PicoBorgRev.ScanForPicoBorgReverse() if len(boards) == 0: print 'No PicoBorg Reverse found, check you are attached :)' else: print 'No PicoBorg Reverse at address %02X, but we did find boards:' % (PBR.i2cAddress) for board in boards: print ' %02X (%d)' % (board, board) print 'If you need to change the I²C address change the setup line so it is correct, e.g.' print 'PBR.i2cAddress = 0x%02X' % (boards[0]) sys.exit() #PBR.SetEpoIgnore(True) # Uncomment to disable EPO latch, needed if you do not have a switch / jumper PBR.SetCommsFailsafe(False) # Disable the communications failsafe PBR.ResetEpo() # Power settings voltageIn = 1.2 * 12 # Total battery voltage to the PicoBorg Reverse voltageOut = 12.0 # Maximum motor voltage # Setup the power limits if voltageOut > voltageIn: maxPower = 1.0 else: maxPower = voltageOut / float(voltageIn) # Timeout thread class Watchdog(threading.Thread): def __init__(self): super(Watchdog, self).__init__() self.event = threading.Event() self.terminated = False self.start() self.timestamp = time.time() def run(self): timedOut = True # This method runs in a separate thread while not self.terminated: # Wait for a network event to be flagged for up to one second if timedOut: if self.event.wait(1): # Connection print 'Reconnected...' timedOut = False self.event.clear() else: if self.event.wait(1): self.event.clear() else: # Timed out print 'Timed out...' timedOut = True PBR.MotorsOff() # Image stream processing thread class StreamProcessor(threading.Thread): def __init__(self): super(StreamProcessor, self).__init__() self.stream = picamera.array.PiRGBArray(camera) self.event = threading.Event() self.terminated = False self.start() self.begin = 0 def run(self): global lastFrame global lockFrame # This method runs in a separate thread while not self.terminated: # Wait for an image to be written to the stream if self.event.wait(1): try: # Read the image and save globally self.stream.seek(0) flippedArray = cv2.flip(self.stream.array, -1) # Flips X and Y retval, thisFrame = cv2.imencode('.jpg', flippedArray) del flippedArray lockFrame.acquire() lastFrame = thisFrame lockFrame.release() finally: # Reset the stream and event self.stream.seek(0) self.stream.truncate() self.event.clear() # Image capture thread class ImageCapture(threading.Thread): def __init__(self): super(ImageCapture, self).__init__() self.start() def run(self): global camera global processor print 'Start the stream using the video port' camera.capture_sequence(self.TriggerStream(), format='bgr', use_video_port=True) print 'Terminating camera processing...' processor.terminated = True processor.join() print 'Processing terminated.' # Stream delegation loop def TriggerStream(self): global running while running: if processor.event.is_set(): time.sleep(0.01) else: yield processor.stream processor.event.set() # Automatic movement thread class AutoMovement(threading.Thread): def __init__(self): super(AutoMovement, self).__init__() self.terminated = False self.start() def run(self): global movementMode # This method runs in a separate thread while not self.terminated: # See which mode we are in if movementMode == MANUAL_MODE: # User movement, wait a second before checking again time.sleep(1.0) elif movementMode == SEMI_AUTO_MODE: # Semi-automatic movement mode, checks twice per sec # Ultrasonic distance readings usm1 = UB.GetRawDistance1() usm2 = UB.GetRawDistance2() usm3 = UB.GetRawDistance3() usm4 = UB.GetRawDistance4() # Convert to the nearest millimeter usm1 = int(usm1) usm2 = int(usm2) usm3 = int(usm3) usm4 = int(usm4) # TODO: Fill in logic here # Read and display the raw magnetometer readings mx,my,mz = XLoBorg.ReadCompassRaw() print 'mX = %+06d, mY = %+06d, mZ = %+06d' % XLoBorg.ReadCompassRaw() # get the heading in radians heading = math.atan2 (my,mx) if (heading < 0): heading = heading + (2 * math.pi) # convert to degrees heading = heading * 180/math.pi; print heading # Read and display the temperature print '%+02d°C' % (XLoBorg.ReadTemperature()) # Read and display time localtime = time.asctime( time.localtime(time.time()) ) print 'local current time :', localtime # Wait for 1/2 of a second before reading again #Thread.sleep(0.5) time.sleep(0.5) elif movementMode == AUTO_MODE: # Automatic movement mode, updates five times per second # Ultrasonic distance readings usm1 = UB.GetRawDistance1() usm2 = UB.GetRawDistance2() usm3 = UB.GetRawDistance3() usm4 = UB.GetRawDistance4() # Convert to the nearest millimeter usm1 = int(usm1) usm2 = int(usm2) usm3 = int(usm3) usm4 = int(usm4) # Read and display the raw magnetometer readings mx,my,mz = XLoBorg.ReadCompassRaw() print 'mX = %+06d, mY = %+06d, mZ = %+06d' % XLoBorg.ReadCompassRaw() # get the heading in radians heading = math.atan2 (my,mx) if (heading < 0): heading = heading + (2 * math.pi) # convert to degrees heading = heading * 180/math.pi; print heading # Read and display the temperature print '%+02d°C' % (XLoBorg.ReadTemperature()) # Read and display time localtime = time.asctime( time.localtime(time.time()) ) print 'local current time :', localtime # set motors drive fward fast speed 100%---->40% usm1 to controll speed # need some help here :) if usm1 >= 361: driveLeft = 0.6 driveRight = 0.6 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) # set motors drive fward slow if usm1 <= 360: driveLeft = 0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) if usm1 <= 200: driveLeft = -0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) if usm4 <= 360: driveLeft = 0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) if usm4 <= 200: driveLeft = 0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) #--------------------------------------------- # set motors1 drive fward if usm2 <= 200: driveLeft = -0.4 driveRight = 0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) #--------------------------------------------- # set motors2 drive fward if usm3 <= 200: driveLeft = 0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) #--------------------------------------------- # set motors spinn if usm1 <= 200: if usm2 <= 200: if usm3 <= 200: if usm4 <= 200: driveLeft = 0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(2.0) #--------------------------------------------- # set motors drive rew if usm1 <= 100: driveLeft = -0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(1.0) driveLeft = -0.4 driveRight = 0.0 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(0.5) if usm4 <= 100: driveLeft = -0.4 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(1.0) driveLeft = 0.0 driveRight = -0.4 driveLeft *= maxPower driveRight *= maxPower PBR.SetMotor1(driveRight) PBR.SetMotor2(-driveLeft) time.sleep(0.5) # Wait for 1/5 of a second before reading again time.sleep(.2) #Thread.sleep(0.2) else: # Unexpected, print an error and wait a second before trying again print 'Unexpected movement mode %d' % (movementMode) time.sleep(1.0) # Class used to implement the web server class WebServer(SocketServer.BaseRequestHandler): def handle(self): global PBR global lastFrame global watchdog global movementMode # Get the HTTP request data reqData = self.request.recv(1024).strip() reqData = reqData.split('\n') # Get the URL requested getPath = '' for line in reqData: if line.startswith('GET'): parts = line.split(' ') getPath = parts[1] break watchdog.event.set() if getPath.startswith('/distances-once'): # Ultrasonic distance readings # Get the readings distance1 = int(UB.GetDistance1()) distance2 = int(UB.GetDistance2()) distance3 = int(UB.GetDistance3()) distance4 = int(UB.GetDistance4()) # Read and display the raw magnetometer readings mx,my,mz = XLoBorg.ReadCompassRaw() print 'mX = %+06d, mY = %+06d, mZ = %+06d' % XLoBorg.ReadCompassRaw() # get the heading in radians heading = math.atan2 (my,mx) if (heading < 0): heading = heading + (2 * math.pi) # convert to degrees heading = heading * 180/math.pi; print heading # Read and display the temperature print '%+02d°C' % (XLoBorg.ReadTemperature()) # Read and display time localtime = time.asctime( time.localtime(time.time()) ) print 'local current time :', localtime # Build a table for the values httpText = '