Source code for paradrop.confd.firewall

import ipaddress

from builtins import str

from .base import ConfigObject, ConfigOption
from .command import Command


IPTABLES_WAIT = "5"


[docs]def start_iptables_command(cmd, *args): return [cmd, "--wait", IPTABLES_WAIT] + list(args)
[docs]class ConfigDefaults(ConfigObject): typename = "defaults" options = [ ConfigOption(name="input", default="ACCEPT"), ConfigOption(name="output", default="ACCEPT"), ConfigOption(name="forward", default="ACCEPT"), ConfigOption(name="disable_ipv6", type=bool) ]
[docs] def getName(self): # There should only be one defaults section. If we return a constant # string for all instances, we can match them across file versions. return "SINGLETON"
[docs] def get_iptables(self): """ Get the list of iptables commands to use (iptables / ip6tables). """ if self.disable_ipv6: return ["iptables"] else: return ["iptables", "ip6tables"]
[docs] def apply(self, allConfigs): commands = list() for iptables in self.get_iptables(): for path in ["input", "output", "forward"]: # Create the delegate_X chain. cmd = start_iptables_command(iptables) + ["--table", "filter", "--new", "delegate_"+path] commands.append((self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Jump to delegate_X chain. cmd = start_iptables_command(iptables) + ["--table", "filter", "--append", path.upper(), "--jump", "delegate_"+path] commands.append((self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Create the X_rule chain. cmd = start_iptables_command(iptables) + ["--table", "filter", "--new", path+"_rule"] commands.append((self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Jump to X_rule chain. cmd = start_iptables_command(iptables) + ["--table", "filter", "--append", path.upper(), "--jump", path+"_rule"] commands.append((self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Add a rule at the end with the default policy. cmd = start_iptables_command(iptables) + ["--table", "filter", "--append", path.upper(), "--jump", getattr(self, path)] commands.append((self.PRIO_IPTABLES_TOP, Command(cmd, self))) for path in ["prerouting", "postrouting"]: # Create the delegate_X chain. cmd = start_iptables_command(iptables) + ["--table", "nat", "--new", "delegate_"+path] commands.append((self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Jump to delegate_X chain. cmd = start_iptables_command(iptables) + ["--table", "nat", "--append", path.upper(), "--jump", "delegate_"+path] commands.append((self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Create the X_rule chain. cmd = start_iptables_command(iptables) + ["--table", "nat", "--new", path+"_rule"] commands.append((self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Jump to X_rule chain. cmd = start_iptables_command(iptables) + ["--table", "nat", "--append", path.upper(), "--jump", path+"_rule"] commands.append((self.PRIO_IPTABLES_TOP, Command(cmd, self))) return commands
[docs] def revert(self, allConfigs): commands = list() for iptables in self.get_iptables(): for path in ["prerouting", "postrouting"]: # Jump to X_rule chain. cmd = start_iptables_command(iptables) + ["--table", "nat", "--delete", path.upper(), "--jump", path+"_rule"] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Flush the X_rule chain. cmd = start_iptables_command(iptables) + ["--table", "nat", "--flush", path+"_rule"] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Delete the X_rule chain. cmd = start_iptables_command(iptables) + ["--table", "nat", "--delete-chain", path+"_rule"] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Jump to delegate_X chain. cmd = start_iptables_command(iptables) + ["--table", "nat", "--delete", path.upper(), "--jump", "delegate_"+path] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Flush the delegate_X chain. cmd = start_iptables_command(iptables) + ["--table", "nat", "--flush", "delegate_"+path] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Delete the delegate_X chain. cmd = start_iptables_command(iptables) + ["--table", "nat", "--delete-chain", "delegate_"+path] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) for path in ["input", "output", "forward"]: # Jump to X_rule chain. cmd = start_iptables_command(iptables) + ["--table", "filter", "--delete", path.upper(), "--jump", path+"_rule"] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Flush the X_rule chain. cmd = start_iptables_command(iptables) + ["--table", "filter", "--flush", path+"_rule"] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Delete the X_rule chain. cmd = start_iptables_command(iptables) + ["--table", "filter", "--delete-chain", path+"_rule"] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Jump to paradrop_X chain. cmd = start_iptables_command(iptables) + ["--table", "filter", "--delete", path.upper(), "--jump", "delegate_"+path] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Flush the paradrop_X chain. cmd = start_iptables_command(iptables) + ["--table", "filter", "--flush", "delegate_"+path] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Delete the paradrop_X chain. cmd = start_iptables_command(iptables) + ["--table", "filter", "--delete-chain", "delegate_"+path] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) # Delete the default rule. cmd = start_iptables_command(iptables) + ["--table", "filter", "--delete", path.upper(), "--jump", getattr(self, path)] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) return commands
[docs] def updateApply(self, new, allConfigs): commands = list() for iptables in new.get_iptables(): for path in ["input", "output", "forward"]: if getattr(self, path) == getattr(new, path): # Skip if no change. continue # Add the new default rule. cmd = start_iptables_command(iptables) + ["--table", "filter", "--append", path.upper(), "--jump", getattr(new, path)] commands.append((new.PRIO_IPTABLES_TOP, Command(cmd, new))) return commands
[docs] def updateRevert(self, new, allConfigs): commands = list() for iptables in self.get_iptables(): for path in ["input", "output", "forward"]: if getattr(self, path) == getattr(new, path): # Skip if no change. continue # Delete the old default rule. cmd = start_iptables_command(iptables) + ["--table", "filter", "--delete", path.upper(), "--jump", getattr(self, path)] commands.append((-self.PRIO_IPTABLES_TOP, Command(cmd, self))) return commands
[docs]class ConfigZone(ConfigObject): typename = "zone" options = [ ConfigOption(name="name", required=True), ConfigOption(name="network", type=list), ConfigOption(name="masq", type=bool, default=False), ConfigOption(name="masq_src", type=list, default=["0.0.0.0/0"]), ConfigOption(name="masq_dest", type=list, default=["0.0.0.0/0"]), ConfigOption(name="conntrack", type=bool, default=False), ConfigOption(name="input", default="RETURN"), ConfigOption(name="forward", default="RETURN"), ConfigOption(name="output", default="RETURN"), ConfigOption(name="family", default="any") ]
[docs] def get_iptables(self): """ Get the list of iptables commands to use (iptables / ip6tables). """ if self.family == "ipv4": return ["iptables"] elif self.family == "ipv6": return ["ip6tables"] else: return ["iptables", "ip6tables"]
[docs] def setup(self): self._interfaces = list()
def __commands_iptables(self, allConfigs, action, prio): commands = list() for interface in self._interfaces: for iptables in self.get_iptables(): # Jump to zone input chain. chain = "zone_{}_input".format(self.name) cmd = start_iptables_command(iptables) + [ "--table", "filter", action, "delegate_input", "--in-interface", interface.config_ifname, "--jump", chain] commands.append((prio, Command(cmd, self))) # If conntrack is enabled, allow incoming traffic that is # associated with allowed outgoing traffic. if self.conntrack: comment = "zone {} conntrack".format(self.name) cmd = start_iptables_command(iptables) + [ "--table", "filter", action, "input_rule", "--in-interface", interface.config_ifname, "--match", "state", "--state", "ESTABLISHED,RELATED", "--match", "comment", "--comment", comment, "--jump", "ACCEPT"] commands.append((prio, Command(cmd, self))) # Implement default policy for zone. comment = "zone {} default".format(self.name) cmd = start_iptables_command(iptables) + [ "--table", "filter", action, "input_rule", "--in-interface", interface.config_ifname, "--match", "comment", "--comment", comment, "--jump", self.input] commands.append((prio, Command(cmd, self))) # Jump to zone output chain. chain = "zone_{}_output".format(self.name) cmd = start_iptables_command(iptables) + [ "--table", "filter", action, "delegate_output", "--out-interface", interface.config_ifname, "--jump", chain] commands.append((prio, Command(cmd, self))) # Implement default policy for zone. cmd = start_iptables_command(iptables) + [ "--table", "filter", action, "output_rule", "--out-interface", interface.config_ifname, "--match", "comment", "--comment", comment, "--jump", self.output] commands.append((prio, Command(cmd, self))) # Jump to zone forward chain. chain = "zone_{}_forward".format(self.name) cmd = start_iptables_command(iptables) + [ "--table", "filter", action, "delegate_forward", "--in-interface", interface.config_ifname, "--jump", chain] commands.append((prio, Command(cmd, self))) # Implement default policy for zone. cmd = start_iptables_command(iptables) + [ "--table", "filter", action, "forward_rule", "--in-interface", interface.config_ifname, "--match", "comment", "--comment", comment, "--jump", self.forward] commands.append((prio, Command(cmd, self))) # Jump to zone prerouting chain. chain = "zone_{}_prerouting".format(self.name) cmd = start_iptables_command(iptables) + [ "--table", "nat", action, "delegate_prerouting", "--in-interface", interface.config_ifname, "--jump", chain] commands.append((prio, Command(cmd, self))) # Jump to zone postrouting chain. chain = "zone_{}_postrouting".format(self.name) cmd = start_iptables_command(iptables) + [ "--table", "nat", action, "delegate_postrouting", "--out-interface", interface.config_ifname, "--jump", chain] commands.append((prio, Command(cmd, self))) # Masquerade rules are for IPv4 only, so add them outside the # iptables loop above. if self.masq: for src in self.masq_src: for dest in self.masq_dest: comment = "zone {} masq".format(self.name) cmd = start_iptables_command("iptables") + [ "--table", "nat", action, "POSTROUTING", "--out-interface", interface.config_ifname, "--source", src, "--destination", dest, "--match", "comment", "--comment", comment, "--jump", "MASQUERADE"] commands.append((prio, Command(cmd, self))) return commands
[docs] def apply(self, allConfigs): # Initialize the list of network:interface sections. self._interfaces = list() if self.network is not None: for networkName in self.network: # Look up the interface - may fail. interface = self.lookup(allConfigs, "network", "interface", networkName) self._interfaces.append(interface) commands = list() for iptables in self.get_iptables(): for path in ["input", "output", "forward"]: # Create the zone_NAME_X chain. chain = "zone_{}_{}".format(self.name, path) cmd = start_iptables_command(iptables) + [ "--table", "filter", "--new", chain] commands.append((self.PRIO_IPTABLES_ZONE, Command(cmd, self))) for path in ["prerouting", "postrouting"]: # Create the zone_NAME_X chain. chain = "zone_{}_{}".format(self.name, path) cmd = start_iptables_command(iptables) + [ "--table", "nat", "--new", chain] commands.append((self.PRIO_IPTABLES_ZONE, Command(cmd, self))) commands.extend(self.__commands_iptables(allConfigs, "--append", self.PRIO_IPTABLES_ZONE)) if self.masq: self.manager.forwardingCount += 1 if self.manager.forwardingCount == 1: cmd = ["sysctl", "-w", "net.ipv4.conf.all.forwarding=1"] commands.append((self.PRIO_IPTABLES_ZONE, Command(cmd, self))) cmd = ["sysctl", "-w", "net.ipv6.conf.all.forwarding=1"] commands.append((self.PRIO_IPTABLES_ZONE, Command(cmd, self))) return commands
[docs] def revert(self, allConfigs): commands = self.__commands_iptables(allConfigs, "--delete", -self.PRIO_IPTABLES_ZONE) if self.masq: self.manager.forwardingCount -= 1 if self.manager.forwardingCount == 0: cmd = ["sysctl", "-w", "net.ipv4.conf.all.forwarding=0"] commands.append((-self.PRIO_IPTABLES_ZONE, Command(cmd, self))) cmd = ["sysctl", "-w", "net.ipv6.conf.all.forwarding=0"] commands.append((-self.PRIO_IPTABLES_ZONE, Command(cmd, self))) for iptables in self.get_iptables(): pairs = [ ("filter", "input"), ("filter", "output"), ("filter", "forward"), ("nat", "prerouting"), ("nat", "postrouting") ] for table, path in pairs: chain = "zone_{}_{}".format(self.name, path) # Flush the zone_NAME_X chain, so that we do not get an error # with the delete command. cmd = start_iptables_command(iptables) + [ "--table", table, "--flush", chain] commands.append((-self.PRIO_IPTABLES_ZONE, Command(cmd, self))) # Delete the zone_NAME_X chain. cmd = start_iptables_command(iptables) + [ "--table", table, "--delete-chain", chain] commands.append((-self.PRIO_IPTABLES_ZONE, Command(cmd, self))) # Reset the list of network:interface sections. self._interfaces = list() return commands
[docs]class ConfigForwarding(ConfigObject): typename = "forwarding" options = [ ConfigOption(name="src", required=True), ConfigOption(name="dest", required=True) ] def __commands(self, allConfigs, action, prio): commands = list() chain = "zone_{}_forward".format(self.src) for dest_iface in self.dest_interfaces: comment = "forwarding {} -> {}".format(self.src, self.dest) cmd = start_iptables_command("iptables") + [ "--table", "filter", action, chain, "--out-interface", dest_iface.config_ifname, "--match", "comment", "--comment", comment, "--jump", "ACCEPT"] commands.append((prio, Command(cmd, self))) return commands
[docs] def apply(self, allConfigs): # Look up src_zone in order to indicate dependency. If the source zone # changes, we need to reapply this forwarding section as well. self.src_zone = self.lookup(allConfigs, "firewall", "zone", self.src) self.dest_zone = self.lookup(allConfigs, "firewall", "zone", self.dest) # Initialize the list of network:interface sections. self.dest_interfaces = list() if self.dest_zone.network is not None: for networkName in self.dest_zone.network: # Look up the interface - may fail. interface = self.lookup(allConfigs, "network", "interface", networkName) self.dest_interfaces.append(interface) return self.__commands(allConfigs, "--append", self.PRIO_IPTABLES_RULE)
[docs] def revert(self, allConfigs): return self.__commands(allConfigs, "--delete", -self.PRIO_IPTABLES_RULE)
[docs]class ConfigRedirect(ConfigObject): typename = "redirect" options = [ ConfigOption(name="src"), ConfigOption(name="src_ip"), ConfigOption(name="src_dip"), ConfigOption(name="src_port"), ConfigOption(name="src_dport"), ConfigOption(name="proto", required=True), ConfigOption(name="dest"), ConfigOption(name="dest_ip"), ConfigOption(name="dest_port"), ConfigOption(name="target", default="DNAT") ] # Any of these values will be interpreted to mean that --proto and ports # should not be specified in the iptables rule. ANY_PROTO = set([None, "any", "none"]) def __commands_dnat(self, allConfigs, action, prio): """ Generate DNAT iptables rules. """ commands = list() # Special cases: # None->skip protocol and port arguments, # tcpudp->[tcp, udp] if self.proto in self.ANY_PROTO: protocols = [None] elif self.proto == "tcpudp": protocols = ["tcp", "udp"] else: protocols = [self.proto] for proto in protocols: chain = "zone_{}_prerouting".format(self.src) cmd = start_iptables_command("iptables") + ["--table", "nat", action, chain] if self.src_ip is not None: cmd.extend(["--source", self.src_ip]) if self.src_dip is not None: cmd.extend(["--destination", self.src_dip]) if proto is not None: cmd.extend(["--proto", proto]) if self.src_port is not None: cmd.extend(["--sport", self.src_port]) if self.src_dport is not None: cmd.extend(["--dport", self.src_dport]) if self.dest_ip is not None: if self.dest_port is not None: cmd.extend(["--jump", "DNAT", "--to-destination", "{}:{}".format( self.dest_ip, self.dest_port)]) else: cmd.extend(["--jump", "DNAT", "--to-destination", self.dest_ip]) elif self.dest_port is not None: cmd.extend(["--jump", "REDIRECT", "--to-port", self.dest_port]) commands.append((prio, Command(cmd, self))) return commands def __commands_snat(self, allConfigs, action, prio): """ Generate SNAT iptables rules. """ commands = list() # TODO: implement SNAT rules return commands
[docs] def apply(self, allConfigs): if self.target == "DNAT": commands = self.__commands_dnat(allConfigs, "--insert", self.PRIO_CONFIG_IFACE) elif self.target == "SNAT": commands = self.__commands_snat(allConfigs, "--insert", self.PRIO_CONFIG_IFACE) else: raise Exception("Unsupported target ({}) in config {} {}".format( self.target, self.typename, self.name)) self.manager.forwardingCount += 1 if self.manager.forwardingCount == 1: cmd = ["sysctl", "-w", "net.ipv4.conf.all.forwarding=1"] commands.append((self.PRIO_CONFIG_IFACE, Command(cmd, self))) return commands
[docs] def revert(self, allConfigs): if self.target == "DNAT": commands = self.__commands_dnat(allConfigs, "--delete", -self.PRIO_CONFIG_IFACE) elif self.target == "SNAT": commands = self.__commands_snat(allConfigs, "--delete", -self.PRIO_CONFIG_IFACE) else: commands = list() self.manager.forwardingCount -= 1 if self.manager.forwardingCount == 0: cmd = ["sysctl", "-w", "net.ipv4.conf.all.forwarding=0"] commands.append((-self.PRIO_CONFIG_IFACE, Command(cmd, self))) return commands
[docs]class ConfigRule(ConfigObject): typename = "rule" options = [ ConfigOption(name="name", required=False), ConfigOption(name="src"), ConfigOption(name="src_ip"), ConfigOption(name="src_port"), ConfigOption(name="src_mac"), ConfigOption(name="proto"), ConfigOption(name="dest"), ConfigOption(name="dest_ip"), ConfigOption(name="dest_port"), ConfigOption(name="target", required=True), ConfigOption(name="family", default="any"), ConfigOption(name="extra") ]
[docs] def get_iptables(self): """ Get the list of iptables commands to use (iptables / ip6tables). """ # Check if the rule explicitly specifies the family. if self.family == "ipv4": return ["iptables"] elif self.family == "ipv6": return ["ip6tables"] # If the rule specifies the source address, use that as a hint. if self.src_ip is not None: addr = ipaddress.ip_network(str(self.src_ip), strict=False) if addr.version == 4: return ["iptables"] elif addr.version == 6: return ["ip6tables"] # If the rule specifies the destination address, use that as a hint. if self.dest_ip is not None: addr = ipaddress.ip_network(str(self.dest_ip), strict=False) if addr.version == 4: return ["iptables"] elif addr.version == 6: return ["ip6tables"] # Default is to generate rules for both IPv4 and IPv6. return ["iptables", "ip6tables"]
def __commands(self, allConfigs, action, prio): commands = list() # Choose INPUT, FORWARD, or OUTPUT based on whether src and dest are # specified. if self.src and self.dest: chain = "zone_{}_forward".format(self.dest) elif self.src: chain = "zone_{}_input".format(self.src) elif self.dest: chain = "zone_{}_output".format(self.dest) else: # TODO: Is this right? chain = "input_rule" for iptables in self.get_iptables(): # Put together the common options for rules. cmd = start_iptables_command(iptables) + [ "--wait", IPTABLES_WAIT, "--table", "filter", action, chain] # Source and destination IP address. if self.src_ip: cmd.extend(["--source", self.src_ip]) if self.dest_ip: cmd.extend(["--destination", self.dest_ip]) # Source and destination ports. if self.proto: cmd.extend(["--protocol", self.proto]) if self.src_port: cmd.extend(["--sport", str(self.src_port)]) if self.dest_port: cmd.extend(["--dport", str(self.dest_port)]) # Source MAC address. if self.src_mac: cmd.extend(["--match", "mac", "--mac-source", self.src_mac]) # Extra options such as "--match state --state ESTABLISHED,RELATED" if self.extra: cmd.append(self.extra) cmd.extend(["--jump", self.target]) if self.name: cmd.extend(["--match", "comment", "--comment", self.name]) commands.append((prio, Command(cmd, self))) return commands
[docs] def apply(self, allConfigs): return self.__commands(allConfigs, "--append", self.PRIO_IPTABLES_RULE)
[docs] def revert(self, allConfigs): return self.__commands(allConfigs, "--delete", -self.PRIO_IPTABLES_RULE)