'''
Wamp utility methods.
'''
from autobahn.twisted import wamp, websocket
from autobahn.twisted.wamp import ApplicationSession
from autobahn.wamp.types import ComponentConfig
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet.ssl import ClientContextFactory
from . import nexus
from .output import out
# pylint: disable=inconsistent-mro
[docs]class BaseClientFactory(websocket.WampWebSocketClientFactory, ReconnectingClientFactory):
# factor and jitter are two variables that control the exponential backoff.
# The default values in ReconnectingClientFactory seem reasonable.
initialDelay = 1
maxDelay = 60
[docs] def clientConnectionFailed(self, connector, reason):
out.info("Connection failed with reason: " + str(reason))
ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
[docs] def clientConnectionLost(self, connector, reason):
out.info("Connection lost with reason: " + str(reason))
ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
[docs]class BaseSessionFactory(wamp.ApplicationSessionFactory):
def __init__(self, config, deferred=None):
super(BaseSessionFactory, self).__init__(config)
self.dee = deferred
def __call__(self, *args, **kwargs):
sess = super(BaseSessionFactory, self).__call__(*args, **kwargs)
sess.dee = self.dee
return sess
[docs]class BaseSession(ApplicationSession):
""" Temporary base class for crossbar implementation """
def __init__(self, config=None):
ApplicationSession.__init__(self, config=config)
self.pdid = config.extra
[docs] @classmethod
def start(klass, address, pdid, realm='paradrop', start_reactor=False,
debug=False, extra=None, reconnect=True):
'''
Creates a new instance of this session and attaches it to the router
at the given address and realm.
reconnect: The session will attempt to reconnect on connection failure
and continue trying indefinitely.
'''
# Enable log messages of autobahn for debugging
#import txaio
#txaio.start_logging()
dee = Deferred()
component_config = ComponentConfig(realm=u''+realm, extra=u''+pdid)
session_factory = BaseSessionFactory(config=component_config, deferred=dee)
session_factory.session = klass
transport_factory = BaseClientFactory(session_factory, url=address)
if not reconnect:
transport_factory.maxRetries = 0
transport_factory.setProtocolOptions(autoPingInterval=8., autoPingTimeout=4.,)
context_factory = ClientContextFactory()
websocket.connectWS(transport_factory, context_factory)
if start_reactor:
reactor.run()
return dee
# This the the recommended way to start the WAMP component,
# but it is friendly to customize the component
#runner = ApplicationRunner(url=u''+address, realm=u''+realm)
#return runner.run(klass, start_reactor=start_reactor, auto_reconnect=reconnect)
[docs] def leave(self):
# Do not retry if explicitly asked to leave.
self._transport.factory.maxRetries = 0
nexus.core.session = None
nexus.core.wamp_connected = False
super(BaseSession, self).leave()
[docs] @inlineCallbacks
def onJoin(self, details):
out.info(str(self.__class__.__name__) + ' crossbar session connected')
# Update global session reference. It's hacky, but we do the import
# here to solve the circular import problem. TODO Refactor.
nexus.core.session = self
nexus.core.wamp_connected = True
# Reset exponential backoff timer after a successful connection.
self._transport.factory.resetDelay()
# Inform whoever created us that the session has finished connecting.
# Useful in situations where you need to fire off a single call and not a
# full wamplet
if self.dee is not None:
yield self.dee.callback(self)
###################################################
# Overridden CX interaction methods
###################################################
'''
Note: this first set of methods have all the REAL implemnetion of caller identification:
the caller's information is always passed along for every call. In the crossbar way of doing
things, however, whats passed along is a session id and not our pdid.
The second set of methods is temporary in that it manually passes the
current sessions' pdid. This is not secure, but will have to do for now in the
absence of crossbar router changes.
'''
[docs] def publish(self, topic, *args, **kwargs):
# kwargs['options'] = PublishOptions(disclose_me=True)
args = (self.pdid,) + args
topic = _prepend(self.pdid, topic)
# out.info('cxbr: (%s) publish (%s)' % (self.pdid, topic,))
return ApplicationSession.publish(self, topic, *args, **kwargs)
[docs] def subscribe(self, handler, topic=None, options=None):
topic = _prepend(self.pdid, topic)
out.info('cxbr: (%s) subscribe (%s)' % (self.pdid, topic,))
return ApplicationSession.subscribe(self, handler, topic=topic, options=options)
[docs] def call(self, procedure, *args, **kwargs):
# kwargs['options'] = CallOptions(disclose_me=True)
args = (self.pdid,) + args
procedure = _prepend(self.pdid, procedure)
# out.info('cxbr: (%s) calling (%s)' % (self.pdid, procedure,))
return ApplicationSession.call(self, procedure, *args, **kwargs)
[docs] def register(self, endpoint, procedure=None, options=None):
# options = RegisterOptions(details_arg='session')
procedure = _prepend(self.pdid, procedure)
out.info('cxbr: (%s) register (%s)' % (self.pdid, procedure,))
return ApplicationSession.register(self, endpoint, procedure=procedure, options=options)
###################################################
# Access to the original methods, without convenience modifiers
###################################################
[docs] def stockPublish(self, topic, *args, **kwargs):
return ApplicationSession.publish(self, u''+topic, *args, **kwargs)
[docs] def stockSubscribe(self, handler, topic=None, options=None):
return ApplicationSession.subscribe(self, handler, topic=u''+topic, options=options)
[docs] def stockCall(self, procedure, *args, **kwargs):
return ApplicationSession.call(self, u''+procedure, *args, **kwargs)
[docs] def stockRegister(self, endpoint, procedure=None, options=None):
return ApplicationSession.register(self, endpoint, procedure=u''+procedure, options=options)
def _prepend(pdid, topic):
'''
In order to make subscription and execution code cleaner, this method automatically
injects this classes pdid to the end of any publish or register call.
The topic is also converted to a unicode string.
No consideration is given to 'valid' topics-- thats on you.
'''
return u'' + topic + '.' + pdid