mirror of
https://github.com/OpenSolo/OpenSolo.git
synced 2025-04-30 06:34:38 +02:00
140 lines
3.9 KiB
Python
140 lines
3.9 KiB
Python
|
|
import clock
|
|
import re
|
|
import subprocess
|
|
|
|
def run_cmd(ifname, cmd):
    """run a wpa_cli command, return the output

    ifname -- interface name, handed to wpa_cli as "-i<ifname>"
    cmd    -- sequence of wpa_cli arguments, e.g. ["status"]; left unmodified

    Returns the output captured by subprocess.check_output.
    Raises subprocess.CalledProcessError if wpa_cli exits non-zero.
    """
    # Build a fresh argument vector instead of inserting into the caller's
    # list: mutating it would prepend "wpa_cli -i..." again every time the
    # same list object is reused.
    argv = ["wpa_cli", "-i" + ifname] + list(cmd)
    return subprocess.check_output(argv)
|
|
|
|
def run_cmd_ok(ifname, cmd):
    """run a wpa_cli command that should return OK

    ifname -- interface name, handed to wpa_cli as "-i<ifname>"
    cmd    -- sequence of wpa_cli arguments; left unmodified

    Returns None when the output starts with "OK".
    Raises RuntimeError when it does not, and
    subprocess.CalledProcessError if wpa_cli exits non-zero.
    """
    # Fresh argument vector: never insert into the caller's list.
    argv = ["wpa_cli", "-i" + ifname] + list(cmd)
    out = subprocess.check_output(argv)
    # Only the start of the reply is checked, so trailing newlines or extra
    # text after "OK" are accepted.
    if not out.startswith("OK"):
        raise RuntimeError(
            "wpa_cli command %r replied %r, expected OK" % (argv, out))
|
|
|
|
def run_cmd_int(ifname, cmd):
    """run a wpa_cli command that should return an integer

    ifname -- interface name, handed to wpa_cli as "-i<ifname>"
    cmd    -- sequence of wpa_cli arguments; left unmodified

    Returns the leading run of decimal digits in the output, as a string
    (e.g. "2"), not as an int.
    Raises RuntimeError when the output does not start with a digit, and
    subprocess.CalledProcessError if wpa_cli exits non-zero.
    """
    # Fresh argument vector: never insert into the caller's list.
    argv = ["wpa_cli", "-i" + ifname] + list(cmd)
    out = subprocess.check_output(argv)
    m = re.match("([0-9]+)", out)
    if not m:
        raise RuntimeError(
            "wpa_cli command %r replied %r, expected an integer" % (argv, out))
    # m.group(1) is still a string, e.g. "2"; it is returned as-is so it can
    # be dropped straight into another wpa_cli argument list.
    return m.group(1)
|
|
|
|
def get_status(ifname):
    """get status from wpa_cli

    Runs "wpa_cli -i<ifname> status" and returns its "key=value" output
    lines as a dict mapping key to value (both strings).

    Raises RuntimeError if any output line is not of the form key=value
    with a non-empty key and a non-empty value.
    """
    raw = subprocess.check_output(["wpa_cli", "-i" + ifname, "status"])
    fields = {}
    for entry in raw.splitlines():
        parsed = re.match("(.+?)=(.+)", entry)
        if parsed is None:
            raise RuntimeError
        # The key is matched non-greedily, so the split happens at the first
        # "=" that leaves a non-empty key; later "=" stay in the value.
        key, value = parsed.groups()
        fields[key] = value
    return fields
|
|
|
|
# wpa_status polling, observed states:
|
|
# (times shown are one particular connection and may vary quite a bit)
|
|
#
|
|
# Initially:
|
|
# wpa_state=DISCONNECTED
|
|
# address=00:02:60:02:70:28
|
|
#
|
|
# After ~25 msec:
|
|
# wpa_state=SCANNING
|
|
# address=00:02:60:02:70:28
|
|
#
|
|
# After ~700 msec:
|
|
# wpa_state=ASSOCIATING
|
|
# address=00:02:60:02:70:28
|
|
#
|
|
# After ~750 msec:
|
|
# bssid=02:02:60:02:70:25
|
|
# ssid=SoloLink_090909
|
|
# id=0
|
|
# mode=station
|
|
# pairwise_cipher=NONE
|
|
# group_cipher=NONE
|
|
# key_mgmt=NONE
|
|
# wpa_state=COMPLETED
|
|
# address=00:02:60:02:70:28
|
|
def poll_status(ifname, final_state, timeout=None, verbose=False):
    """poll status until it reaches a given state or timeout

    ifname      -- interface to query through get_status
    final_state -- wpa_state value to wait for, e.g. "COMPLETED"
    timeout     -- limit, scaled by 1000000 and added to the monotonic
                   clock.gettime_us reading (so presumably seconds);
                   None means wait indefinitely
    verbose     -- when true, print the status dict each time it changes

    Returns True as soon as wpa_state equals final_state, or False once the
    deadline has passed. Status is always fetched at least once, and the
    state check comes before the deadline check.
    """
    deadline_us = None
    if timeout is not None:
        deadline_us = clock.gettime_us(clock.CLOCK_MONOTONIC) + (timeout * 1000000)

    previous = {}
    while True:
        # Take the timestamp before querying, so the time spent inside
        # get_status is not charged against the current iteration.
        sampled_us = clock.gettime_us(clock.CLOCK_MONOTONIC)
        current = get_status(ifname)

        if verbose and current != previous:
            # Single parenthesised argument: same output under the
            # Python 2 print statement.
            print(current)
            previous = current

        # A status without a wpa_state entry counts as the empty state.
        if current.get("wpa_state", "") == final_state:
            return True

        if deadline_us is not None and sampled_us > deadline_us:
            return False
