OpenSolo/sololink/pair/wpa_cli.py

140 lines
3.9 KiB
Python

import clock
import re
import subprocess
def run_cmd(ifname, cmd):
    """Run a wpa_cli command on an interface and return its raw output.

    Args:
        ifname: interface name; passed to wpa_cli as "-i<ifname>".
        cmd: list of wpa_cli arguments, e.g. ["status"]. It is not modified.

    Returns:
        Whatever subprocess.check_output returns for the command.

    Raises:
        subprocess.CalledProcessError: wpa_cli exited with non-zero status.
    """
    # Build a fresh argv rather than inserting into the caller's list, so a
    # list that is reused for a second call does not get the prefix twice.
    argv = ["wpa_cli", "-i" + ifname] + list(cmd)
    return subprocess.check_output(argv)
def run_cmd_ok(ifname, cmd):
    """Run a wpa_cli command whose output is expected to start with "OK".

    Args:
        ifname: interface name; passed to wpa_cli as "-i<ifname>".
        cmd: list of wpa_cli arguments. It is not modified.

    Raises:
        RuntimeError: the command output does not start with "OK".
    """
    # Hand run_cmd a copy so the caller's list is left untouched.
    out = run_cmd(ifname, list(cmd))
    if not re.match("OK", out):
        raise RuntimeError("wpa_cli %r on %s: expected OK, got %r"
                           % (list(cmd), ifname, out))
def run_cmd_int(ifname, cmd):
    """Run a wpa_cli command whose output is expected to start with digits.

    Args:
        ifname: interface name; passed to wpa_cli as "-i<ifname>".
        cmd: list of wpa_cli arguments. It is not modified.

    Returns:
        The leading run of decimal digits as a string (e.g. "2"), not an int,
        so it can be dropped straight into another wpa_cli argument list.

    Raises:
        RuntimeError: the command output does not start with a digit.
    """
    # Hand run_cmd a copy so the caller's list is left untouched.
    out = run_cmd(ifname, list(cmd))
    m = re.match("([0-9]+)", out)
    if not m:
        raise RuntimeError("wpa_cli %r on %s: expected an integer, got %r"
                           % (list(cmd), ifname, out))
    return m.group(1)
def get_status(ifname):
    """Return the output of "wpa_cli status" for an interface as a dict.

    Each output line must have the form "name=value"; the name becomes the
    dict key and the value (always a string) the dict value.

    Args:
        ifname: interface name; passed to wpa_cli as "-i<ifname>".

    Returns:
        dict mapping status field names to their string values.

    Raises:
        RuntimeError: an output line is not of the form "name=value".
    """
    out = run_cmd(ifname, ["status"])
    status = {}
    for line in out.splitlines():
        m = re.match("(.+?)=(.+)", line)
        if not m:
            # Name the offending line; a bare RuntimeError is undiagnosable.
            raise RuntimeError("unparseable wpa_cli status line: %r" % (line,))
        key, value = m.groups()
        status[key] = value
    return status
# wpa_status polling, observed states:
# (times shown are one particular connection and may vary quite a bit)
#
# Initially:
# wpa_state=DISCONNECTED
# address=00:02:60:02:70:28
#
# After ~25 msec:
# wpa_state=SCANNING
# address=00:02:60:02:70:28
#
# After ~700 msec:
# wpa_state=ASSOCIATING
# address=00:02:60:02:70:28
#
# After ~750 msec:
# bssid=02:02:60:02:70:25
# ssid=SoloLink_090909
# id=0
# mode=station
# pairwise_cipher=NONE
# group_cipher=NONE
# key_mgmt=NONE
# wpa_state=COMPLETED
# address=00:02:60:02:70:28
def poll_status(ifname, final_state, timeout=None, verbose=False):
    """Poll wpa_cli status until wpa_state equals final_state or time is up.

    Args:
        ifname: interface name to query.
        final_state: wpa_state value to wait for, e.g. "COMPLETED".
        timeout: how long to keep polling, or None to poll without limit.
            It is scaled by 1000000 and added to clock.gettime_us(), so it
            is presumably in seconds.
        verbose: if true, print the status dict each time it changes.

    Returns:
        True if final_state was observed, False if the timeout expired first.
    """
    if timeout is None:
        end_us = None
    else:
        end_us = clock.gettime_us(clock.CLOCK_MONOTONIC) + (timeout * 1000000)
    last_stat = {}
    # NOTE: there is no sleep between polls; each iteration runs get_status
    # again immediately.
    while True:
        # Sample the time before querying, so the status is always checked
        # at least once even when the deadline has already passed.
        now_us = clock.gettime_us(clock.CLOCK_MONOTONIC)
        stat = get_status(ifname)
        if verbose and stat != last_stat:
            # Parenthesized single argument: prints the same on Python 2 and
            # is also valid Python 3 syntax.
            print(stat)
        last_stat = stat
        # A status without wpa_state is treated as the empty state.
        if stat.get("wpa_state", "") == final_state:
            return True
        if end_us is not None and now_us > end_us:
            return False
