from __future__ import print_function
import json
import pycurl
import re
import six
import urllib
import twisted
from twisted.internet import reactor, threads
from twisted.internet.defer import Deferred, DeferredLock, DeferredSemaphore
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
from twisted.web.http_headers import Headers
from paradrop.base import nexus, settings
[docs]class JSONReceiver(Protocol):
"""
JSON Receiver
A JSONReceiver object can be used with the twisted HTTP client
to receive data from a request and provide it to a callback
function when complete.
Example (response came from an HTTP request):
finished = Deferred()
response.deliverBody(JSONReceiver(finished))
finished.addCallback(func_that_takes_result)
Some error conditions will result in the callback firing with a result of
None. The receiver needs to check for this. This seems to occur on 403
errors where the server does not return any data, but twisted just passes
us a ResponseDone object the same type as a normal result.
"""
def __init__(self, response, finished):
"""
response: a twisted Response object
finished: a Deferred object
"""
self.response = response
self.finished = finished
self.data = ""
[docs] def dataReceived(self, data):
"""
internal: handles incoming data.
"""
self.data += data
[docs] def connectionLost(self, reason):
"""
internal: handles connection close events.
"""
if reason.check(twisted.web.client.ResponseDone):
try:
result = json.loads(self.data)
except ValueError:
result = None
self.finished.callback(PDServerResponse(self.response, data=result))
else:
raise Exception(reason.getErrorMessage())
[docs]def urlEncodeParams(data):
"""
Return data URL-encoded.
This function specifically handles None and boolean values
to convert them to JSON-friendly strings (e.g. None -> 'null').
"""
copy = dict()
for key, value in six.iteritems(data):
if value is None:
copy[key] = 'null'
elif isinstance(value, bool):
copy[key] = json.dumps(value)
else:
copy[key] = value
return urllib.urlencode(copy, doseq=True)
[docs]class PDServerResponse(object):
"""
A PDServerResponse object contains the results of a request to pdserver.
This wraps twisted.web.client.Response (cannot be subclassed) and exposes
the same variables in addition to a 'data' variables. The 'data' variable,
if not None, is the parsed object from the response body.
"""
def __init__(self, response, data=None):
self.version = response.version
self.code = response.code
self.phrase = response.phrase
self.headers = response.headers
self.length = response.length
self.success = (response.code >= 200 and response.code < 300)
self.data = data
[docs]class HTTPResponse(object):
def __init__(self, data=None):
self.version = None
self.code = None
self.phrase = None
self.headers = dict()
self.length = None
self.success = False
self.data = data
[docs]class HTTPRequestDriver(object):
def __init__(self):
self.headers = {
"Accept": "application/json",
"Connection": "keep-alive",
"Content-Type": "application/json",
"User-Agent": "ParaDrop/2.5"
}
[docs] def request(self, method, url, body):
raise Exception("Not implemented")
[docs]class CurlRequestDriver(HTTPRequestDriver):
# Shared curl handle.
# May have problems due to issue #411.
# https://github.com/pycurl/pycurl/issues/411
curl = pycurl.Curl()
# Lock for the access to curl.
lock = DeferredLock()
code_pattern = re.compile("(HTTP\S*)\s+(\d+)\s+(.*)")
header_pattern = re.compile("(\S+): (.*)")
def __init__(self):
super(CurlRequestDriver, self).__init__()
# Buffer for receiving response.
self.buffer = six.StringIO()
# Fill in response object.
self.response = HTTPResponse()
[docs] def receive(self, ignore):
"""
Receive response from curl and convert it to a response object.
"""
data = self.buffer.getvalue()
response = self.response
# Try to parse the content if it's JSON.
contentType = response.headers.get('content-type', 'text/html')
if 'json' in contentType:
try:
response.data = json.loads(data)
except Exception:
print("Failed to parse JSON")
print(data)
response.data = data
else:
response.data = data
response.success = (response.code >= 200 and response.code < 300)
return response
[docs] def request(self, method, url, body=None):
def makeRequest(ignored):
curl = CurlRequestDriver.curl
curl.reset()
curl.setopt(pycurl.URL, url)
curl.setopt(pycurl.HEADERFUNCTION, self.receiveHeaders)
curl.setopt(pycurl.WRITEFUNCTION, self.buffer.write)
curl.setopt(pycurl.CUSTOMREQUEST, method)
if body is not None:
curl.setopt(pycurl.POSTFIELDS, body)
headers = []
for key, value in six.iteritems(self.headers):
headers.append("{}: {}".format(key, value))
curl.setopt(pycurl.HTTPHEADER, headers)
d = threads.deferToThread(curl.perform)
d.addCallback(self.receive)
return d
def releaseLock(result):
CurlRequestDriver.lock.release()
# Forward the result to the next handler.
return result
d = CurlRequestDriver.lock.acquire()
# Make the request once we acquire the semaphore.
d.addCallback(makeRequest)
# Release the semaphore regardless of how the request goes.
d.addBoth(releaseLock)
return d
[docs]class TwistedRequestDriver(HTTPRequestDriver):
# Using a connection pool enables persistent connections, so we can avoid
# the connection setup overhead when sending multiple messages to the
# server.
pool = HTTPConnectionPool(reactor, persistent=True)
# Used to control the number of concurrent requests because
# HTTPConnectionPool does not do that on its own.
# Discussed here:
# http://stackoverflow.com/questions/25552432/how-to-make-pooling-http-connection-with-twisted
sem = DeferredSemaphore(settings.PDSERVER_MAX_CONCURRENT_REQUESTS)
[docs] def receive(self, response):
"""
Receive response from twisted web client and convert it to a
PDServerResponse object.
"""
deferred = Deferred()
response.deliverBody(JSONReceiver(response, deferred))
return deferred
[docs] def request(self, method, url, body=None):
def makeRequest(ignored):
bodyProducer = None
if body is not None:
bodyProducer = FileBodyProducer(six.StringIO(body))
headers = {}
for key, value in six.iteritems(self.headers):
headers[key] = [value]
agent = Agent(reactor, pool=TwistedRequestDriver.pool)
d = agent.request(method, url, Headers(headers), bodyProducer)
d.addCallback(self.receive)
return d
def releaseSemaphore(result):
TwistedRequestDriver.sem.release()
# Forward the result to the next handler.
return result
d = TwistedRequestDriver.sem.acquire()
# Make the request once we acquire the semaphore.
d.addCallback(makeRequest)
# Release the semaphore regardless of how the request goes.
d.addBoth(releaseSemaphore)
return d
[docs]class PDServerRequest(object):
"""
Make an HTTP request to pdserver.
The API is assumed to use application/json for sending and receiving data.
Authentication is automatically handled here if the router is provisioned.
We handle missing, invalid, or expired tokens by making the request and
detecting a 401 (Unauthorized) response. We request a new token and retry
the failed request. We do this at most once and return failure if the
second attempt returns anything other than 200 (OK).
PDServerRequest objects are not reusable; create a new one for each
request.
URL String Substitutions:
router_id -> router id
Example:
/routers/{router_id}/states -> /routers/halo06/states
"""
# Auth token (JWT): we will automatically request as needed (for the first
# request and after expiration) and store the token in memory for future
# requests.
token = None
def __init__(self, path, driver=TwistedRequestDriver, headers={}, setAuthHeader=True):
self.path = path
self.driver = driver
self.headers = headers
self.setAuthHeader = setAuthHeader
self.transportRetries = 0
url = nexus.core.info.pdserver
if not path.startswith('/'):
url += '/'
url += path
# Perform string substitutions.
self.url = url.format(router_id=nexus.core.info.pdid)
self.body = None
[docs] def get(self, **query):
self.method = 'GET'
if len(query) > 0:
self.url += '?' + urlEncodeParams(query)
d = self.request()
d.addCallback(self.receiveResponse)
return d
[docs] def patch(self, *ops):
"""
Expects a list of operations in jsonpatch format (http://jsonpatch.com/).
An example operation would be:
{'op': 'replace', 'path': '/completed', 'value': True}
"""
self.method = 'PATCH'
self.body = json.dumps(ops)
d = self.request()
d.addCallback(self.receiveResponse)
return d
[docs] def post(self, **data):
self.method = 'POST'
self.body = json.dumps(data)
d = self.request()
d.addCallback(self.receiveResponse)
return d
[docs] def put(self, **data):
self.method = 'PUT'
self.body = json.dumps(data)
d = self.request()
d.addCallback(self.receiveResponse)
return d
[docs] def request(self):
driver = self.driver()
if self.setAuthHeader and PDServerRequest.token is not None:
auth = 'Bearer {}'.format(PDServerRequest.token)
driver.setHeader('Authorization', auth)
for key, value in six.iteritems(self.headers):
driver.setHeader(key, value)
return driver.request(self.method, self.url, self.body)
[docs] def receiveResponse(self, response):
"""
Intercept the response object, and if it's a 401 authenticate and retry.
"""
if response.code == 401 and self.setAuthHeader:
# 401 (Unauthorized) may mean our token is no longer valid.
# Request a new token and then retry the request.
#
# Watch out for infinite recursion here! If this inner request
# returns a 401 code, meaning the id/password is invalid, it should
# not go down this code path again (prevented by check against
# self.setAuthHeader above).
authRequest = PDServerRequest('/auth/router', driver=self.driver,
setAuthHeader=False)
d = authRequest.post(id=nexus.core.info.pdid,
password=nexus.core.getKey('apitoken'))
def cbLogin(authResponse):
if authResponse.success:
PDServerRequest.token = authResponse.data.get('token', None)
# Retry the original request now that we have a new token.
return self.request()
else:
# Our attempt to get a token failed, so give up.
return PDServerResponse(response)
d.addCallback(cbLogin)
return d
else:
return response
[docs] @classmethod
def getServerInfo(c):
"""
Return the information needed to send API messages to the server.
This can be used by an external program (e.g. pdinstall).
"""
info = {
'authorization': 'Bearer {}'.format(c.token),
'router_id': nexus.core.info.pdid,
'server': nexus.core.info.pdserver
}
return info
[docs] @classmethod
def resetToken(c):
"""
Reset the auth token, to be called if the router's identity has changed.
"""
c.token = None
# Initialize pycurl. Does this do anything?
pycurl.global_init(pycurl.GLOBAL_ALL)
# Set the number of connections that can be kept alive in the connection pool.
# Setting this equal to the size of the semaphore should prevent churn.
TwistedRequestDriver.pool.maxPersistentPerHost = settings.PDSERVER_MAX_CONCURRENT_REQUESTS