Source code for paradrop.backend.pdconfd.config.firewall

from .base import ConfigObject
from .command import Command


[docs]class ConfigRedirect(ConfigObject): typename = "redirect" options = [ {"name": "src", "type": str, "required": False, "default": None}, {"name": "src_ip", "type": str, "required": False, "default": None}, {"name": "src_dip", "type": str, "required": False, "default": None}, {"name": "src_port", "type": str, "required": False, "default": None}, {"name": "src_dport", "type": str, "required": False, "default": None}, {"name": "proto", "type": str, "required": True, "default": "tcpudp"}, {"name": "dest", "type": str, "required": False, "default": None}, {"name": "dest_ip", "type": str, "required": False, "default": None}, {"name": "dest_port", "type": str, "required": False, "default": None}, {"name": "target", "type": str, "required": False, "default": "DNAT"} ] def __commands_dnat(self, allConfigs, action): """ Generate DNAT iptables rules. """ commands = list() src_zone = self.lookup(allConfigs, "zone", self.src) # Special cases: # None->skip protocol and port arguments, # tcpudp->[tcp, udp] if self.proto is None: protocols = [None] elif self.proto == "tcpudp": protocols = ["tcp", "udp"] else: protocols = [self.proto] for interface in src_zone.interfaces(allConfigs): for proto in protocols: cmd = ["iptables", "--table", "nat", action, "PREROUTING", "--in-interface", interface.ifname] if self.src_ip is not None: cmd.extend(["--source", self.src_ip]) if self.src_dip is not None: cmd.extend(["--destination", self.src_dip]) if proto is not None: cmd.extend(["--proto", proto]) if self.src_port is not None: cmd.extend(["--sport", self.src_port]) if self.src_dport is not None: cmd.extend(["--dport", self.src_dport]) if self.dest_ip is not None: if self.dest_port is not None: cmd.extend(["--jump", "DNAT", "--to-destination", "{}:{}".format( self.dest_ip, self.dest_port)]) else: cmd.extend(["--jump", "DNAT", "--to-destination", self.dest_ip]) elif self.dest_port is not None: cmd.extend(["--jump", "REDIRECT", "--to-port", self.dest_port]) cmd.extend(["--match", "comment", "--comment", "pdconfd {} {}".format(self.typename, self.name)]) commands.append(Command(Command.PRIO_ADD_IPTABLES, cmd, self)) return commands def __commands_snat(self, allConfigs, action): """ Generate SNAT iptables rules. """ commands = list() # TODO: implement SNAT rules return commands
[docs] def commands(self, allConfigs): if self.target == "DNAT": commands = self.__commands_dnat(allConfigs, "--insert") elif self.target == "SNAT": commands = self.__commands_snat(allConfigs, "--insert") else: commands = list() self.manager.forwardingCount += 1 if self.manager.forwardingCount == 1: cmd = ["sysctl", "--write", "net.ipv4.conf.all.forwarding=1"] commands.append(Command(Command.PRIO_ADD_IPTABLES, cmd, self)) return commands
[docs] def undoCommands(self, allConfigs): if self.target == "DNAT": commands = self.__commands_dnat(allConfigs, "--delete") elif self.target == "SNAT": commands = self.__commands_snat(allConfigs, "--delete") else: commands = list() self.manager.forwardingCount -= 1 if self.manager.forwardingCount == 0: cmd = ["sysctl", "--write", "net.ipv4.conf.all.forwarding=0"] commands.append(Command(Command.PRIO_ADD_IPTABLES, cmd, self)) return commands
[docs]class ConfigZone(ConfigObject): typename = "zone" options = [ {"name": "name", "type": str, "required": True, "default": None}, {"name": "network", "type": list, "required": False, "default": None}, {"name": "masq", "type": bool, "required": False, "default": False}, {"name": "input", "type": str, "required": False, "default": "DROP"}, {"name": "forward", "type": str, "required": False, "default": "DROP"}, {"name": "output", "type": str, "required": False, "default": "DROP"} ]
[docs] def interfaces(self, allConfigs): """ List of interfaces in this zone (generator). """ if self.network is not None: for networkName in self.network: # Look up the interface - may fail. interface = self.lookup(allConfigs, "interface", networkName) yield interface
def __commands_iptables(self, allConfigs, action): commands = list() if self.masq: for interface in self.interfaces(allConfigs): cmd = ["iptables", "--table", "nat", action, "POSTROUTING", "--out-interface", interface.ifname, "--jump", "MASQUERADE", "--match", "comment", "--comment", "pdconfd {} {}".format(self.typename, self.name)] commands.append(Command(Command.PRIO_ADD_IPTABLES, cmd, self)) return commands
[docs] def commands(self, allConfigs): commands = self.__commands_iptables(allConfigs, "--insert") if self.masq: self.manager.forwardingCount += 1 if self.manager.forwardingCount == 1: cmd = ["sysctl", "--write", "net.ipv4.conf.all.forwarding=1"] commands.append(Command(Command.PRIO_ADD_IPTABLES, cmd, self)) return commands
[docs] def undoCommands(self, allConfigs): commands = self.__commands_iptables(allConfigs, "--delete") if self.masq: self.manager.forwardingCount -= 1 if self.manager.forwardingCount == 0: cmd = ["sysctl", "--write", "net.ipv4.conf.all.forwarding=0"] commands.append(Command(Command.PRIO_ADD_IPTABLES, cmd, self)) return commands