def network_add(ifname, ssid):
    """Add a network for ssid with key_mgmt NONE and return its number.

    The new network is selected and enabled before its ssid and key_mgmt are
    set. Every step after add_network must answer OK, otherwise run_cmd_ok
    raises RuntimeError. The returned network number is a string.
    """
    net_num = run_cmd_int(ifname, ["add_network"])
    for action in ("select_network", "enable_network"):
        run_cmd_ok(ifname, [action, net_num])
    # wpa_cli expects the ssid value wrapped in double quotes.
    quoted_ssid = '"' + ssid + '"'
    for field, value in (("ssid", quoted_ssid), ("key_mgmt", "NONE")):
        run_cmd_ok(ifname, ["set_network", net_num, field, value])
    return net_num
def network_connect(ifname, timeout):
    """Issue "reassociate" and wait for wpa_state to become COMPLETED.

    Returns the result of poll_status: True once COMPLETED is seen, False if
    timeout expires first.
    """
    run_cmd_ok(ifname, ["reassociate"])
    connected = poll_status(ifname, "COMPLETED", timeout)
    return connected
def network_disconnect(ifname):
    """Send the wpa_cli "disconnect" command to ifname; the reply must be OK."""
    args = ["disconnect"]
    run_cmd_ok(ifname, args)
def network_remove(ifname, net_num):
    """Remove network net_num (a string id) from ifname; the reply must be OK."""
    args = ["remove_network", net_num]
    run_cmd_ok(ifname, args)
def network_remove_all(ifname, count=10):
    """Issue remove_network for network ids 0 .. count-1 on ifname.

    Args:
        ifname: interface name.
        count: number of consecutive ids, starting at 0, to try to remove.
            Defaults to 10, the value that used to be hard-coded.

    The reply to each command is deliberately not checked (run_cmd is used
    rather than run_cmd_ok), so an id that cannot be removed does not raise
    RuntimeError.
    """
    for net_num in range(count):
        run_cmd(ifname, ["remove_network", str(net_num)])
def save(ifname):
    """Send the wpa_cli "save_config" command to ifname; the reply must be OK."""
    args = ["save_config"]
    run_cmd_ok(ifname, args)
def set(ifname, variable, value):
    """Run wpa_cli "set <variable> <value>" on ifname; the reply must be OK."""
    args = ["set", variable, value]
    run_cmd_ok(ifname, args)
def pin_pair(ifname, pin):
    """Run wpa_cli "wps_pin any <pin>" on ifname and return its raw output.

    pin is converted with str(), so it may be given as an int or a string.
    """
    pin_text = str(pin)
    reply = run_cmd(ifname, ["wps_pin", "any", pin_text])
    return reply
def reconfigure(ifname):
    """Run the wpa_cli "reconfigure" command on ifname; return its raw output."""
    reply = run_cmd(ifname, ["reconfigure"])
    return reply
import datetime
def wps_logged(ifname="wlan0", pin=74015887, log_path="/log/wps.log"):
    """Start WPS PIN pairing and append every status change to a log file.

    Args:
        ifname: interface to pair and poll (default "wlan0").
        pin: WPS PIN handed to pin_pair (default 74015887).
        log_path: file that status changes are appended to
            (default "/log/wps.log").

    This function never returns normally: it polls get_status in an endless
    loop, with no sleep between polls, and only stops when an exception
    propagates out.
    """
    # The context manager closes the log if an exception ends the loop.
    with open(log_path, "a") as log:
        pin_pair(ifname, pin)
        last_stat = {}
        while True:
            stat = get_status(ifname)
            if stat != last_stat:
                log.write("%s: %s\n" % (datetime.datetime.now(), stat))
                # Flush each entry: the loop never ends on its own, so
                # buffered lines would be lost if the process is killed.
                log.flush()
                last_stat = stat