|
|
|
|
def network_add(ifname, ssid):
    """add, select and enable a new open network for the given ssid

    Creates a network entry with "add_network", selects and enables it,
    then sets its ssid (wrapped in double quotes) and key_mgmt=NONE.

    Returns the network number reported by add_network, as a string.
    Raises RuntimeError if any step does not reply as expected.
    """
    net_id = run_cmd_int(ifname, ["add_network"])

    run_cmd_ok(ifname, ["select_network", net_id])
    run_cmd_ok(ifname, ["enable_network", net_id])

    # The ssid value is handed to wpa_cli wrapped in literal double quotes.
    quoted_ssid = "\"" + ssid + "\""
    run_cmd_ok(ifname, ["set_network", net_id, "ssid", quoted_ssid])
    run_cmd_ok(ifname, ["set_network", net_id, "key_mgmt", "NONE"])

    return net_id
|
|
|
|
def network_connect(ifname, timeout):
    """issue "reassociate", then wait for wpa_state to become COMPLETED

    Returns what poll_status returns: True once the COMPLETED state is
    seen, False if the timeout expires first.
    Raises RuntimeError if the reassociate command does not reply OK.
    """
    run_cmd_ok(ifname, ["reassociate"])
    connected = poll_status(ifname, "COMPLETED", timeout)
    return connected
|
|
|
|
def network_disconnect(ifname):
    """issue the wpa_cli "disconnect" command; RuntimeError unless it replies OK"""
    disconnect_cmd = ["disconnect"]
    run_cmd_ok(ifname, disconnect_cmd)
|
|
|
|
def network_remove(ifname, net_num):
    """remove one network by number; RuntimeError unless wpa_cli replies OK

    net_num is placed directly in the wpa_cli argument list, so it should
    be a string such as the value returned by network_add.
    """
    remove_cmd = ["remove_network", net_num]
    run_cmd_ok(ifname, remove_cmd)
|
|
|
|
def network_remove_all(ifname):
    """issue "remove_network" for network numbers 0 through 9

    The reply to each command is not checked, so numbers that do not
    correspond to a configured network are not treated as errors here.
    """
    for net_id in map(str, range(10)):
        run_cmd(ifname, ["remove_network", net_id])
|
|
|
|
def save(ifname):
    """issue the wpa_cli "save_config" command; RuntimeError unless it replies OK"""
    save_cmd = ["save_config"]
    run_cmd_ok(ifname, save_cmd)
|
|
|
|
def set(ifname, variable, value):
    """issue wpa_cli "set <variable> <value>"; RuntimeError unless it replies OK

    Note: within this module the name shadows the builtin set type.
    """
    set_cmd = ["set", variable, value]
    run_cmd_ok(ifname, set_cmd)
|
|
|
|
def pin_pair(ifname, pin):
    """issue wpa_cli "wps_pin any <pin>" and return the raw command output

    pin is converted with str(), so it may be given as an int or a string.
    """
    pin_text = str(pin)
    return run_cmd(ifname, ["wps_pin", "any", pin_text])
|
|
|
|
def reconfigure(ifname):
    """issue the wpa_cli "reconfigure" command and return the raw output"""
    reply = run_cmd(ifname, ["reconfigure"])
    return reply
|
|
|
|
import datetime
|
|
|
|
def wps_logged(ifname="wlan0", pin=74015887, log_path="/log/wps.log"):
    """start WPS PIN pairing and log every status change, forever

    ifname   -- interface to pair and poll (default "wlan0")
    pin      -- WPS PIN handed to pin_pair (default 74015887)
    log_path -- file that receives the log, opened in append mode
                (default "/log/wps.log")

    Each time the dict returned by get_status differs from the previous
    one, a line "<timestamp>: <status dict>" is appended to the log.

    This function never returns normally; it ends only when an exception
    (for example from get_status, or KeyboardInterrupt) propagates out.
    """
    # The context manager guarantees the log is closed when an exception
    # terminates the otherwise endless loop.
    with open(log_path, "a") as log:
        pin_pair(ifname, pin)
        last_stat = {}
        while True:
            stat = get_status(ifname)
            if stat != last_stat:
                log.write("%s: %s\n" % (datetime.datetime.now(), stat))
                # The loop never ends on its own, so flush each entry;
                # otherwise buffered lines are lost if the process is killed.
                log.flush()
                last_stat = stat
|