###################################################################
# Copyright 2013-2014 All Rights Reserved
# Authors: The Paradrop Team
###################################################################
import traceback, os
from paradrop.lib import settings
from paradrop.lib.utils import pdos, pdosq
from pdtools.lib.output import out
def getSystemConfigDir():
    """
    Return the directory that holds the UCI configuration files.

    The directory is settings.UCI_CONFIG_DIR, unless the environment
    variable SNAP_APP_DATA_PATH is set, in which case it is the "config"
    subdirectory of that path.

    pdosq.makedirs() is called on the chosen directory before it is returned
    (presumably creating it when missing -- confirm against pdosq).
    """
    configDir = settings.UCI_CONFIG_DIR

    # A snap data path, when present, overrides the configured default.
    snapDataPath = os.environ.get("SNAP_APP_DATA_PATH")
    if snapDataPath is not None:
        configDir = os.path.join(snapDataPath, "config")

    pdosq.makedirs(configDir)
    return configDir
def getSystemPath(filename):
    """
    Build the full path of a system configuration file.

    The directory part comes from getSystemConfigDir(), so any directory
    preparation that function performs also happens here.

    Arguments:
        filename: name of the config file inside the configuration
                  directory (typically network, wireless, qos, firewall,
                  dhcp, etc.)

    Returns the configuration directory joined with @filename.
    """
    return os.path.join(getSystemConfigDir(), filename)
def stringify(a):
    """
    Return a copy of @a in which every leaf value has been converted to str.

    Config and option dicts may carry ints, bools, etc. when built by code,
    but everything read back from a UCI file is a string; normalizing both
    sides with this function makes them comparable.

    - dict : a new dict with the same keys and stringified values
             (keys themselves are left untouched)
    - list : a new list with every element stringified
    - str  : returned unchanged
    - other: str(a)

    Unlike a plain dict walk, scalars are accepted at any depth, so a list
    such as [1, 2] (or a bare non-dict argument) is converted instead of
    raising AttributeError.
    """
    if isinstance(a, dict):
        # .items() works on both Python 2 and Python 3
        return {k: stringify(v) for k, v in a.items()}
    if isinstance(a, list):
        return [stringify(item) for item in a]
    if isinstance(a, str):
        return a
    return str(a)
def configsMatch(a, b):
    """
    Check that every (config, options) entry of @a has a counterpart in @b.

    Entries are compared after passing both halves through stringify().
    Note the check is one-directional: extra entries in @b are not detected.

    Returns True if every entry of @a was found in @b, False otherwise (an
    error is logged for the first entry without a counterpart).
    """
    for cfgA, optA in a:
        wanted = (stringify(cfgA), stringify(optA))

        # Scan @b lazily and stop at the first equal entry
        found = any((stringify(cfgB), stringify(optB)) == wanted
                    for cfgB, optB in b)
        if not found:
            out.err('Not a match in configsMatch\n')
            return False

    return True
def chuteConfigsMatch(chutePre, chutePost):
    """
    Tell whether two lists of (config, options) entries hold the same entries.

    Order does not matter: the lists match when every entry of @chutePre has
    a counterpart in @chutePost and every entry of @chutePost has a
    counterpart in @chutePre, as decided by singleConfigMatches().

    This is a naive pairwise search over both lists.
    """
    # Every old config must show up among the new ones ...
    oldAllFound = all(
        any(singleConfigMatches(old, new) for new in chutePost)
        for old in chutePre)
    if not oldAllFound:
        return False

    # ... and every new config must show up among the old ones.
    return all(
        any(singleConfigMatches(old, new) for old in chutePre)
        for new in chutePost)
def isMatch(a, b):
    """Return True when @a and @b are equal once both went through stringify()."""
    return stringify(a) == stringify(b)
def singleConfigMatches(a, b):
    """
    Compare two (config, options) pairs with isMatch().

    Both the config halves and the options halves have to match.
    """
    cfgA, optA = a
    cfgB, optB = b
    if not isMatch(cfgA, cfgB):
        return False
    return isMatch(optA, optB)
