Source code for paradrop.backend.fc.configurer
###################################################################
# Copyright 2013-2014 All Rights Reserved
# Authors: The Paradrop Team
###################################################################
import time
import threading
from pdtools.lib.output import out
from pdtools.lib.pdutils import timeint, str2json
from paradrop.lib import settings
from paradrop.lib.utils.restart import reloadChutes
from . import updateObject
class PDConfigurer:
    """
    ParaDropConfigurer class.

    Serializes the configuration changes (updates) that are applied to chutes.
    Callers queue work with updateList(); a single worker, performUpdates(),
    drains the queue one update at a time, so only one update is ever being
    executed and all others wait in the queue until it is complete.

    Every access to the queue made by this class is guarded by updateLock, so
    the queueing methods may be called from threads other than the worker.

    The storage object handed to the constructor is kept on the instance for
    use by the rest of the system; nothing in this class reads it directly.
    """

    def __init__(self, storage, lclReactor):
        """Store the collaborators and start the update worker.

        :param storage: chute storage object, kept as self.storage.
        :param lclReactor: reactor-like object; this class uses its
            callInThread() method and its `running` attribute.
        """
        self.storage = storage
        self.reactor = lclReactor
        self.updateLock = threading.Lock()
        self.updateQueue = []

        ###########################################################################################
        # Launch the first update call, NOTE that you have to use callInThread!!
        # This happens because the performUpdates should run in its own thread,
        # it makes blocking calls and such... so if we *don't* use callInThread
        # then this function WILL BLOCK THE MAIN EVENT LOOP (ie. you cannot send any data)
        ###########################################################################################
        self.reactor.callInThread(self.performUpdates)

    def getNextUpdate(self):
        """MUTEX: updateLock
        Remove and return the oldest queued update.

        :returns: the update dict at the head of the queue, or None when the
            queue is empty.
        """
        # 'with' guarantees the lock is released even if something raises.
        with self.updateLock:
            if self.updateQueue:
                # Get first available
                return self.updateQueue.pop(0)
            return None

    def clearUpdateList(self):
        """MUTEX: updateLock
        Clears all updates from list (new array).
        """
        with self.updateLock:
            self.updateQueue = []

    def updateList(self, **updateObj):
        """MUTEX: updateLock
        Queue one update for the worker thread.

        The keyword arguments are collected into a single dict which is
        appended to the end of the update queue. The real update function runs
        in another thread (see performUpdates), so the caller is not blocked.

        :param updateObj: keyword arguments describing the update; the dict is
            later handed unchanged to updateObject.parse().
        """
        with self.updateLock:
            # Push the data into our update queue
            self.updateQueue.append(updateObj)

    def performUpdates(self):
        """This is the main working function of the PDConfigurer class.
        It should be executed as a separate thread, it does the following:
            queues the updates returned by reloadChutes() ahead of anything
                already waiting
            checks for any updates to perform
            parses and executes them (or, when settings.FC_BOUNCE_UPDATE is
                set, completes them immediately without executing)
            checks for more updates
            if more exist it processes the next one right away
            else it sleeps for one second before checking again

        The loop runs for as long as self.reactor.running is true. Exceptions
        raised while handling one update are logged and do not stop the loop.
        """
        # add any chutes that should already be running to the front of the
        # update queue before processing any updates
        startQueue = reloadChutes()

        with self.updateLock:
            # insert the data into the front of our update queue so that all
            # old chutes restart before new ones are processed.
            # NOTE: each item is inserted at index 0, so the items end up in
            # the reverse of their order in startQueue.
            for updateObj in startQueue:
                self.updateQueue.insert(0, updateObj)

        # Always perform this work
        while self.reactor.running:
            # Check for new updates
            updateObj = self.getNextUpdate()
            if updateObj is None:
                # Nothing queued: back off before polling again.
                time.sleep(1)
                continue

            try:
                # Take the object and identify the update type
                update = updateObject.parse(updateObj)
                out.info('Performing update %s\n' % (update))

                # TESTING start
                if settings.FC_BOUNCE_UPDATE:
                    out.testing('Bouncing update %s, result: %s\n' % (
                        update, settings.FC_BOUNCE_UPDATE))
                    update.complete(success=True, message=settings.FC_BOUNCE_UPDATE)
                    continue
                # TESTING end

                # Based on each update type execute could be different
                update.execute()

            except Exception as e:
                # Log and keep going so one bad update cannot kill the worker.
                out.exception(e, True)