#
# This module collects information about the current state of the system
# (chutes installed, hardware available, etc.) and reports that to the Paradrop
# server for remote management purposes.
#
import json
import os
import time
from twisted.internet import reactor
from paradrop.base.output import out
from paradrop.base import nexus, settings
from paradrop.core.chute.chute_storage import ChuteStorage
from paradrop.core.config import devices, hostconfig, resource, zerotier
from paradrop.core.container.chutecontainer import ChuteContainer
from paradrop.core.agent.http import PDServerRequest
from paradrop.core.system import system_info
from paradrop.core.system.system_status import SystemStatus
from paradrop.lib.misc.governor import GovernorClient
[docs]class StateReport(object):
def __init__(self):
# Record timestamp when report was created in case the server receives
# multiple.
self.timestamp = time.time()
self.name = None
self.osVersion = None
self.paradropVersion = None
self.pdinstallVersion = None
self.chutes = []
self.devices = []
self.hostConfig = {}
self.snaps = []
self.zerotierAddress = None
self.dmi = {}
self.status = {}
[docs] def toJSON(self):
return json.dumps(self.__dict__)
[docs]class StateReportBuilder(object):
[docs] def prepare(self):
report = StateReport()
report.name = nexus.core.info.pdid
report.osVersion = system_info.getOSVersion()
# We can get the paradrop version from the installed python package.
report.paradropVersion = system_info.getPackageVersion('paradrop')
report.chutes = []
chuteStore = ChuteStorage()
chutes = chuteStore.getChuteList()
allocation = resource.computeResourceAllocation(chutes)
for chute in chutes:
service_info = {}
for service in chute.get_services():
container_name = service.get_container_name()
container = ChuteContainer(container_name)
service_info[service.name] = {
'allocation': allocation.get(container_name, None),
'state': container.getStatus()
}
# Use the default service (e.g. "main") to report the chute's
# current state.
service = chute.get_default_service()
container = ChuteContainer(service.get_container_name())
report.chutes.append({
'name': chute.name,
'desired': chute.state,
'state': container.getStatus(),
'services': service_info,
'warning': None,
'version': getattr(chute, 'version', None),
'allocation': None, # deprecated
'environment': getattr(chute, 'environment', None),
'external': getattr(chute, 'external', None),
'resources': getattr(chute, 'resources', None)
})
report.devices = devices.listSystemDevices()
report.hostConfig = hostconfig.prepareHostConfig(write=False)
if GovernorClient.isAvailable():
client = GovernorClient()
report.snaps = client.listSnaps()
report.zerotierAddress = zerotier.getAddress()
report.dmi = system_info.getDMI()
# Add CPU, memory, disk, and network interface information. This gives
# the controller useful debugging information such as high memory or
# disk utilization and IP addresses.
status_source = SystemStatus()
report.status = status_source.getStatus(max_age=None)
return report.__dict__
[docs]class TelemetryReportBuilder(object):
[docs] def prepare(self):
chuteStore = ChuteStorage()
chutes = chuteStore.getChuteList()
# All network interfaces: we will divide these into chute-specific
# interfaces and system-wide interfaces.
network = SystemStatus.getNetworkInfo()
system_interfaces = set(network.keys())
report = {
'chutes': [],
'network': [],
'system': SystemStatus.getSystemInfo(),
'time': time.time()
}
for chute in chutes:
container = ChuteContainer(chute.name)
chute_info = {
'name': chute.name,
'state': container.getStatus(),
'network': []
}
try:
pid = container.getPID()
chute_info['process'] = SystemStatus.getProcessInfo(pid)
except Exception:
chute_info['process'] = None
interfaces = chute.getCache('networkInterfaces')
for iface in interfaces:
ifname = iface['externalIntf']
if ifname in network:
ifinfo = network[ifname]
ifinfo['name'] = ifname
ifinfo['type'] = iface.get('type', 'wifi')
chute_info['network'].append(ifinfo)
system_interfaces.remove(ifname)
report['chutes'].append(chute_info)
for ifname in system_interfaces:
ifinfo = network[ifname]
ifinfo['name'] = ifname
ifinfo['type'] = None
report['network'].append(ifinfo)
return report
[docs]class ReportSender(object):
def __init__(self, model="states", max_retries=None):
self.max_retries = max_retries
self.model = model
self.retries = 0
self.retryDelay = 1
self.maxRetryDelay = 300
[docs] def increaseDelay(self):
self.retryDelay *= 2
if self.retryDelay > self.maxRetryDelay:
self.retryDelay = self.maxRetryDelay
[docs] def send(self, report):
request = PDServerRequest('/api/routers/{router_id}/' + self.model)
d = request.post(**report)
# Check for error code and retry.
def cbresponse(response):
if not response.success:
out.warn('{} to {} returned code {}'.format(request.method,
request.url, response.code))
if self.max_retries is None or self.retries < self.max_retries:
reactor.callLater(self.retryDelay, self.send, report)
self.retries += 1
self.increaseDelay()
nexus.core.jwt_valid = False
else:
nexus.core.jwt_valid = True
# Check for connection failures and retry.
def cberror(ignored):
out.warn('{} to {} failed'.format(request.method, request.url))
if self.max_retries is None or self.retries < self.max_retries:
reactor.callLater(self.retryDelay, self.send, report)
self.retries += 1
self.increaseDelay()
nexus.core.jwt_valid = False
d.addCallback(cbresponse)
d.addErrback(cberror)
return d
[docs]class NodeIdentitySender(ReportSender):
[docs] def send(self, report):
request = PDServerRequest('/api/routers/{router_id}')
d = request.patch(*report)
# Check for error code and retry.
def cbresponse(response):
if not response.success:
out.warn('{} to {} returned code {}'.format(request.method,
request.url, response.code))
if self.max_retries is None or self.retries < self.max_retries:
reactor.callLater(self.retryDelay, self.send, report)
self.retries += 1
self.increaseDelay()
nexus.core.jwt_valid = False
else:
nexus.core.jwt_valid = True
# Check for connection failures and retry.
def cberror(ignored):
out.warn('{} to {} failed'.format(request.method, request.url))
if self.max_retries is None or self.retries < self.max_retries:
reactor.callLater(self.retryDelay, self.send, report)
self.retries += 1
self.increaseDelay()
nexus.core.jwt_valid = False
d.addCallback(cbresponse)
d.addErrback(cberror)
return d
[docs]def sendNodeIdentity():
path = os.path.join(settings.KEY_DIR, "node.pub")
with open(path, "r") as source:
public_key = source.read().strip()
report = [
{"op": "add", "path": "/ssh_public_key", "value": public_key}
]
sender = NodeIdentitySender()
return sender.send(report)
[docs]def sendStateReport():
builder = StateReportBuilder()
report = builder.prepare()
sender = ReportSender()
return sender.send(report)
[docs]def sendTelemetryReport():
# Do not try to send telemetry report if not provisioned.
if not nexus.core.provisioned():
if not getattr(sendTelemetryReport, 'provisionWarningShown', False):
out.warn("Unable to send telemetry report: not provisioned")
sendTelemetryReport.provisionWarningShown = True
return None
builder = TelemetryReportBuilder()
report = builder.prepare()
sender = ReportSender(model="telemetry", max_retries=0)
return sender.send(report)