Source code for paradrop.core.config.firewall

import fnmatch

from paradrop.base.output import out
from paradrop.lib.utils import uci

from . import uciutils


def findMatchingInterface(iface_name, interfaces):
    """
    Return the first interface whose name matches a pattern.

    iface_name is used as a shell-style pattern (fnmatch syntax, so it may
    contain wildcards such as * and ?) and is compared against the 'name'
    entry of each interface, in list order.

    Returns the first matching interface, or None if nothing matches.
    """
    matches = (
        candidate for candidate in interfaces
        if fnmatch.fnmatch(candidate['name'], iface_name)
    )
    # next() with a default gives the same first-match-or-None result as an
    # explicit loop with an early return.
    return next(matches, None)
def getOSFirewallRules(update):
    """
    Build the baseline firewall sections a chute needs in order to function.

    For every interface in the cached 'networkInterfaces' list this emits,
    in order:

      * a zone named after the interface's 'externalIntf' (input REJECT,
        forward/output ACCEPT), with masquerading enabled only for 'wan'
        type interfaces;
      * for 'wan' type interfaces, a forwarding section from that zone to
        the 'wan' zone;
      * if the interface has a 'dhcp' entry, rules accepting UDP traffic to
        port 53 (DNS) and port 67 (DHCP).

    The result is a list of (config, options) tuples.

    Stored in key: osFirewallRules

    If 'networkInterfaces' is not in the cache, nothing is stored and None
    is returned.
    """
    interfaces = update.cache_get('networkInterfaces')
    if interfaces is None:
        return None

    rules = []

    for iface in interfaces:
        zone_name = iface['externalIntf']
        is_wan = iface['type'] == 'wan'

        # The zone section always comes first for an interface.
        zone_options = {
            'name': zone_name,
            'conntrack': True,
            'input': 'REJECT',
            'forward': 'ACCEPT',
            'output': 'ACCEPT',
            'network': [zone_name],
            'masq': 1 if is_wan else 0
        }
        rules.append(({'type': 'zone'}, zone_options))

        # Only wan type interfaces get a forwarding section.
        if is_wan:
            forwarding_options = {'src': zone_name, 'dest': 'wan'}
            rules.append(({'type': 'forwarding'}, forwarding_options))

        # If we are running a DHCP server for the chute, requests to the
        # host must be let through: DNS (port 53) first, then DHCP (port 67).
        if 'dhcp' in iface:
            for port in (53, 67):
                accept_options = {
                    'src': zone_name,
                    'proto': 'udp',
                    'dest_port': port,
                    'target': 'ACCEPT'
                }
                rules.append(({'type': 'rule'}, accept_options))

    update.cache_set('osFirewallRules', rules)
def getDeveloperFirewallRules(update):
    """
    Generate the firewall rules requested by the chute developer.

    Reads the rules from update.new.firewall (if that attribute exists) and
    translates each rule of type 'redirect' into a (config, options) tuple;
    rules of any other type are skipped.  A rule's 'from' and 'to' values
    are strings of the form "<name>[:<port>]".

    Only DNAT redirects are supported: 'from' must name "@host.lan" and
    'to' must name a chute interface (shell-style wildcards allowed), whose
    'externalIpaddr' becomes the destination IP.

    The resulting list of tuples is stored in key: developerFirewallRules

    If 'networkInterfaces' is not in the cache, nothing is stored and None
    is returned.

    Raises Exception if both ends of a redirect are outside the chute, if
    the target interface cannot be found, or if the rule is an SNAT or
    other unsupported kind of redirect.
    """
    interfaces = update.cache_get('networkInterfaces')
    if interfaces is None:
        return None

    rules = []
    requested = update.new.firewall if hasattr(update.new, "firewall") else ()

    for rule in requested:
        # Only redirect rules are handled here.
        if rule['type'] != 'redirect':
            continue

        config = {'type': 'redirect'}
        options = {'name': rule['name'], 'proto': 'tcpudp'}

        from_parts = rule['from'].strip().split(':')
        to_parts = rule['to'].strip().split(':')
        src_name = from_parts[0]
        dst_name = to_parts[0]

        # Do not allow rules that do not pertain to the chute.
        if "@host.lan" in src_name and "@host.lan" in dst_name:
            raise Exception("Unable to add firewall rule - "
                            "dst and src are both outside of chute")

        if src_name != "@host.lan":
            if dst_name == "@host.lan":
                # This is an SNAT rule (redirect from the chute to the host
                # network).
                options['target'] = "SNAT"

                # TODO: Implement
                out.warn("SNAT rules not supported yet")
                raise Exception("SNAT rules not implemented")

            # Could be forwarding between chute interfaces?
            out.warn("Other rules not supported yet")
            raise Exception("Other rules not implemented")

        # From @host.lan means this is a DNAT rule (redirect to the chute).
        options['target'] = "DNAT"
        options['src'] = "wan"
        if len(from_parts) > 1:
            options['src_dport'] = from_parts[1]

        # Look up the interface named in the rule to get its IP address.
        iface = findMatchingInterface(dst_name, interfaces)
        if iface is None:
            out.warn("No interface found with name {}\n".format(dst_name))
            raise Exception("Interface not found")

        options['dest_ip'] = iface['externalIpaddr']
        if len(to_parts) > 1:
            options['dest_port'] = to_parts[1]

        rules.append((config, options))

    update.cache_set('developerFirewallRules', rules)
def setOSFirewallRules(update):
    """
    Save the firewall rules cached on the update to the firewall config file.

    Hands the update, the cache keys 'osFirewallRules' and
    'developerFirewallRules', and the system path of the "firewall"
    configuration to uciutils.setConfig.
    """
    firewall_path = uci.getSystemPath("firewall")
    rule_keys = ['osFirewallRules', 'developerFirewallRules']
    uciutils.setConfig(update, cacheKeys=rule_keys, filepath=firewall_path)
def revert_os_firewall_rules(update):
    """
    Rewrite the firewall config file from the update's cached rules.

    Issues the same uciutils.setConfig call as setOSFirewallRules: the
    update, the cache keys 'osFirewallRules' and 'developerFirewallRules',
    and the system path of the "firewall" configuration.
    """
    cache_keys = ['osFirewallRules', 'developerFirewallRules']
    config_path = uci.getSystemPath("firewall")
    uciutils.setConfig(update, cacheKeys=cache_keys, filepath=config_path)