Source code for paradrop.airshark.scanner

from __future__ import print_function
from multiprocessing import Process, Value
from ctypes import c_bool
import os
import time

from .spectrum_reader import SpectrumReader

[docs]class Scanner(object): interface = None freqlist = None process = None debugfs_dir = None spectrum_reader = None def __init__(self, interface): self.interface = interface self.phy = "" self.debugfs_dir = self._find_debugfs_dir() if not self.debugfs_dir: raise Exception('Unable to access spectral_scan_ctl file for interface %s' % interface) self.ctl_file = '%s/spectral_scan_ctl' % self.debugfs_dir self.sample_count_file = '%s/spectral_count' % self.debugfs_dir self.short_repeat_file = '%s/spectral_short_repeat' % self.debugfs_dir self.cur_chan = 6 self.sample_count = 8 self.process = None self.set_freqs(2412, 2472, 5) self.spectrum_reader = SpectrumReader('%s/spectral_scan0' % self.debugfs_dir)
[docs] def dev_to_phy(self, dev): f = open('/sys/class/net/%s/phy80211/name' % dev) phy = f.read().strip() f.close() return phy
def _find_debugfs_dir(self): ''' search debugfs for spectral_scan_ctl for this interface ''' for dirname, subd, files in os.walk('/sys/kernel/debug/ieee80211'): if 'spectral_scan_ctl' in files: phy = dirname.split(os.path.sep)[-2] if phy == self.dev_to_phy(self.interface): self.phy = phy return dirname return None def _scan(self, scanning): while scanning.value: cmd = 'iw dev %s scan trigger' % self.interface if self.freqlist: cmd = '%s freq %s' % (cmd, ' '.join(self.freqlist)) os.system('%s >/dev/null 2>/dev/null' % cmd) time.sleep(0.1)
[docs] def set_freqs(self, minf, maxf, spacing): self.freqlist = ['%s' % x for x in range(minf, maxf + spacing, spacing)]
[docs] def cmd_chanscan(self): f = open(self.ctl_file, 'w') f.write("chanscan") f.close()
[docs] def cmd_disable(self): f = open(self.ctl_file, 'w') f.write("disable") f.close()
[docs] def cmd_set_samplecount(self, count): print("set sample count to %d" % count) f = open(self.sample_count_file, 'w') f.write("%s" % count) f.close()
[docs] def cmd_set_short_repeat(self, short_repeat): f = open(self.short_repeat_file, 'w') f.write("%s" % short_repeat) f.close()
[docs] def start(self): if self.process is None: self.scanning = Value(c_bool, True) self.process = Process(target=self._scan, args=(self.scanning,)) self.process.start()
[docs] def stop(self): self.cmd_disable() if self.process is not None: self.scanning.value = False self.process.join() self.process = None self.spectrum_reader.flush()
[docs] def get_debugfs_dir(self): return self.debugfs_dir