Source code for paradrop.core.update.update_fetcher

'''
Fetch new updates from the pdserver and apply the updates
'''
from __future__ import print_function

from twisted.internet import task
from twisted.internet.defer import inlineCallbacks

from paradrop.core.agent.http import PDServerRequest
from paradrop.core.auth.user import User
from paradrop.base.output import out
from paradrop.base.pdutils import timeint


# Interval (in seconds) for polling the server for updates.  This is a fallback
# in case the notifications over WAMP fail, so it can be relatively infrequent.
#
# The automatic polling does not start until after pull_update()
# has been called at least once.
UPDATE_POLL_INTERVAL = 15 * 60


[docs]class UpdateFetcher(object): def __init__(self, update_manager): # Store set of update IDs that are in progress in order to avoid # repeats. self.updates_in_progress = set() self.update_manager = update_manager self.scheduled_call = task.LoopingCall(self.pull_update, _auto=True) self.long_poll_started = False self.use_long_poll = True
[docs] @inlineCallbacks def start_long_poll(self): self.long_poll_started = True while self.use_long_poll: request = PDServerRequest('/api/routers/{router_id}/updates/poll') try: response = yield request.get() except Exception: pass else: # 200 = update(s) available # 204 = no updates yet # 404 = server does not support this endpoint if response.code == 200: self.pull_update(_auto=False) elif response.code == 404: self.use_long_poll = False
[docs] def start_polling(self): # Long polling is a more recent addition. If the server supports it, it # will only respond to our GET request after an update is available or a # timeout has expired. if not self.long_poll_started: self.start_long_poll() # Basic polling is our baseline. It always works with no dependency on # maintaining websocket connections, etc. Set the polling interval to # something reasonable, e.g. 15 minutes to 1 hour. if not self.scheduled_call.running: self.scheduled_call.start(UPDATE_POLL_INTERVAL, now=False)
[docs] @inlineCallbacks def pull_update(self, _auto=False): """ Start updates by polling the server for the latest updates. This is the only method that needs to be called from outside. The rest are triggered asynchronously. Call chain: pull_update -> _updates_received -> _update_complete _auto: Set to True when called by the scheduled LoopingCall. """ # If this call was triggered externally (_auto=False), then reset the # timer for the next scheduled call so we do not poll again too soon. if not _auto and self.scheduled_call.running: self.scheduled_call.reset() def handle_error(error): print("Request for updates failed: {}".format(error.getErrorMessage())) request = PDServerRequest('/api/routers/{router_id}/updates') d = request.get(completed=False) d.addCallback(self._updates_received) d.addErrback(handle_error) yield d
@inlineCallbacks def _updates_received(self, response): if not response.success or response.data is None: print("There was an error receiving updates from the server.") return updates = response.data print("Received {} update(s) from server.".format(len(updates))) # Remove old updates from the set of in progress updates. These are # updates that the server is no longer sending to us. received_set = set(item['_id'] for item in updates) old_updates = self.updates_in_progress - received_set self.updates_in_progress -= old_updates for item in updates: # TODO: Handle updates with started=True very carefully. # # There are at least two interesting cases: # 1. We started an update but crashed or rebooted without # completing it. # 2. We started an upgrade to paradrop, the daemon was just # restarted, but pdinstall has not reported that the update is # complete yet. # # If we skip updates with started=True, we will not retry updates # in case #1. if item['_id'] in self.updates_in_progress: continue elif item.get('started', False): yield self._update_ignored(item) continue else: self.updates_in_progress.add(item['_id']) creator = item.get('creator', {}) username = creator.get("name", "paradrop") role = creator.get("access_level", "trusted") update = dict(updateClass=item['updateClass'], updateType=item['updateType'], user=User(username, "paradrop.org", role=role), tok=timeint()) # Save fields that relate to external management of this chute. update['external'] = { 'update_id': item['_id'], 'chute_id': item.get('chute_id', None), 'version_id': item.get('version_id', None) } # Backend might send us garbage; be sure that we only accept the # config field if it is present and is a dict. config = item.get('config', {}) if isinstance(config, dict): update.update(config) update = yield self.update_manager.add_update(**update) yield self._update_complete(update) def _update_complete(self, update): """ Internal: callback after an update operation has been completed (successfuly or otherwise) and send a notification to the server. """ # If delegated to an external program, we do not report to pdserver # that the update is complete. if update.delegated: return out.info("The update is done, report it to server...") update_id = update.external['update_id'] success = update.result.get('success', False) request = PDServerRequest('/api/routers/{router_id}/updates/' + str(update_id)) d = request.patch( {'op': 'replace', 'path': '/completed', 'value': True}, {'op': 'replace', 'path': '/success', 'value': success} ) return d # TODO: If this notification fails to go through, we should retry or # build in some other mechanism to inform the server. # # Not catching errors here so we see a stack trace if there is an # error. This is an omission that will need to be dealt with. def _update_ignored(self, update): """ Internal: callback for an update that we are ignoring because it was started previously and never completed. """ update_id = update['_id'] request = PDServerRequest('/api/routers/{router_id}/updates/' + str(update_id)) d = request.patch( {'op': 'replace', 'path': '/completed', 'value': True}, {'op': 'replace', 'path': '/success', 'value': False} ) return d