class UCIConfig(object):
    """
    Wrapper around the UCI configuration files.

    These files are found under /etc/config/*, and are used by OpenWrt to keep track of
    configuration for modules typically found in /etc/init.d/*

    The modules of interest and with current support are:
        - firewall
        - network
        - wireless
        - qos
    * This class should work with any UCI module but ALL others are UNTESTED!

    New configuration settings can be added to the UCI file via addConfig().

    Each UCI config file is expected to contain the following syntax:
        config keyA [valueA] [#comment]
            option key1 value1
            ...
            list key2 value1
            list key2 value2
            ...
            list key3 value1
            list key3 value2

    Based on the UCI file above, the config syntax would look like the following:
        config is a list of tuples, containing 2 dict objects in each tuple:
            - tuple[0] describes the first line (config keyA [valueA])
                {'type': keyA, 'name': valueA, 'comment': comment}
              The name and comment are optional and if missing, then the
              corresponding key is also missing (rather than set to None).
            - tuple[1] describes the options associated with the settings
              (both 'option' and 'list' lines)
                {'key1': 'value1', ...}
              If a list is present, it looks like the following:
                {
                    ...,
                    'list': {
                        'key2': [value1, value2, ...],
                        'key3': [value1, value2, ...]
                    }
                }

    So for the example above, the full config definition would look like:
        C = {'type': 'keyA', 'name': 'valueA'}
        O = {'key1': 'value1', 'list': {'key2': ['value1', 'value2'], 'key3': ['value1', 'value2']}}
        config = [(C, O)]
    """

    def __init__(self, filepath):
        """Load the UCI file at @filepath, creating an empty file if it does not exist."""
        self.filepath = filepath
        self.myname = os.path.basename(self.filepath)
        # Touch the file so that readConfig() always has something to open
        if not os.path.isfile(self.filepath):
            open(self.filepath, 'a').close()
        self.config = self.readConfig()

    def __eq__(self, o):
        """
        Two objects are equal when they point at the same file and hold the
        same (config, options) entries, regardless of their order.

        Entries are paired off one-to-one, so duplicated entries on one side
        need the same number of duplicates on the other side.
        """
        if not isinstance(o, UCIConfig):
            return NotImplemented
        if self.filepath != o.filepath or self.myname != o.myname:
            return False
        # Cheap check before the pairwise search
        if len(self.config) != len(o.config):
            return False

        # Cross each of our entries off a scratch copy of the other list;
        # lengths are equal, so getting through the loop means a full match.
        unmatched = [tuple(entry) for entry in o.config]
        for entry in self.config:
            try:
                unmatched.remove(tuple(entry))
            except ValueError:
                return False
        return True

    def __ne__(self, o):
        """Inverse of __eq__ (Python 2 does not derive it automatically)."""
        result = self.__eq__(o)
        if result is NotImplemented:
            return result
        return not result

    @staticmethod
    def _matchIgnoringComment(a, b):
        """Compare two config dicts via stringify(), leaving out their 'comment' keys."""
        def withoutComment(cfg):
            return stringify(dict((k, v) for k, v in cfg.items() if k != 'comment'))
        return withoutComment(a) == withoutComment(b)

    def _findConfig(self, config, options):
        """Return the index of the first stored entry matching (config, options), or None."""
        config = stringify(config)
        options = stringify(options)
        for index, (c, o) in enumerate(self.config):
            # Stored entries may hold non-string values (addConfig stores
            # them as given), so normalize both sides before comparing.
            if stringify(c) == config and stringify(o) == options:
                return index
        return None

    def getConfig(self, config):
        """Returns a list of all (config, options) entries whose config part matches @config."""
        config = stringify(config)
        return [(c, o) for c, o in self.config if stringify(c) == config]

    def existsConfig(self, config, options):
        """Tests if the (config, options) is in the current config file."""
        return self._findConfig(config, options) is not None

    def addConfigs(self, configs):
        """Adds a list of (config, options) tuples to our config."""
        for c, o in configs:
            self.addConfig(c, o)

    def delConfigs(self, configs):
        """Deletes a list of (config, options) tuples from our config."""
        for c, o in configs:
            self.delConfig(c, o)

    def addConfig(self, config, options):
        """Adds the tuple to our config, unless an equal one is already present."""
        if not self.existsConfig(config, options):
            self.config.append((config, options))

    def delConfig(self, config, options):
        """Finds a match to the config input and removes it from the internal config data structure."""
        index = self._findConfig(config, options)
        if index is None:
            out.verbose('No match to delete, config: %r\n' % (stringify(config),))
            return
        del self.config[index]

    def backup(self, backupToken):
        """
        Copies this config file to /tmp/<name>-<backupToken>.

        Arguments:
            backupToken: The token appended (after a hyphen) to the file name
                         to form the backup path
        """
        backupPath = '/tmp/%s-%s' % (self.myname, backupToken)
        pdos.copy(self.filepath, backupPath)

    def restore(self, backupToken, saveBackup=True):
        """
        Replaces real file (at /etc/config/*) with backup copy from /tmp/*-@backupToken location.

        Arguments:
            backupToken: The backup token appended at the end of the backup path
            saveBackup : A flag to keep a backup copy or delete it (default is keep backup)
        """
        backupPath = '/tmp/%s-%s' % (self.myname, backupToken)
        if not pdos.exists(backupPath):
            # This might be ok if they didn't actually make any changes
            out.warn('Cannot restore, %s missing backup (might be OK if no changes made)\n' % (self.myname))
            return

        if saveBackup:
            pdos.copy(backupPath, self.filepath)
        else:
            pdos.move(backupPath, self.filepath)

    def getChuteConfigs(self, internalid):
        """Returns the (config, options) entries whose config 'comment' equals @internalid."""
        return [(c, o) for c, o in self.config
                if c.get('comment', '') == internalid]

    def checkWanConfig(self, internalid):
        """
        Makes sure exactly one kind of 'wan' interface definition is present.

        - If a wan interface with other options exists next to the default
          one (eth0/dhcp), the default one is deleted.
        - If no wan interface exists at all, the default one is added,
          tagged with @internalid as its comment.
        """
        configDef = {'type': 'interface', 'name': 'wan', 'comment': internalid}
        optionsDef = {'ifname': 'eth0', 'proto': 'dhcp'}

        otherIfaceWanPresent = False
        eth0Present = False
        for c, o in self.config:
            # NOTE(review): this used to call isMatchIgnoreComments(), which
            # is not defined anywhere in this module. The helper implements
            # what that name implies (compare everything but 'comment') --
            # confirm this is the intended semantics.
            if self._matchIgnoringComment(c, configDef):
                if isMatch(o, optionsDef):
                    eth0Present = True
                else:
                    otherIfaceWanPresent = True

        # If an sta chute has defined the wan, get rid of the default
        if otherIfaceWanPresent and eth0Present:
            self.delConfig(configDef, optionsDef)

        # If no wan defined, we need to add a default eth0 one
        if not otherIfaceWanPresent and not eth0Present:
            self.addConfig(configDef, optionsDef)

    def _formatConfig(self):
        """Render self.config as the text of a UCI file."""
        chunks = []
        for c, o in self.config:
            header = "config %s" % c['type']
            # Name and comment are both optional
            if 'name' in c:
                header += " %s" % c['name']
            if 'comment' in c:
                header += " #%s" % c['comment']
            chunks.append("%s\n" % header)

            # Plain options first; the 'list' key is handled separately below
            for k, v in o.items():
                if k != 'list':
                    chunks.append("\toption %s '%s'\n" % (k, v))

            # 'list' maps each list name to the values to emit for it
            for k, values in (o.get('list') or {}).items():
                for val in values:
                    chunks.append("\tlist %s '%s'\n" % (k, val))

            # One extra newline before the next section
            chunks.append("\n")
        return "".join(chunks)

    def save(self, backupToken=None, internalid=None):
        """
        Saves out the file in the proper format.

        Failures while writing are logged, not raised.

        Arguments:
            [backupToken] : If given, first save a backup copy of the current
                            file to /tmp/<name>-<backupToken> (see backup()).
            [internalid]  : Unused; kept for interface compatibility.
        """
        # Save original copy
        if backupToken:
            self.backup(backupToken)

        # Generate the content up front so the file is only opened (and
        # truncated) once the full content is known.
        output = self._formatConfig()

        try:
            out.info('Saving %s to disk\n' % (self.filepath))
            fd = pdos.open(self.filepath, 'w')
            try:
                fd.write(output)
                # Guarantee that its written to disk before we close
                fd.flush()
                os.fsync(fd.fileno())
            finally:
                fd.close()
        except Exception as e:
            out.err('Unable to save new config %s, %s\n' % (self.filepath, str(e)))
            if backupToken:
                out.err('Config may be corrupted, backup exists at /tmp/%s-%s\n' % (self.myname, backupToken))
            else:
                out.err('Config may be corrupted and no backup was made\n')

    @staticmethod
    def _tokenize(line):
        """
        Split one line into whitespace separated tokens, cleaning up the third one.

        The third token (the value) gets special treatment:
          - if it opens a single quote that is closed by a later token, all
            tokens up to the closing one are joined (single space separated)
            into one value with the surrounding quotes removed;
          - if it opens a single quote that is never closed, only the opening
            quote is stripped;
          - otherwise all single and double quotes are removed from it.

        FIXME: whitespace is normalized before quotes are looked at, so runs
        of spaces inside a quoted value collapse to a single space.
        """
        tokens = " ".join(line.split()).split(" ")
        if len(tokens) < 3:
            return tokens

        value = tokens[2]
        if value.startswith("'") and not value.endswith("'"):
            tokens[2] = value[1:]
            for end in range(3, len(tokens)):
                if tokens[end].endswith("'"):
                    joined = " ".join(tokens[2:end] + [tokens[end][:-1]])
                    tokens = tokens[:2] + [joined] + tokens[end + 1:]
                    break
        else:
            tokens[2] = value.replace("'", "").replace('"', '')
        return tokens

    @staticmethod
    def _parseLines(lines):
        """Turn the lines of a UCI file into a list of (config, options) tuples."""
        cfg = None
        opt = None
        data = []

        for raw in lines:
            line = raw.rstrip()
            # If comment ignore
            if line.startswith('#'):
                continue

            tokens = UCIConfig._tokenize(line)
            keyword = tokens[0]

            if keyword == 'config':
                if len(tokens) < 2:
                    out.warn('Ignoring malformed config line: %r\n' % (line,))
                    continue

                # Save the section we were building; a section without any
                # options is still a section and must not be dropped.
                if cfg is not None:
                    data.append((cfg, opt))

                cfg = {'type': tokens[1]}
                if len(tokens) == 3:
                    # Third element can be comment or name
                    if tokens[2].startswith('#'):
                        cfg['comment'] = tokens[2][1:]
                    else:
                        cfg['name'] = tokens[2]
                elif len(tokens) >= 4:
                    # Third is the name, everything after it is the comment
                    # (which may itself contain spaces); drop its leading '#'
                    cfg['name'] = tokens[2]
                    cfg['comment'] = " ".join(tokens[3:])[1:]
                opt = {}

            elif keyword in ('option', 'list'):
                # Needs a key and a value, and a section to attach them to
                if opt is None or len(tokens) < 3:
                    out.warn('Ignoring malformed %s line: %r\n' % (keyword, line))
                    continue
                if keyword == 'option':
                    opt[tokens[1]] = tokens[2]
                else:
                    opt.setdefault('list', {}).setdefault(tokens[1], []).append(tokens[2])

        # Save the final section we were making
        if cfg is not None:
            data.append((cfg, opt))
        return data

    def readConfig(self):
        """
        Reads in the config file.

        Returns the parsed content as a list of (config, options) tuples, see
        the class documentation for their layout. Errors while reading the
        file are logged and re-raised.
        """
        lines = []
        try:
            fd = pdos.open(self.filepath, 'r')
            try:
                while True:
                    line = fd.readline()
                    if not line:
                        break
                    lines.append(line)
            finally:
                fd.close()
        except Exception as e:
            out.err('Error reading file %s: %s\n' % (self.filepath, str(e)))
            # Bare raise keeps the original traceback
            raise

        return self._parseLines(lines)
import argparse
def setupArgParse():
    """
    Build the command line parser used when this module is run as a script.

    The only option is -f/--file, the config file to work on (None when it
    is not given).
    """
    parser = argparse.ArgumentParser(description='UCI Configuration Manager')
    parser.add_argument('-f', '--file', type=str, default=None, help='Config file')
    return parser
if __name__ == '__main__':
    # Script mode: parse the UCI file given with -f/--file and pretty-print it.
    import pprint

    p = setupArgParse()
    args = p.parse_args()

    # Without a file UCIConfig() would die with a traceback inside
    # os.path.basename(); report a proper usage error instead.
    if args.file is None:
        p.error('a config file is required (-f/--file)')

    # The constructor already parsed the file, no need to read it again
    uci = UCIConfig(args.file)
    pprint.PrettyPrinter(indent=4).pprint(uci.config)